diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index a1987cd27..3fab010cb 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) { maxIdleInterval = connectionDetails.maxIdleInterval; connectionStateTtl = connectionDetails.connectionStateTtl; maxMessageSize = connectionDetails.maxMessageSize; + siteCode = connectionDetails.siteCode; // CD2j /* set the clientId resolved from token, if any */ String clientId = connectionDetails.clientId; @@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) { private CMConnectivityListener connectivityListener; private long connectionStateTtl = Defaults.connectionStateTtl; public int maxMessageSize = Defaults.maxMessageSize; + public String siteCode; // CD2j long maxIdleInterval = Defaults.maxIdleInterval; private int disconnectedRetryAttempt = 0; diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java index 0977a2350..587b9241f 100644 --- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java +++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java @@ -73,6 +73,13 @@ public class ConnectionDetails { * Spec: CD2f, RTN14e, DF1a */ public Long connectionStateTtl; + /** + * An opaque string identifying the server instance that the client is connected to. + * Used as a key in siteTimeserials maps for LiveObjects operations. + *

+ * Spec: CD2j + */ + public String siteCode; ConnectionDetails() { maxIdleInterval = Defaults.maxIdleInterval; @@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException { case "connectionStateTtl": connectionStateTtl = unpacker.unpackLong(); break; + case "siteCode": + siteCode = unpacker.unpackString(); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt index 00401c50e..056969aa8 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt @@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue import io.ably.lib.realtime.ChannelState import io.ably.lib.types.AblyException import io.ably.lib.types.ProtocolMessage +import io.ably.lib.types.PublishResult import io.ably.lib.util.Log import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED @@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal internal var state = ObjectsState.Initialized + /** + * Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo. + * @spec RTO7b, RTO7b1 + */ + internal val appliedOnAckSerials = mutableSetOf() + /** * @spec RTO4 - Used for handling object messages and object sync messages */ @@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal ) ) - // RTO11g - Publish the message - publish(arrayOf(msg)) + // RTO11i - publish and apply locally on ACK + publishAndApply(arrayOf(msg)) - // RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope - return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) { - objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap - } + // RTO11h2 - Return existing object if found after apply + return objectsPool.get(objectId) as? LiveMap + ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d } private suspend fun createCounterAsync(initialValue: Number): LiveCounter { @@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal ) ) - // RTO12g - Publish the message - publish(arrayOf(msg)) + // RTO12i - publish and apply locally on ACK + publishAndApply(arrayOf(msg)) - // RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope - return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) { - objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter - } + // RTO12h2 - Return existing object if found after apply + return objectsPool.get(objectId) as? LiveCounter + ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d } /** @@ -182,7 +187,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal /** * Spec: RTO15 */ - internal suspend fun publish(objectMessages: Array) { + internal suspend fun publish(objectMessages: Array): PublishResult { // RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing adapter.throwIfUnpublishableState(channelName) adapter.ensureMessageSizeWithinLimit(objectMessages) @@ -190,7 +195,47 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName) protocolMessage.state = objectMessages // RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure - adapter.sendAsync(protocolMessage) + return adapter.sendAsync(protocolMessage) // RTO15h + } + + /** + * Publishes the given object messages and, upon receiving the ACK, immediately applies them + * locally as synthetic inbound messages using the assigned serial and connection's siteCode. + * + * Spec: RTO20 + */ + internal suspend fun publishAndApply(objectMessages: Array) { + // RTO20b - publish, propagate failure + val publishResult = publish(objectMessages) + + // RTO20c - validate required info + val siteCode = adapter.connectionManager.siteCode + if (siteCode == null) { + Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed") + return + } + val serials = publishResult.serials + if (serials == null || serials.size != objectMessages.size) { + Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed") + return + } + + // RTO20d - create synthetic inbound ObjectMessages + val syntheticMessages = mutableListOf() + objectMessages.forEachIndexed { i, msg -> + val serial = serials[i] + if (serial == null) { + Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping") + return@forEachIndexed + } + syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3 + } + if (syntheticMessages.isEmpty()) return + + // RTO20e, RTO20f - dispatch to sequential scope for ordering + withContext(sequentialScope.coroutineContext) { + objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f) + } } /** @@ -272,7 +317,17 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal } } ChannelState.detached, + ChannelState.suspended, ChannelState.failed -> { + // RTO20e1 - fail any publishAndApply operations waiting for sync + val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null } + val error = ablyException( + "publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync", + ErrorCode.PublishAndApplyFailedDueToChannelState, + HttpStatusCode.BadRequest, + cause = errorReason?.let { AblyException.fromErrorInfo(it) } + ) + objectsManager.failBufferedAcks(error) // do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states objectsPool.clearObjectsData(false) objectsManager.clearSyncObjectsDataPool() diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt index 17612b043..1a8d1b8ad 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt @@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) { // Channel mode and state validation error codes ChannelModeRequired(40_024), ChannelStateError(90_001), + PublishAndApplyFailedDueToChannelState(92_008), } internal enum class HttpStatusCode(public val code: Int) { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index 2132c84e9..c46af8135 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject import io.ably.lib.objects.type.ObjectUpdate import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap +import io.ably.lib.types.AblyException import io.ably.lib.util.Log +import kotlinx.coroutines.CompletableDeferred /** * @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences @@ -21,6 +23,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject * @spec RTO7 - Buffered object operations during sync */ private val bufferedObjectOperations = mutableListOf() // RTO7a + /** + * @spec RTO22 - ACK results buffered during sync, with deferred for caller waiting + */ + private val bufferedAcks = mutableListOf, CompletableDeferred>>() /** * Handles object messages (non-sync messages). @@ -39,7 +45,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } // Apply messages immediately if synced - applyObjectMessages(objectMessages) // RTO8b + applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b } /** @@ -77,6 +83,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject // need to discard all buffered object operation messages on new sync start bufferedObjectOperations.clear() // RTO5a2b syncObjectsDataPool.clear() // RTO5a2a + // RTO21b - clear ACK tracking state on new sync (safety guard; RTO20e1 should have already failed deferreds) + for ((_, deferred) in bufferedAcks) { deferred.cancel() } + bufferedAcks.clear() + realtimeObjects.appliedOnAckSerials.clear() currentSyncId = syncId stateChange(ObjectsState.Syncing, false) } @@ -89,14 +99,46 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject internal fun endSync(deferStateEvent: Boolean) { Log.v(tag, "Ending sync sequence") applySync() - // should apply buffered object operations after we applied the sync. - // can use regular non-sync object.operation logic - applyObjectMessages(bufferedObjectOperations) // RTO5c6 + realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 - sync replaces state + // RTO5c6b: apply buffered ACKs before buffered OBJECT messages (proper dedup ordering) + for ((messages, deferred) in bufferedAcks) { + applyObjectMessages(messages, ObjectsOperationSource.LOCAL) + deferred.complete(Unit) // signal publishAndApply to resume + } + bufferedAcks.clear() + + applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6 bufferedObjectOperations.clear() // RTO5c5 syncObjectsDataPool.clear() // RTO5c4 currentSyncId = null // RTO5c3 - stateChange(ObjectsState.Synced, deferStateEvent) + stateChange(ObjectsState.Synced, deferStateEvent) // RTO5c8 + } + + /** + * Called from publishAndApply (via withContext sequentialScope). + * If SYNCED: apply immediately with LOCAL source. + * If SYNCING: buffer, suspend deferred until endSync processes it (RTO20e). + */ + internal suspend fun applyAckResult(messages: List) { + if (realtimeObjects.state == ObjectsState.Synced) { + applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f + } else { + val deferred = CompletableDeferred() + bufferedAcks.add(Pair(messages, deferred)) + deferred.await() // RTO20e - suspends until endSync completes or channel fails (RTO20e1) + } + } + + /** + * Fails all buffered ACK deferreds. + * Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1). + */ + internal fun failBufferedAcks(error: AblyException) { + for ((_, deferred) in bufferedAcks) { + deferred.completeExceptionally(error) + } + bufferedAcks.clear() } /** @@ -162,7 +204,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject * * @spec RTO9 - Creates zero-value objects if they don't exist */ - private fun applyObjectMessages(objectMessages: List) { + private fun applyObjectMessages( + objectMessages: List, + source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL, + ) { // RTO9a for (objectMessage in objectMessages) { if (objectMessage.operation == null) { @@ -177,6 +222,15 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}") continue } + + // RTO9a3 - skip operations already applied on ACK + if (objectMessage.serial != null && + realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) { + Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding") + realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial) + continue // discard without taking any further action + } + // RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, @@ -184,7 +238,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject // so to simplify operations handling, we always try to create a zero-value object in the pool first, // and then we can always apply the operation on the existing object in the pool. val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1 - obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3 + val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3 + if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) { + realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4 + } } } @@ -240,6 +297,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } internal fun dispose() { + for ((_, deferred) in bufferedAcks) { deferred.cancel() } + bufferedAcks.clear() syncObjectsDataPool.clear() bufferedObjectOperations.clear() disposeObjectsStateListeners() diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt new file mode 100644 index 000000000..e850d31b8 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt @@ -0,0 +1,7 @@ +package io.ably.lib.objects + +/** @spec RTO22 */ +internal enum class ObjectsOperationSource { + LOCAL, // RTO22a - applied upon receipt of ACK + CHANNEL // RTO22b - received over a Realtime channel +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index fa94e0a59..91bfeb011 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects.type import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState +import io.ably.lib.objects.ObjectsOperationSource import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError import io.ably.lib.objects.type.livecounter.noOpCounterUpdate @@ -66,11 +67,11 @@ internal abstract class BaseRealtimeObject( /** * This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object` - * @return an update describing the changes + * @return true if the operation was meaningfully applied, false otherwise * * @spec RTLM15/RTLC7 - Applies ObjectMessage with object data operations to LiveMap/LiveCounter */ - internal fun applyObject(objectMessage: ObjectMessage) { + internal fun applyObject(objectMessage: ObjectMessage, source: ObjectsOperationSource): Boolean { validateObjectId(objectMessage.operation?.objectId) val msgTimeSerial = objectMessage.serial @@ -84,17 +85,18 @@ internal abstract class BaseRealtimeObject( "Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " + "objectId=$objectId" ) - return + return false // RTLC7b / RTLM15b + } + // RTLC7c / RTLM15c - only update siteTimeserials for CHANNEL source + if (source == ObjectsOperationSource.CHANNEL) { + siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c } - // should update stored site serial immediately. doesn't matter if we successfully apply the op, - // as it's important to mark that the op was processed by the object - siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c if (isTombstoned) { // this object is tombstoned so the operation cannot be applied - return + return false // RTLC7e / RTLM15e } - applyObjectOperation(objectOperation, objectMessage) // RTLC7d + return applyObjectOperation(objectOperation, objectMessage) // RTLC7d } /** @@ -166,9 +168,10 @@ internal abstract class BaseRealtimeObject( * * @param operation The operation containing the action and data to apply * @param message The complete object message containing the operation + * @return true if the operation was meaningfully applied, false otherwise * */ - abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) + abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean /** * Clears the object's data and returns an update describing the changes. diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index b34188b62..164cdb28a 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -85,16 +85,16 @@ internal class DefaultLiveCounter private constructor( ) ) - // RTLC12f - Publish the message - realtimeObjects.publish(arrayOf(msg)) + // RTLC12g - publish and apply locally on ACK + realtimeObjects.publishAndApply(arrayOf(msg)) } override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate { return liveCounterManager.applyState(objectState, message.serialTimestamp) } - override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) { - liveCounterManager.applyOperation(operation, message.serialTimestamp) + override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean { + return liveCounterManager.applyOperation(operation, message.serialTimestamp) } override fun clearData(): LiveCounterUpdate { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index d96d65b64..943faf4ce 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -39,21 +39,32 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): /** * @spec RTLC7 - Applies operations to LiveCounter */ - internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) { - val update = when (operation.action) { - ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1 + internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?): Boolean { + return when (operation.action) { + ObjectOperationAction.CounterCreate -> { + val update = applyCounterCreate(operation) // RTLC7d1 + liveCounter.notifyUpdated(update) // RTLC7d1a + true // RTLC7d1b + } ObjectOperationAction.CounterInc -> { if (operation.counterOp != null) { - applyCounterInc(operation.counterOp) // RTLC7d2 + val update = applyCounterInc(operation.counterOp) // RTLC7d2 + liveCounter.notifyUpdated(update) // RTLC7d2a + true // RTLC7d2b } else { throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}") } } - ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp) - else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 + ObjectOperationAction.ObjectDelete -> { + val update = liveCounter.tombstone(serialTimestamp) + liveCounter.notifyUpdated(update) + true // RTLC7d4b + } + else -> { + Log.w(tag, "Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 + false + } } - - liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 8c2da8e6a..cd0604dbf 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -135,8 +135,8 @@ internal class DefaultLiveMap private constructor( ) ) - // RTLM20f - Publish the message - realtimeObjects.publish(arrayOf(msg)) + // RTLM20g - publish and apply locally on ACK + realtimeObjects.publishAndApply(arrayOf(msg)) } private suspend fun removeAsync(keyName: String) { @@ -157,16 +157,16 @@ internal class DefaultLiveMap private constructor( ) ) - // RTLM21f - Publish the message - realtimeObjects.publish(arrayOf(msg)) + // RTLM21g - publish and apply locally on ACK + realtimeObjects.publishAndApply(arrayOf(msg)) } override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate { return liveMapManager.applyState(objectState, message.serialTimestamp) } - override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) { - liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp) + override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean { + return liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp) } override fun clearData(): LiveMapUpdate { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index 19a6ef592..90c920cf2 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -51,28 +51,41 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang /** * @spec RTLM15 - Applies operations to LiveMap */ - internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) { - val update = when (operation.action) { - ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1 + internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?): Boolean { + return when (operation.action) { + ObjectOperationAction.MapCreate -> { + val update = applyMapCreate(operation) // RTLM15d1 + liveMap.notifyUpdated(update) // RTLM15d1a + true // RTLM15d1b + } ObjectOperationAction.MapSet -> { if (operation.mapOp != null) { - applyMapSet(operation.mapOp, serial) // RTLM15d2 + val update = applyMapSet(operation.mapOp, serial) // RTLM15d2 + liveMap.notifyUpdated(update) // RTLM15d2a + true // RTLM15d2b } else { throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") } } ObjectOperationAction.MapRemove -> { if (operation.mapOp != null) { - applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3 + val update = applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3 + liveMap.notifyUpdated(update) // RTLM15d3a + true // RTLM15d3b } else { throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") } } - ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp) - else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 + ObjectOperationAction.ObjectDelete -> { + val update = liveMap.tombstone(serialTimestamp) + liveMap.notifyUpdated(update) + true // RTLM15d5b + } + else -> { + Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 + false + } } - - liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a } /** diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt index 3e04d9e06..e068b7175 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects.unit.objects import io.ably.lib.objects.* import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectState +import io.ably.lib.objects.ObjectsOperationSource import io.ably.lib.objects.ObjectsState import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap @@ -148,17 +149,17 @@ class ObjectsManagerTest { val testObject1 = objectsPool.get("map:testObject@1") assertNotNull(testObject1, "map:testObject@1 should exist in pool after sync") verify(exactly = 1) { - testObject1.applyObject(objectMessage1) + testObject1.applyObject(objectMessage1, any()) } val testObject2 = objectsPool.get("counter:testObject@2") assertNotNull(testObject2, "counter:testObject@2 should exist in pool after sync") verify(exactly = 1) { - testObject2.applyObject(objectMessage2) + testObject2.applyObject(objectMessage2, any()) } val testObject3 = objectsPool.get("map:testObject@3") assertNotNull(testObject3, "map:testObject@3 should exist in pool after sync") verify(exactly = 1) { - testObject3.applyObject(objectMessage3) + testObject3.applyObject(objectMessage3, any()) } } @@ -193,7 +194,7 @@ class ObjectsManagerTest { objectsManager.handleObjectMessages(listOf(objectMessage)) verify(exactly = 0) { - objectsManager["applyObjectMessages"](any>()) + objectsManager["applyObjectMessages"](any>(), any()) } assertEquals(1, objectsManager.BufferedObjectOperations.size) assertEquals(objectMessage, objectsManager.BufferedObjectOperations[0]) @@ -202,7 +203,7 @@ class ObjectsManagerTest { // RTO7 - Apply buffered operations after sync objectsManager.endSync(false) // End sync without new sync verify(exactly = 1) { - objectsManager["applyObjectMessages"](any>()) + objectsManager["applyObjectMessages"](any>(), any()) } assertEquals(0, objectsManager.BufferedObjectOperations.size) assertEquals(2, objectsPool.size(), "Pool should contain 2 objects after applying buffered operations") diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt index 77576a907..3f084a6f4 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt @@ -4,6 +4,7 @@ import io.ably.lib.objects.ObjectsCounter import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction +import io.ably.lib.objects.ObjectsOperationSource import io.ably.lib.objects.ObjectState import io.ably.lib.objects.unit.getDefaultLiveCounterWithMockedDeps import io.ably.lib.types.AblyException @@ -57,7 +58,7 @@ class DefaultLiveCounterTest { // RTLC7a - Should throw error when objectId doesn't match val exception = assertFailsWith { - liveCounter.applyObject(message) + liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) } val errorInfo = exception.errorInfo assertNotNull(errorInfo) @@ -88,7 +89,7 @@ class DefaultLiveCounterTest { ) // RTLC7b - Should skip operation when serial is not newer - liveCounter.applyObject(message) + liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) // Verify that the site serial was not updated (operation was skipped) assertEquals("serial2", liveCounter.siteTimeserials["site1"]) @@ -115,7 +116,7 @@ class DefaultLiveCounterTest { ) // RTLC7c - Should update site serial when operation is valid - liveCounter.applyObject(message) + liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) // Verify that the site serial was updated assertEquals("serial2", liveCounter.siteTimeserials["site1"]) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt index 6c1e49748..fce613bcd 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt @@ -64,7 +64,7 @@ class DefaultLiveCounterManagerTest { @Test - fun `(RTLC7, RTLC7d3) LiveCounterManager should throw error for unsupported action`() { + fun `(RTLC7, RTLC7d3) LiveCounterManager should return false for unsupported action`() { val liveCounter = getDefaultLiveCounterWithMockedDeps() val liveCounterManager = liveCounter.LiveCounterManager @@ -74,15 +74,9 @@ class DefaultLiveCounterManagerTest { map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap()) ) - // RTLC7d3 - Should throw error for unsupported action - val exception = assertFailsWith { - liveCounterManager.applyOperation(operation, null) - } - - val errorInfo = exception.errorInfo - assertNotNull(errorInfo) - assertEquals(92000, errorInfo.code) // InvalidObject error code - assertEquals(500, errorInfo.statusCode) // InternalServerError status code + // RTLC7d3 - Should return false for unsupported action (no longer throws) + val result = liveCounterManager.applyOperation(operation, null) + assertFalse(result, "Should return false for unsupported action") } @Test diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt index 783cfe928..98a697768 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt @@ -2,6 +2,7 @@ package io.ably.lib.objects.unit.type.livemap import io.ably.lib.objects.ObjectsMapSemantics import io.ably.lib.objects.ObjectsMap +import io.ably.lib.objects.ObjectsOperationSource import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation @@ -64,7 +65,7 @@ class DefaultLiveMapTest { // RTLM15a - Should throw error when objectId doesn't match val exception = assertFailsWith { - liveMap.applyObject(message) + liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) } val errorInfo = exception.errorInfo assertNotNull(errorInfo) @@ -98,7 +99,7 @@ class DefaultLiveMapTest { ) // RTLM15b - Should skip operation when serial is not newer - liveMap.applyObject(message) + liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) // Verify that the site serial was not updated (operation was skipped) assertEquals("serial2", liveMap.siteTimeserials["site1"]) @@ -128,7 +129,7 @@ class DefaultLiveMapTest { ) // RTLM15c - Should update site serial when operation is valid - liveMap.applyObject(message) + liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) // Verify that the site serial was updated assertEquals("serial2", liveMap.siteTimeserials["site1"]) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 8f5e37bbd..89bdbe7c5 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -459,7 +459,7 @@ class LiveMapManagerTest { } @Test - fun `(RTLM15, RTLM15d4) LiveMapManager should throw error for unsupported action`() { + fun `(RTLM15, RTLM15d4) LiveMapManager should return false for unsupported action`() { val liveMap = getDefaultLiveMapWithMockedDeps() val liveMapManager = liveMap.LiveMapManager @@ -469,15 +469,9 @@ class LiveMapManagerTest { counter = ObjectsCounter(count = 20.0) ) - // RTLM15d4 - Should throw error for unsupported action - val exception = assertFailsWith { - liveMapManager.applyOperation(operation, "serial1", null) - } - - val errorInfo = exception.errorInfo - assertNotNull(errorInfo, "Error info should not be null") - assertEquals(92000, errorInfo?.code) // InvalidObject error code - assertEquals(500, errorInfo?.statusCode) // InternalServerError status code + // RTLM15d4 - Should return false for unsupported action (no longer throws) + val result = liveMapManager.applyOperation(operation, "serial1", null) + assertFalse(result, "Should return false for unsupported action") } @Test