Skip to content

Commit 557dac6

Browse files
committed
[ECO-5426] feat: implement handleStateChange method for channel state management
- Added impl. for handling channel attached state - Updated ObjectSyncTracker along with related unit tests
1 parent 4ab8a31 commit 557dac6

8 files changed

Lines changed: 119 additions & 54 deletions

File tree

lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects;
22

3+
import io.ably.lib.realtime.ChannelState;
34
import io.ably.lib.types.ProtocolMessage;
45
import org.jetbrains.annotations.NotNull;
56

@@ -30,6 +31,17 @@ public interface LiveObjectsPlugin {
3031
*/
3132
void handle(@NotNull ProtocolMessage message);
3233

34+
/**
35+
* Handles state changes for a specific channel.
36+
* This method is invoked whenever a channel's state changes, allowing the implementation
37+
* to update the LiveObjects instances accordingly based on the new state and presence of objects.
38+
*
39+
* @param channelName the name of the channel whose state has changed.
40+
* @param state the new state of the channel.
41+
* @param hasObjects flag indicates whether the channel has any associated live objects.
42+
*/
43+
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);
44+
3345
/**
3446
* Disposes of the LiveObjects instance associated with the specified channel name.
3547
* This method removes the LiveObjects instance for the given channel, releasing any

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,15 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
145145
this.reason = stateChange.reason;
146146
}
147147

148+
// cover states other than attached, ChannelState.attached already covered in setAttached
149+
if (liveObjectsPlugin != null && newState!= ChannelState.attached) {
150+
try {
151+
liveObjectsPlugin.handleStateChange(name, newState, false);
152+
} catch (Throwable t) {
153+
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
154+
}
155+
}
156+
148157
if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
149158
this.retryAttempt = 0;
150159
}
@@ -439,6 +448,13 @@ private void setAttached(ProtocolMessage message) {
439448
}
440449
return;
441450
}
451+
if (liveObjectsPlugin != null) {
452+
try {
453+
liveObjectsPlugin.handleStateChange(name, ChannelState.attached, message.hasFlag(Flag.has_objects));
454+
} catch (Throwable t) {
455+
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
456+
}
457+
}
442458
if(state == ChannelState.attached) {
443459
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
444460
if (!message.hasFlag(Flag.resumed)) { // RTL12

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.realtime.ChannelState
34
import io.ably.lib.types.Callback
45
import io.ably.lib.types.ProtocolMessage
56
import io.ably.lib.util.Log
@@ -95,9 +96,9 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
9596

9697
/**
9798
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
98-
* @spec RTL1 - Processes incoming object messages and object sync messages
99+
* @spec RTL1 - Processes incoming object messages and object sync messages
99100
*/
100-
fun handle(protocolMessage: ProtocolMessage) {
101+
internal fun handle(protocolMessage: ProtocolMessage) {
101102
// RTL15b - Set channel serial for OBJECT messages
102103
adapter.setChannelSerial(channelName, protocolMessage)
103104

@@ -141,6 +142,40 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
141142
}
142143
}
143144

