diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
index a1987cd27..3fab010cb 100644
--- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
+++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
@@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) {
maxIdleInterval = connectionDetails.maxIdleInterval;
connectionStateTtl = connectionDetails.connectionStateTtl;
maxMessageSize = connectionDetails.maxMessageSize;
+ siteCode = connectionDetails.siteCode; // CD2j
/* set the clientId resolved from token, if any */
String clientId = connectionDetails.clientId;
@@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) {
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
public int maxMessageSize = Defaults.maxMessageSize;
+ public String siteCode; // CD2j
long maxIdleInterval = Defaults.maxIdleInterval;
private int disconnectedRetryAttempt = 0;
diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
index 0977a2350..587b9241f 100644
--- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
+++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
@@ -73,6 +73,13 @@ public class ConnectionDetails {
* Spec: CD2f, RTN14e, DF1a
*/
public Long connectionStateTtl;
+ /**
+ * An opaque string identifying the server instance that the client is connected to.
+ * Used as a key in siteTimeserials maps for LiveObjects operations.
+ *
+ * Spec: CD2j
+ */
+ public String siteCode;
ConnectionDetails() {
maxIdleInterval = Defaults.maxIdleInterval;
@@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionStateTtl":
connectionStateTtl = unpacker.unpackLong();
break;
+ case "siteCode":
+ siteCode = unpacker.unpackString();
+ break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
index 00401c50e..056969aa8 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
@@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
+import io.ably.lib.types.PublishResult
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
@@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
internal var state = ObjectsState.Initialized
+ /**
+ * Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo.
+ * @spec RTO7b, RTO7b1
+ */
+ internal val appliedOnAckSerials = mutableSetOf()
+
/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
@@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)
- // RTO11g - Publish the message
- publish(arrayOf(msg))
+ // RTO11i - publish and apply locally on ACK
+ publishAndApply(arrayOf(msg))
- // RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
- return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) {
- objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
- }
+ // RTO11h2 - Return existing object if found after apply
+ return objectsPool.get(objectId) as? LiveMap
+ ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
@@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)
- // RTO12g - Publish the message
- publish(arrayOf(msg))
+ // RTO12i - publish and apply locally on ACK
+ publishAndApply(arrayOf(msg))
- // RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
- return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) {
- objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
- }
+ // RTO12h2 - Return existing object if found after apply
+ return objectsPool.get(objectId) as? LiveCounter
+ ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
}
/**
@@ -182,7 +187,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
/**
* Spec: RTO15
*/
- internal suspend fun publish(objectMessages: Array) {
+ internal suspend fun publish(objectMessages: Array): PublishResult {
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
adapter.throwIfUnpublishableState(channelName)
adapter.ensureMessageSizeWithinLimit(objectMessages)
@@ -190,7 +195,47 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
protocolMessage.state = objectMessages
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
- adapter.sendAsync(protocolMessage)
+ return adapter.sendAsync(protocolMessage) // RTO15h
+ }
+
+ /**
+ * Publishes the given object messages and, upon receiving the ACK, immediately applies them
+ * locally as synthetic inbound messages using the assigned serial and connection's siteCode.
+ *
+ * Spec: RTO20
+ */
+ internal suspend fun publishAndApply(objectMessages: Array) {
+ // RTO20b - publish, propagate failure
+ val publishResult = publish(objectMessages)
+
+ // RTO20c - validate required info
+ val siteCode = adapter.connectionManager.siteCode
+ if (siteCode == null) {
+ Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
+ return
+ }
+ val serials = publishResult.serials
+ if (serials == null || serials.size != objectMessages.size) {
+ Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
+ return
+ }
+
+ // RTO20d - create synthetic inbound ObjectMessages
+ val syntheticMessages = mutableListOf()
+ objectMessages.forEachIndexed { i, msg ->
+ val serial = serials[i]
+ if (serial == null) {
+ Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping")
+ return@forEachIndexed
+ }
+ syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3
+ }
+ if (syntheticMessages.isEmpty()) return
+
+ // RTO20e, RTO20f - dispatch to sequential scope for ordering
+ withContext(sequentialScope.coroutineContext) {
+ objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f)
+ }
}
/**
@@ -272,7 +317,17 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
}
}
ChannelState.detached,
+ ChannelState.suspended,
ChannelState.failed -> {
+ // RTO20e1 - fail any publishAndApply operations waiting for sync
+ val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
+ val error = ablyException(
+ "publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
+ ErrorCode.PublishAndApplyFailedDueToChannelState,
+ HttpStatusCode.BadRequest,
+ cause = errorReason?.let { AblyException.fromErrorInfo(it) }
+ )
+ objectsManager.failBufferedAcks(error)
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
index 17612b043..1a8d1b8ad 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
@@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) {
// Channel mode and state validation error codes
ChannelModeRequired(40_024),
ChannelStateError(90_001),
+ PublishAndApplyFailedDueToChannelState(92_008),
}
internal enum class HttpStatusCode(public val code: Int) {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
index 2132c84e9..c46af8135 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
@@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject
import io.ably.lib.objects.type.ObjectUpdate
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
+import io.ably.lib.types.AblyException
import io.ably.lib.util.Log
+import kotlinx.coroutines.CompletableDeferred
/**
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
@@ -21,6 +23,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO7 - Buffered object operations during sync
*/
private val bufferedObjectOperations = mutableListOf() // RTO7a
+ /**
+ * @spec RTO22 - ACK results buffered during sync, with deferred for caller waiting
+ */
+ private val bufferedAcks = mutableListOf, CompletableDeferred>>()
/**
* Handles object messages (non-sync messages).
@@ -39,7 +45,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}
// Apply messages immediately if synced
- applyObjectMessages(objectMessages) // RTO8b
+ applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b
}
/**
@@ -77,6 +83,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
// need to discard all buffered object operation messages on new sync start
bufferedObjectOperations.clear() // RTO5a2b
syncObjectsDataPool.clear() // RTO5a2a
+ // RTO21b - clear ACK tracking state on new sync (safety guard; RTO20e1 should have already failed deferreds)
+ for ((_, deferred) in bufferedAcks) { deferred.cancel() }
+ bufferedAcks.clear()
+ realtimeObjects.appliedOnAckSerials.clear()
currentSyncId = syncId
stateChange(ObjectsState.Syncing, false)
}
@@ -89,14 +99,46 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
internal fun endSync(deferStateEvent: Boolean) {
Log.v(tag, "Ending sync sequence")
applySync()
- // should apply buffered object operations after we applied the sync.
- // can use regular non-sync object.operation logic
- applyObjectMessages(bufferedObjectOperations) // RTO5c6
+ realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 - sync replaces state
+ // RTO5c6b: apply buffered ACKs before buffered OBJECT messages (proper dedup ordering)
+ for ((messages, deferred) in bufferedAcks) {
+ applyObjectMessages(messages, ObjectsOperationSource.LOCAL)
+ deferred.complete(Unit) // signal publishAndApply to resume
+ }
+ bufferedAcks.clear()
+
+ applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
- stateChange(ObjectsState.Synced, deferStateEvent)
+ stateChange(ObjectsState.Synced, deferStateEvent) // RTO5c8
+ }
+
+ /**
+ * Called from publishAndApply (via withContext sequentialScope).
+ * If SYNCED: apply immediately with LOCAL source.
+ * If SYNCING: buffer, suspend deferred until endSync processes it (RTO20e).
+ */
+ internal suspend fun applyAckResult(messages: List) {
+ if (realtimeObjects.state == ObjectsState.Synced) {
+ applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
+ } else {
+ val deferred = CompletableDeferred()
+ bufferedAcks.add(Pair(messages, deferred))
+ deferred.await() // RTO20e - suspends until endSync completes or channel fails (RTO20e1)
+ }
+ }
+
+ /**
+ * Fails all buffered ACK deferreds.
+ * Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
+ */
+ internal fun failBufferedAcks(error: AblyException) {
+ for ((_, deferred) in bufferedAcks) {
+ deferred.completeExceptionally(error)
+ }
+ bufferedAcks.clear()
}
/**
@@ -162,7 +204,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO9 - Creates zero-value objects if they don't exist
*/
- private fun applyObjectMessages(objectMessages: List) {
+ private fun applyObjectMessages(
+ objectMessages: List,
+ source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL,
+ ) {
// RTO9a
for (objectMessage in objectMessages) {
if (objectMessage.operation == null) {
@@ -177,6 +222,15 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}")
continue
}
+
+ // RTO9a3 - skip operations already applied on ACK
+ if (objectMessage.serial != null &&
+ realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
+ Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding")
+ realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
+ continue // discard without taking any further action
+ }
+
// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
@@ -184,7 +238,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
- obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
+ val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3
+ if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) {
+ realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4
+ }
}
}
@@ -240,6 +297,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}
internal fun dispose() {
+ for ((_, deferred) in bufferedAcks) { deferred.cancel() }
+ bufferedAcks.clear()
syncObjectsDataPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
new file mode 100644
index 000000000..e850d31b8
--- /dev/null
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
@@ -0,0 +1,7 @@
+package io.ably.lib.objects
+
+/** @spec RTO22 */
+internal enum class ObjectsOperationSource {
+ LOCAL, // RTO22a - applied upon receipt of ACK
+ CHANNEL // RTO22b - received over a Realtime channel
+}
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
index fa94e0a59..91bfeb011 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
@@ -3,6 +3,7 @@ package io.ably.lib.objects.type
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectState
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectsPoolDefaults
import io.ably.lib.objects.objectError
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
@@ -66,11 +67,11 @@ internal abstract class BaseRealtimeObject(
/**
* This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object`
- * @return an update describing the changes
+ * @return true if the operation was meaningfully applied, false otherwise
*
* @spec RTLM15/RTLC7 - Applies ObjectMessage with object data operations to LiveMap/LiveCounter
*/
- internal fun applyObject(objectMessage: ObjectMessage) {
+ internal fun applyObject(objectMessage: ObjectMessage, source: ObjectsOperationSource): Boolean {
validateObjectId(objectMessage.operation?.objectId)
val msgTimeSerial = objectMessage.serial
@@ -84,17 +85,18 @@ internal abstract class BaseRealtimeObject(
"Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " +
"objectId=$objectId"
)
- return
+ return false // RTLC7b / RTLM15b
+ }
+ // RTLC7c / RTLM15c - only update siteTimeserials for CHANNEL source
+ if (source == ObjectsOperationSource.CHANNEL) {
+ siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c
}
- // should update stored site serial immediately. doesn't matter if we successfully apply the op,
- // as it's important to mark that the op was processed by the object
- siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c
if (isTombstoned) {
// this object is tombstoned so the operation cannot be applied
- return
+ return false // RTLC7e / RTLM15e
}
- applyObjectOperation(objectOperation, objectMessage) // RTLC7d
+ return applyObjectOperation(objectOperation, objectMessage) // RTLC7d
}
/**
@@ -166,9 +168,10 @@ internal abstract class BaseRealtimeObject(
*
* @param operation The operation containing the action and data to apply
* @param message The complete object message containing the operation
+ * @return true if the operation was meaningfully applied, false otherwise
*
*/
- abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage)
+ abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean
/**
* Clears the object's data and returns an update describing the changes.
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
index b34188b62..164cdb28a 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
@@ -85,16 +85,16 @@ internal class DefaultLiveCounter private constructor(
)
)
- // RTLC12f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLC12g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
return liveCounterManager.applyState(objectState, message.serialTimestamp)
}
- override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
- liveCounterManager.applyOperation(operation, message.serialTimestamp)
+ override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean {
+ return liveCounterManager.applyOperation(operation, message.serialTimestamp)
}
override fun clearData(): LiveCounterUpdate {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
index d96d65b64..943faf4ce 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
@@ -39,21 +39,32 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
/**
* @spec RTLC7 - Applies operations to LiveCounter
*/
- internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
- val update = when (operation.action) {
- ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
+ internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?): Boolean {
+ return when (operation.action) {
+ ObjectOperationAction.CounterCreate -> {
+ val update = applyCounterCreate(operation) // RTLC7d1
+ liveCounter.notifyUpdated(update) // RTLC7d1a
+ true // RTLC7d1b
+ }
ObjectOperationAction.CounterInc -> {
if (operation.counterOp != null) {
- applyCounterInc(operation.counterOp) // RTLC7d2
+ val update = applyCounterInc(operation.counterOp) // RTLC7d2
+ liveCounter.notifyUpdated(update) // RTLC7d2a
+ true // RTLC7d2b
} else {
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
}
}
- ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
- else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
+ ObjectOperationAction.ObjectDelete -> {
+ val update = liveCounter.tombstone(serialTimestamp)
+ liveCounter.notifyUpdated(update)
+ true // RTLC7d4b
+ }
+ else -> {
+ Log.w(tag, "Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
+ false
+ }
}
-
- liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a
}
/**
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
index 8c2da8e6a..cd0604dbf 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
@@ -135,8 +135,8 @@ internal class DefaultLiveMap private constructor(
)
)
- // RTLM20f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLM20g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
private suspend fun removeAsync(keyName: String) {
@@ -157,16 +157,16 @@ internal class DefaultLiveMap private constructor(
)
)
- // RTLM21f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLM21g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
return liveMapManager.applyState(objectState, message.serialTimestamp)
}
- override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
- liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
+ override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean {
+ return liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
}
override fun clearData(): LiveMapUpdate {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
index 19a6ef592..90c920cf2 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
@@ -51,28 +51,41 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM15 - Applies operations to LiveMap
*/
- internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
- val update = when (operation.action) {
- ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
+ internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?): Boolean {
+ return when (operation.action) {
+ ObjectOperationAction.MapCreate -> {
+ val update = applyMapCreate(operation) // RTLM15d1
+ liveMap.notifyUpdated(update) // RTLM15d1a
+ true // RTLM15d1b
+ }
ObjectOperationAction.MapSet -> {
if (operation.mapOp != null) {
- applyMapSet(operation.mapOp, serial) // RTLM15d2
+ val update = applyMapSet(operation.mapOp, serial) // RTLM15d2
+ liveMap.notifyUpdated(update) // RTLM15d2a
+ true // RTLM15d2b
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.MapRemove -> {
if (operation.mapOp != null) {
- applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
+ val update = applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
+ liveMap.notifyUpdated(update) // RTLM15d3a
+ true // RTLM15d3b
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
- ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
- else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
+ ObjectOperationAction.ObjectDelete -> {
+ val update = liveMap.tombstone(serialTimestamp)
+ liveMap.notifyUpdated(update)
+ true // RTLM15d5b
+ }
+ else -> {
+ Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
+ false
+ }
}
-
- liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a
}
/**
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
index 3e04d9e06..e068b7175 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
@@ -3,6 +3,7 @@ package io.ably.lib.objects.unit.objects
import io.ably.lib.objects.*
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectState
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectsState
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
@@ -148,17 +149,17 @@ class ObjectsManagerTest {
val testObject1 = objectsPool.get("map:testObject@1")
assertNotNull(testObject1, "map:testObject@1 should exist in pool after sync")
verify(exactly = 1) {
- testObject1.applyObject(objectMessage1)
+ testObject1.applyObject(objectMessage1, any())
}
val testObject2 = objectsPool.get("counter:testObject@2")
assertNotNull(testObject2, "counter:testObject@2 should exist in pool after sync")
verify(exactly = 1) {
- testObject2.applyObject(objectMessage2)
+ testObject2.applyObject(objectMessage2, any())
}
val testObject3 = objectsPool.get("map:testObject@3")
assertNotNull(testObject3, "map:testObject@3 should exist in pool after sync")
verify(exactly = 1) {
- testObject3.applyObject(objectMessage3)
+ testObject3.applyObject(objectMessage3, any())
}
}
@@ -193,7 +194,7 @@ class ObjectsManagerTest {
objectsManager.handleObjectMessages(listOf(objectMessage))
verify(exactly = 0) {
- objectsManager["applyObjectMessages"](any>())
+ objectsManager["applyObjectMessages"](any>(), any())
}
assertEquals(1, objectsManager.BufferedObjectOperations.size)
assertEquals(objectMessage, objectsManager.BufferedObjectOperations[0])
@@ -202,7 +203,7 @@ class ObjectsManagerTest {
// RTO7 - Apply buffered operations after sync
objectsManager.endSync(false) // End sync without new sync
verify(exactly = 1) {
- objectsManager["applyObjectMessages"](any>())
+ objectsManager["applyObjectMessages"](any>(), any())
}
assertEquals(0, objectsManager.BufferedObjectOperations.size)
assertEquals(2, objectsPool.size(), "Pool should contain 2 objects after applying buffered operations")
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
index 77576a907..3f084a6f4 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
@@ -4,6 +4,7 @@ import io.ably.lib.objects.ObjectsCounter
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectOperationAction
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.unit.getDefaultLiveCounterWithMockedDeps
import io.ably.lib.types.AblyException
@@ -57,7 +58,7 @@ class DefaultLiveCounterTest {
// RTLC7a - Should throw error when objectId doesn't match
val exception = assertFailsWith {
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
}
val errorInfo = exception.errorInfo
assertNotNull(errorInfo)
@@ -88,7 +89,7 @@ class DefaultLiveCounterTest {
)
// RTLC7b - Should skip operation when serial is not newer
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was not updated (operation was skipped)
assertEquals("serial2", liveCounter.siteTimeserials["site1"])
@@ -115,7 +116,7 @@ class DefaultLiveCounterTest {
)
// RTLC7c - Should update site serial when operation is valid
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was updated
assertEquals("serial2", liveCounter.siteTimeserials["site1"])
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
index 6c1e49748..fce613bcd 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
@@ -64,7 +64,7 @@ class DefaultLiveCounterManagerTest {
@Test
- fun `(RTLC7, RTLC7d3) LiveCounterManager should throw error for unsupported action`() {
+ fun `(RTLC7, RTLC7d3) LiveCounterManager should return false for unsupported action`() {
val liveCounter = getDefaultLiveCounterWithMockedDeps()
val liveCounterManager = liveCounter.LiveCounterManager
@@ -74,15 +74,9 @@ class DefaultLiveCounterManagerTest {
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
)
- // RTLC7d3 - Should throw error for unsupported action
- val exception = assertFailsWith {
- liveCounterManager.applyOperation(operation, null)
- }
-
- val errorInfo = exception.errorInfo
- assertNotNull(errorInfo)
- assertEquals(92000, errorInfo.code) // InvalidObject error code
- assertEquals(500, errorInfo.statusCode) // InternalServerError status code
+ // RTLC7d3 - Should return false for unsupported action (no longer throws)
+ val result = liveCounterManager.applyOperation(operation, null)
+ assertFalse(result, "Should return false for unsupported action")
}
@Test
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
index 783cfe928..98a697768 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
@@ -2,6 +2,7 @@ package io.ably.lib.objects.unit.type.livemap
import io.ably.lib.objects.ObjectsMapSemantics
import io.ably.lib.objects.ObjectsMap
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
@@ -64,7 +65,7 @@ class DefaultLiveMapTest {
// RTLM15a - Should throw error when objectId doesn't match
val exception = assertFailsWith {
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
}
val errorInfo = exception.errorInfo
assertNotNull(errorInfo)
@@ -98,7 +99,7 @@ class DefaultLiveMapTest {
)
// RTLM15b - Should skip operation when serial is not newer
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was not updated (operation was skipped)
assertEquals("serial2", liveMap.siteTimeserials["site1"])
@@ -128,7 +129,7 @@ class DefaultLiveMapTest {
)
// RTLM15c - Should update site serial when operation is valid
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was updated
assertEquals("serial2", liveMap.siteTimeserials["site1"])
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
index 8f5e37bbd..89bdbe7c5 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
@@ -459,7 +459,7 @@ class LiveMapManagerTest {
}
@Test
- fun `(RTLM15, RTLM15d4) LiveMapManager should throw error for unsupported action`() {
+ fun `(RTLM15, RTLM15d4) LiveMapManager should return false for unsupported action`() {
val liveMap = getDefaultLiveMapWithMockedDeps()
val liveMapManager = liveMap.LiveMapManager
@@ -469,15 +469,9 @@ class LiveMapManagerTest {
counter = ObjectsCounter(count = 20.0)
)
- // RTLM15d4 - Should throw error for unsupported action
- val exception = assertFailsWith {
- liveMapManager.applyOperation(operation, "serial1", null)
- }
-
- val errorInfo = exception.errorInfo
- assertNotNull(errorInfo, "Error info should not be null")
- assertEquals(92000, errorInfo?.code) // InvalidObject error code
- assertEquals(500, errorInfo?.statusCode) // InternalServerError status code
+ // RTLM15d4 - Should return false for unsupported action (no longer throws)
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertFalse(result, "Should return false for unsupported action")
}
@Test