145+
internal fun handleStateChange(state: ChannelState, hasObjects: Boolean) {
146+
sequentialScope.launch {
147+
when (state) {
148+
ChannelState.attached -> {
149+
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")
150+
151+
// RTO4a
152+
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED
153+
if (hasObjects || fromInitializedState) {
154+
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
155+
// this guarantees we emit both "syncing" -> "synced" events in that order.
156+
objectsManager.startNewSync(null)
157+
}
158+
159+
// RTO4b
160+
if (!hasObjects) {
161+
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
162+
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
163+
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
164+
objectsManager.clearSyncObjectsDataPool() // RTO4b3
165+
objectsManager.clearBufferedObjectOperations() // RTO4b5
166+
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
167+
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
168+
objectsManager.endSync(fromInitializedState) // RTO4b4
169+
}
170+
}
171+
172+
else -> {
173+
// No action needed for other states
174+
}
175+
}
176+
}
177+
}
178+
144179
/**
145180
* Changes the state and emits events.
146181
*

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.realtime.ChannelState
34
import io.ably.lib.types.ProtocolMessage
45
import java.util.concurrent.ConcurrentHashMap
56

@@ -16,6 +17,10 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
1617
liveObjects[channelName]?.handle(msg)
1718
}
1819

20+
override fun handleStateChange(channelName: String, state: ChannelState, hasObjects: Boolean) {
21+
liveObjects[channelName]?.handleStateChange(state, hasObjects)
22+
}
23+
1924
override fun dispose(channelName: String) {
2025
liveObjects[channelName]?.dispose()
2126
liveObjects.remove(channelName)

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
7070
*
7171
* @spec RTO5 - Sync sequence initialization
7272
*/
73-
private fun startNewSync(syncId: String?) {
73+
internal fun startNewSync(syncId: String?) {
7474
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
7575

7676
// need to discard all buffered object operation messages on new sync start
@@ -85,7 +85,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
8585
*
8686
* @spec RTO5c - Applies sync data and buffered operations
8787
*/
88-
private fun endSync(deferStateEvent: Boolean) {
88+
internal fun endSync(deferStateEvent: Boolean) {
8989
Log.v(tag, "Ending sync sequence")
9090
applySync()
9191
// should apply buffered object operations after we applied the sync.
@@ -98,6 +98,22 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) {
9898
liveObjects.stateChange(ObjectsState.SYNCED, deferStateEvent)
9999
}
100100

101+
/**
102+
* Clears the sync objects data pool.
103+
* Used by DefaultLiveObjects.handleStateChange.
104+
*/
105+
internal fun clearSyncObjectsDataPool() {
106+
syncObjectsDataPool.clear()
107+
}
108+
109+
/**
110+
* Clears the buffered object operations.
111+
* Used by DefaultLiveObjects.handleStateChange.
112+
*/
113+
internal fun clearBufferedObjectOperations() {
114+
bufferedObjectOperations.clear()
115+
}
116+
101117
/**
102118
* Applies sync data to objects pool.
103119
*

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -63,34 +63,34 @@ internal class ObjectsPool(
6363
/**
6464
* Gets a live object from the pool by object ID.
6565
*/
66-
fun get(objectId: String): BaseLiveObject? {
66+
internal fun get(objectId: String): BaseLiveObject? {
6767
return pool[objectId]
6868
}
6969

7070
/**
7171
* Deletes objects from the pool for which object ids are not found in the provided array of ids.
7272
*/
73-
fun deleteExtraObjectIds(objectIds: MutableSet<String>) {
73+
internal fun deleteExtraObjectIds(objectIds: MutableSet<String>) {
7474
pool.entries.removeIf { (key, _) -> key !in objectIds }
7575
}
7676

7777
/**
7878
* Sets a live object in the pool.
7979
*/
80-
fun set(objectId: String, liveObject: BaseLiveObject) {
80+
internal fun set(objectId: String, liveObject: BaseLiveObject) {
8181
pool[objectId] = liveObject
8282
}
8383

8484
/**
8585
* Removes all objects but root from the pool and clears the data for root.
8686
* Does not create a new root object, so the reference to the root object remains the same.
8787
*/
88-
fun resetToInitialPool(emitUpdateEvents: Boolean) {
88+
internal fun resetToInitialPool(emitUpdateEvents: Boolean) {
8989
// Clear the pool first and keep the root object
9090
val root = pool[ROOT_OBJECT_ID]
9191
if (root != null) {
9292
pool.clear()
93-
pool[ROOT_OBJECT_ID] = root
93+
set(ROOT_OBJECT_ID, root)
9494

9595
// Clear the data, this will only clear the root object
9696
clearObjectsData(emitUpdateEvents)
@@ -178,33 +178,6 @@ internal class ObjectsPool(
178178
fun dispose() {
179179
gcJob?.cancel()
180180
gcScope.cancel()
181-
clear()
181+
pool.clear()
182182
}
183-
184-
/**
185-
* Gets all object IDs in the pool.
186-
* Useful for debugging and testing.
187-
*/
188-
fun getObjectIds(): Set<String> = pool.keys.toSet()
189-
190-
/**
191-
* Gets the size of the pool.
192-
* Useful for debugging and testing.
193-
*/
194-
fun size(): Int = pool.size
195-
196-
/**
197-
* Checks if the pool contains an object with the given ID.
198-
*/
199-
fun contains(objectId: String): Boolean = pool.containsKey(objectId)
200-
201-
/**
202-
* Removes an object from the pool.
203-
*/
204-
fun remove(objectId: String): BaseLiveObject? = pool.remove(objectId)
205-
206-
/**
207-
* Clears all objects from the pool.
208-
*/
209-
fun clear() = pool.clear()
210183
}

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsSyncTracker.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,36 @@ package io.ably.lib.objects
55
*/
66
internal class ObjectsSyncTracker(syncChannelSerial: String?) {
77
internal val syncId: String?
8-
private val syncCursor: String?
9-
private val hasEnded: Boolean
8+
internal val syncCursor: String?
9+
private val syncSerial: String? = syncChannelSerial
1010

1111
init {
1212
val parsed = parseSyncChannelSerial(syncChannelSerial)
1313
syncId = parsed.first
1414
syncCursor = parsed.second
15-
hasEnded = syncChannelSerial.isNullOrEmpty() || syncCursor.isNullOrEmpty()
1615
}
1716

1817
/**
1918
* Checks if a new sync sequence has started.
2019
*
2120
* @param prevSyncId The previously stored sync ID
2221
* @return true if a new sync sequence has started, false otherwise
22+
*
23+
* Spec: RTO5a5, RTO5a2
2324
*/
2425
internal fun hasSyncStarted(prevSyncId: String?): Boolean {
25-
return prevSyncId != syncId
26+
return syncSerial.isNullOrEmpty() || prevSyncId != syncId
2627
}
2728

2829
/**
2930
* Checks if the current sync sequence has ended.
3031
*
3132
* @return true if the sync sequence has ended, false otherwise
33+
*
34+
* Spec: RTO5a5, RTO5a4
3235
*/
3336
internal fun hasSyncEnded(): Boolean {
34-
return hasEnded
37+
return syncSerial.isNullOrEmpty() || syncCursor.isNullOrEmpty()
3538
}
3639

3740
companion object {

live-objects/src/test/kotlin/io/ably/lib/objects/unit/ObjectsSyncTrackerTest.kt

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,37 @@ import org.junit.Assert.*
77
class ObjectsSyncTrackerTest {
88

99
@Test
10-
fun `should parse valid sync channel serial with syncId and cursor`() {
10+
fun `(RTO5a, RTO5a1, RTO5a2) Should parse valid sync channel serial with syncId and cursor`() {
1111
val syncTracker = ObjectsSyncTracker("sync-123:cursor-456")
1212

1313
assertEquals("sync-123", syncTracker.syncId)
1414
assertFalse(syncTracker.hasSyncStarted("sync-123"))
15-
1615
assertTrue(syncTracker.hasSyncStarted(null))
1716
assertTrue(syncTracker.hasSyncStarted("sync-124"))
1817

18+
assertEquals("cursor-456", syncTracker.syncCursor)
1919
assertFalse(syncTracker.hasSyncEnded())
2020
}
2121

2222
@Test
23-
fun `should handle null sync channel serial`() {
23+
fun `(RTO5a5) Should handle null sync channel serial`() {
2424
val syncTracker = ObjectsSyncTracker(null)
2525

2626
assertNull(syncTracker.syncId)
27+
assertTrue(syncTracker.hasSyncStarted(null))
28+
29+
assertNull(syncTracker.syncCursor)
2730
assertTrue(syncTracker.hasSyncEnded())
2831
}
2932

3033
@Test
31-
fun `should handle empty sync channel serial`() {
34+
fun `(RTO5a5) Should handle empty sync channel serial`() {
3235
val syncTracker = ObjectsSyncTracker("")
3336

3437
assertNull(syncTracker.syncId)
38+
assertTrue(syncTracker.hasSyncStarted(null))
39+
40+
assertNull(syncTracker.syncCursor)
3541
assertTrue(syncTracker.hasSyncEnded())
3642
}
3743

@@ -40,21 +46,20 @@ class ObjectsSyncTrackerTest {
4046
val syncTracker = ObjectsSyncTracker("sync_123-456:cursor_789-012")
4147

4248
assertEquals("sync_123-456", syncTracker.syncId)
43-
assertFalse(syncTracker.hasSyncEnded())
44-
}
4549

46-
@Test
47-
fun `should detect sync sequence ended when syncChannelSerial is null`() {
48-
val syncTracker = ObjectsSyncTracker(null)
49-
50-
assertTrue(syncTracker.hasSyncEnded())
50+
assertEquals("cursor_789-012", syncTracker.syncCursor)
51+
assertFalse(syncTracker.hasSyncEnded())
5152
}
5253

5354
@Test
54-
fun `should detect sync sequence ended when sync cursor is empty`() {
55+
fun `(RTO5a4) should detect sync sequence ended when sync cursor is empty`() {
5556
val syncTracker = ObjectsSyncTracker("sync-123:")
57+
58+
assertEquals("sync-123", syncTracker.syncId)
5659
assertTrue(syncTracker.hasSyncStarted(null))
5760
assertTrue(syncTracker.hasSyncStarted(""))
61+
62+
assertEquals("", syncTracker.syncCursor)
5863
assertTrue(syncTracker.hasSyncEnded())
5964
}
6065
}

0 commit comments

Comments
 (0)