-
Notifications
You must be signed in to change notification settings - Fork 41
[AIT-98] feat: realtime edits and deletes #1183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: AIT-99/message-appends
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |||||||||||||||||
| import io.ably.lib.types.DeltaExtras; | ||||||||||||||||||
| import io.ably.lib.types.ErrorInfo; | ||||||||||||||||||
| import io.ably.lib.types.Message; | ||||||||||||||||||
| import io.ably.lib.types.MessageAction; | ||||||||||||||||||
| import io.ably.lib.types.MessageAnnotations; | ||||||||||||||||||
| import io.ably.lib.types.MessageDecodeException; | ||||||||||||||||||
| import io.ably.lib.types.MessageOperation; | ||||||||||||||||||
|
|
@@ -46,6 +47,7 @@ | |||||||||||||||||
| import io.ably.lib.types.UpdateDeleteResult; | ||||||||||||||||||
| import io.ably.lib.util.CollectionUtils; | ||||||||||||||||||
| import io.ably.lib.util.EventEmitter; | ||||||||||||||||||
| import io.ably.lib.util.Listeners; | ||||||||||||||||||
| import io.ably.lib.util.Log; | ||||||||||||||||||
| import io.ably.lib.util.ReconnectionStrategy; | ||||||||||||||||||
| import io.ably.lib.util.StringUtils; | ||||||||||||||||||
|
|
@@ -1123,7 +1125,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener | |||||||||||||||||
| case suspended: | ||||||||||||||||||
| throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000)); | ||||||||||||||||||
| default: | ||||||||||||||||||
| connectionManager.send(msg, queueMessages, listener); | ||||||||||||||||||
| connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener)); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -1206,103 +1208,89 @@ public void getMessageAsync(String serial, Callback<Message> callback) { | |||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Updates an existing message using patch semantics. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * Non-null fields in the provided message (name, data, extras) will replace the corresponding | ||||||||||||||||||
| * fields in the existing message, while null fields will be left unchanged. | ||||||||||||||||||
| * Asynchronously updates an existing message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the fields to update and the serial identifier. | ||||||||||||||||||
| * Only non-null fields will be applied to the existing message. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * @throws AblyException If the update operation fails. | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the updated message version serial. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| return messageEditsMixin.updateMessage(ably.http, message, operation); | ||||||||||||||||||
| public void updateMessage(Message message) throws AblyException { | ||||||||||||||||||
| updateMessage(message, null, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Updates an existing message using patch semantics. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * Non-null fields in the provided message (name, data, extras) will replace the corresponding | ||||||||||||||||||
| * fields in the existing message, while null fields will be left unchanged. | ||||||||||||||||||
| * Asynchronously updates an existing message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the fields to update and the serial identifier. | ||||||||||||||||||
| * Only non-null fields will be applied to the existing message. | ||||||||||||||||||
| * @throws AblyException If the update operation fails. | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the updated message version serial. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult updateMessage(Message message) throws AblyException { | ||||||||||||||||||
| return updateMessage(message, null); | ||||||||||||||||||
| public void updateMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| updateMessage(message, operation, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Asynchronously updates an existing message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the fields to update and the serial identifier. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * @param callback A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * @param listener A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void updateMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback); | ||||||||||||||||||
| public void updateMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException { | ||||||||||||||||||
| Log.v(TAG, "updateMessage(Message); channel = " + this.name + "; serial = " + message.serial); | ||||||||||||||||||
| updateDeleteImpl(message, operation, MessageAction.MESSAGE_UPDATE, listener); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Asynchronously updates an existing message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the fields to update and the serial identifier. | ||||||||||||||||||
| * @param callback A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * @param listener A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void updateMessageAsync(Message message, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| updateMessageAsync(message, null, callback); | ||||||||||||||||||
| public void updateMessage(Message message, Callback<UpdateDeleteResult> listener) throws AblyException { | ||||||||||||||||||
| updateMessage(message, null, listener); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Marks a message as deleted. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This operation does not remove the message from history; it marks it as deleted | ||||||||||||||||||
| * while preserving the full message history. The deleted message can still be | ||||||||||||||||||
| * retrieved and will have its action set to MESSAGE_DELETE. | ||||||||||||||||||
| * Asynchronously marks a message as deleted. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} message containing the serial identifier. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * @throws AblyException If the delete operation fails. | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the deleted message version serial. | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and operation metadata. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| return messageEditsMixin.deleteMessage(ably.http, message, operation); | ||||||||||||||||||
| public void deleteMessage(Message message) throws AblyException { | ||||||||||||||||||
| deleteMessage(message, null, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Marks a message as deleted. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This operation does not remove the message from history; it marks it as deleted | ||||||||||||||||||
| * while preserving the full message history. The deleted message can still be | ||||||||||||||||||
| * retrieved and will have its action set to MESSAGE_DELETE. | ||||||||||||||||||
| * Asynchronously marks a message as deleted. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} message containing the serial identifier. | ||||||||||||||||||
| * @throws AblyException If the delete operation fails. | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the deleted message version serial. | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and operation metadata. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult deleteMessage(Message message) throws AblyException { | ||||||||||||||||||
| return deleteMessage(message, null); | ||||||||||||||||||
| public void deleteMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| deleteMessage(message, operation, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Asynchronously marks a message as deleted. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and operation metadata. | ||||||||||||||||||
| * @param operation operation metadata such as clientId, description, or metadata in the version field | ||||||||||||||||||
| * @param callback A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * @param listener A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void deleteMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback); | ||||||||||||||||||
| public void deleteMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException { | ||||||||||||||||||
| Log.v(TAG, "deleteMessage(Message); channel = " + this.name + "; serial = " + message.serial); | ||||||||||||||||||
| updateDeleteImpl(message, operation, MessageAction.MESSAGE_DELETE, listener); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
|
|
@@ -1313,44 +1301,45 @@ public void deleteMessageAsync(Message message, MessageOperation operation, Call | |||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void deleteMessageAsync(Message message, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| deleteMessageAsync(message, null, callback); | ||||||||||||||||||
| public void deleteMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException { | ||||||||||||||||||
| deleteMessage(message, null, callback); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Appends message text to the end of the message. | ||||||||||||||||||
| * Asynchronously appends message text to the end of the message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and data to append. | ||||||||||||||||||
| * @param operation operation details such as clientId, description, or metadata | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the updated message version serial. | ||||||||||||||||||
| * @throws AblyException If the append operation fails. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| return messageEditsMixin.appendMessage(ably.http, message, operation); | ||||||||||||||||||
| public void appendMessage(Message message) throws AblyException { | ||||||||||||||||||
| appendMessage(message, null, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Appends message text to the end of the message. | ||||||||||||||||||
| * Asynchronously appends message text to the end of the message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and data to append. | ||||||||||||||||||
| * @return A {@link UpdateDeleteResult} containing the updated message version serial. | ||||||||||||||||||
| * @throws AblyException If the append operation fails. | ||||||||||||||||||
| * @param operation operation details such as clientId, description, or metadata | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public UpdateDeleteResult appendMessage(Message message) throws AblyException { | ||||||||||||||||||
| return appendMessage(message, null); | ||||||||||||||||||
| public void appendMessage(Message message, MessageOperation operation) throws AblyException { | ||||||||||||||||||
| appendMessage(message, operation, null); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Asynchronously appends message text to the end of the message. | ||||||||||||||||||
| * | ||||||||||||||||||
| * @param message A {@link Message} object containing the serial identifier and data to append. | ||||||||||||||||||
| * @param operation operation details such as clientId, description, or metadata | ||||||||||||||||||
| * @param callback A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * @param listener A callback to be notified of the outcome of this operation. | ||||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void appendMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback); | ||||||||||||||||||
| public void appendMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException { | ||||||||||||||||||
| Log.v(TAG, "appendMessage(Message); channel = " + this.name + "; serial = " + message.serial); | ||||||||||||||||||
| updateDeleteImpl(message, operation, MessageAction.MESSAGE_APPEND, listener); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
|
|
@@ -1361,8 +1350,50 @@ public void appendMessageAsync(Message message, MessageOperation operation, Call | |||||||||||||||||
| * <p> | ||||||||||||||||||
| * This callback is invoked on a background thread. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public void appendMessageAsync(Message message, Callback<UpdateDeleteResult> callback) { | ||||||||||||||||||
| appendMessageAsync(message, null, callback); | ||||||||||||||||||
| public void appendMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException { | ||||||||||||||||||
| appendMessage(message, null, callback); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void updateDeleteImpl( | ||||||||||||||||||
| Message message, | ||||||||||||||||||
| MessageOperation operation, | ||||||||||||||||||
| MessageAction action, | ||||||||||||||||||
| Callback<UpdateDeleteResult> listener | ||||||||||||||||||
| ) throws AblyException { | ||||||||||||||||||
| if (message.serial == null || message.serial.isEmpty()) { | ||||||||||||||||||
| throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); | ||||||||||||||||||
| } | ||||||||||||||||||
| ConnectionManager connectionManager = ably.connection.connectionManager; | ||||||||||||||||||
| ConnectionManager.State connectionState = connectionManager.getConnectionState(); | ||||||||||||||||||
| boolean queueMessages = ably.options.queueMessages; | ||||||||||||||||||
| if (!connectionManager.isActive() || (connectionState.queueEvents && !queueMessages)) { | ||||||||||||||||||
| throw AblyException.fromErrorInfo(connectionState.defaultErrorInfo); | ||||||||||||||||||
| } | ||||||||||||||||||
| boolean connected = (connectionState.sendEvents); | ||||||||||||||||||
|
|
||||||||||||||||||
| Message updatedMessage = new Message(message.name, message.data, message.extras); | ||||||||||||||||||
| updatedMessage.serial = message.serial; | ||||||||||||||||||
| updatedMessage.action = action; | ||||||||||||||||||
| updatedMessage.version = new MessageVersion(); | ||||||||||||||||||
| if (operation != null) { | ||||||||||||||||||
| updatedMessage.version.clientId = operation.clientId; | ||||||||||||||||||
| updatedMessage.version.description = operation.description; | ||||||||||||||||||
| updatedMessage.version.metadata = operation.metadata; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| try { | ||||||||||||||||||
| ably.auth.checkClientId(message, true, connected); | ||||||||||||||||||
| updatedMessage.encode(options); | ||||||||||||||||||
| } catch (AblyException e) { | ||||||||||||||||||
| if (listener != null) { | ||||||||||||||||||
| listener.onError(e.errorInfo); | ||||||||||||||||||
| } | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| ProtocolMessage msg = new ProtocolMessage(Action.message, this.name); | ||||||||||||||||||
| msg.messages = new Message[] { updatedMessage }; | ||||||||||||||||||
| connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener)); | ||||||||||||||||||
|
||||||||||||||||||
| connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener)); | |
| try { | |
| connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener)); | |
| } catch (AblyException e) { | |
| if (listener != null) { | |
| listener.onError(e.errorInfo); | |
| } | |
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
| import io.ably.lib.types.PresenceMessage; | ||
| import io.ably.lib.types.PresenceSerializer; | ||
| import io.ably.lib.types.ProtocolMessage; | ||
| import io.ably.lib.types.PublishResult; | ||
| import io.ably.lib.util.Listeners; | ||
| import io.ably.lib.util.Log; | ||
| import io.ably.lib.util.StringUtils; | ||
|
|
||
|
|
@@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws | |
| return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId)); | ||
| } | ||
|
|
||
| void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) { | ||
| void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) { | ||
| synchronized(channel) { | ||
| final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener); | ||
| final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener)); | ||
| pendingPresence.add(queuedPresence); | ||
| } | ||
| } | ||
|
Comment on lines
+125
to
130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # First, let's find and examine the Presence.java file
find . -type f -name "Presence.java" | grep -E "(lib|src)"Repository: ably/ably-java Length of output: 112 🏁 Script executed: # Let's look at the addPendingPresence method around lines 125-130
head -150 lib/src/main/java/io/ably/lib/realtime/Presence.java | tail -50Repository: ably/ably-java Length of output: 2578 🏁 Script executed: # Find the Listeners class to understand unwrap() behavior
find . -type f -name "Listeners.java" | head -5Repository: ably/ably-java Length of output: 109 🏁 Script executed: # Examine the Listeners.java file to understand unwrap() and fromCompletionListener()
cat -n ./lib/src/main/java/io/ably/lib/util/Listeners.javaRepository: ably/ably-java Length of output: 2810 🏁 Script executed: # Find all callers of addPendingPresence to see how it's being called
rg -B 3 -A 3 "addPendingPresence" lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 462 🏁 Script executed: # Search for other callers of addPendingPresence in the entire codebase
rg "addPendingPresence" --type javaRepository: ably/ably-java Length of output: 362 🏁 Script executed: # Find the call to addPendingPresence in ChannelBase.java
rg -B 10 -A 5 "this.presence.addPendingPresence" lib/src/main/java/io/ably/lib/realtime/ChannelBase.javaRepository: ably/ably-java Length of output: 872 🏁 Script executed: # Find where queuedMessage.listener is created/set
rg -B 5 "queuedMessage.listener" lib/src/main/java/io/ably/lib/realtime/ChannelBase.java | head -100Repository: ably/ably-java Length of output: 506 🏁 Script executed: # Look at the QueuedMessage class to understand its listener field
rg -B 5 -A 15 "class QueuedMessage" lib/src/main/java/io/ably/lib/realtime/ChannelBase.javaRepository: ably/ably-java Length of output: 40 🏁 Script executed: # Search for QueuedMessage class definition in the codebase
rg -B 5 -A 20 "class QueuedMessage" --type javaRepository: ably/ably-java Length of output: 2556 🏁 Script executed: # Look at the ConnectionManager to see where QueuedMessage is created
find . -type f -name "ConnectionManager.java" | head -1 | xargs cat -n | head -200Repository: ably/ably-java Length of output: 9747 🏁 Script executed: # Search for where QueuedMessage listener is set in ConnectionManager
rg "new QueuedMessage" --type java -B 2 -A 5Repository: ably/ably-java Length of output: 4022 🏁 Script executed: # Find where presence messages are sent to ConnectionManager.send
rg "presence" lib/src/main/java/io/ably/lib/realtime/Presence.java -i -A 3 | grep -E "(send|Listeners\.from)" | head -20Repository: ably/ably-java Length of output: 489 🏁 Script executed: # Look for public methods in Presence that accept listeners and handle presence operations
rg -B 5 -A 10 "public.*Callback<PublishResult>" lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 40 🏁 Script executed: # Search for calls to send() in Presence with presence messages
rg "connectionManager\.send|\.send\(" lib/src/main/java/io/ably/lib/realtime/Presence.java -B 3 -A 3Repository: ably/ably-java Length of output: 941 🏁 Script executed: # Let's check the full sendQueuedMessages method to understand the complete flow
rg -B 2 -A 30 "void sendQueuedMessages" lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 1478 🏁 Script executed: # Check QueuedPresence class to see what it does with the listener
rg -B 5 -A 20 "class QueuedPresence" lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 1095 🏁 Script executed: # Let's verify if there are any other code paths where addPendingPresence is called
rg "addPendingPresence\|QueuedPresence" --type javaRepository: ably/ably-java Length of output: 40 🏁 Script executed: # Double-check if addPendingPresence is called from anywhere else besides transferQueuedPresenceMessages
rg -r "addPendingPresence" --type java --entire-file lib/src/main/java/Repository: ably/ably-java Length of output: 502 🏁 Script executed: # Verify all the public presence methods to see the full public API
rg -B 3 "public.*CompletionListener\|public.*Callback<PublishResult>" lib/src/main/java/io/ably/lib/realtime/Presence.java | head -60Repository: ably/ably-java Length of output: 40 🏁 Script executed: # Check if there are any tests that demonstrate how addPendingPresence should be used
find . -type f -name "*PresenceTest*.java" -o -name "*Presence*Test.java" | xargs rg "addPendingPresence" 2>/dev/null | head -20Repository: ably/ably-java Length of output: 40 🏁 Script executed: # Check the visibility of addPendingPresence method
rg "addPendingPresence" lib/src/main/java/io/ably/lib/realtime/Presence.java -B 1 -A 1Repository: ably/ably-java Length of output: 187 🏁 Script executed: # Examine the complete flow - look at what listeners are stored in ConnectionManager.queuedMessages
rg -B 5 -A 10 "queuedMessages.add" lib/src/main/java/io/ably/lib/transport/ConnectionManager.javaRepository: ably/ably-java Length of output: 1342 🏁 Script executed: # Check if CompletionListenerWrapper is the only type of Callback that gets stored in QueuedMessage
rg "ConnectionManager.send\|new QueuedMessage" lib/src/main/java/io/ably/lib/realtime/Presence.java -B 3 -A 3Repository: ably/ably-java Length of output: 40 🏁 Script executed: # Check the exact line where addPendingPresence is defined to confirm its visibility
sed -n '125,130p' lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 366 🏁 Script executed: # Look at the complete Presence class to see all methods that take CompletionListener or callbacks
rg "public|protected|private|void " lib/src/main/java/io/ably/lib/realtime/Presence.java | grep -E "(enter|update|leave)" | head -20Repository: ably/ably-java Length of output: 1516 🏁 Script executed: # Check if there's any way a non-wrapped callback could be passed through the public API
rg -B 10 "enter\(|update\(|leave\(" lib/src/main/java/io/ably/lib/realtime/Presence.java | grep -E "(public|Callback|CompletionListener)" | head -30Repository: ably/ably-java Length of output: 393 🏁 Script executed: # Let's trace the complete flow from public methods to see if there's ANY way a non-wrapped callback could reach addPendingPresence
rg -B 15 "void updatePresence" lib/src/main/java/io/ably/lib/realtime/Presence.java | head -40Repository: ably/ably-java Length of output: 800 🏁 Script executed: # Check if ConnectionManager.send is ever called from outside Presence with presence messages
rg "connectionManager.send" lib/src/main/java/io/ably/lib/realtime/ --type java -B 2 | grep -v "Listeners.from"Repository: ably/ably-java Length of output: 2039 🏁 Script executed: # Verify all the paths that lead to transferQueuedPresenceMessages
rg "transferQueuedPresenceMessages\|transferToChannelQueue" lib/src/main/java/io/ably/lib/transport/ConnectionManager.java -B 5 -A 5Repository: ably/ably-java Length of output: 40 🏁 Script executed: # Let's verify the specific lines 768 and 897 mentioned in the review
sed -n '760,780p' lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 1062 🏁 Script executed: # Check line 897
sed -n '890,910p' lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 777 🏁 Script executed: # Verify the complete data flow: public methods -> updatePresence -> send with Listeners.from
rg -A 20 "public void updatePresence" lib/src/main/java/io/ably/lib/realtime/Presence.java | head -30Repository: ably/ably-java Length of output: 862 🏁 Script executed: # Let's check if there are any other code paths that might call addPendingPresence
rg "pendingPresence" lib/src/main/java/io/ably/lib/realtime/Presence.java -B 2 -A 2Repository: ably/ably-java Length of output: 1619 🏁 Script executed: # Verify the complete updatePresence method to see all branches
sed -n '740,780p' lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 1780 🏁 Script executed: # Confirm the visibility and full signature of addPendingPresence
sed -n '124,131p' lib/src/main/java/io/ably/lib/realtime/Presence.javaRepository: ably/ably-java Length of output: 368 🏁 Script executed: # Check if there are integration tests that might show the expected behavior
find . -type f -name "*.java" -path "*/test/*" | xargs rg "addPendingPresence\|QueuedMessage" 2>/dev/null | head -10Repository: ably/ably-java Length of output: 40 🏁 Script executed: # Verify that ConnectionManager only ever creates QueuedMessage with wrapped callbacks
rg "new QueuedMessage" lib/src/main/java/io/ably/lib/transport/ConnectionManager.java -B 8 -A 2Repository: ably/ably-java Length of output: 2333 🏁 Script executed: # Final verification: check if transferQueuedPresenceMessages is only called from one place
rg "transferQueuedPresenceMessages" lib/src/main/java/io/ably/lib/ --type javaRepository: ably/ably-java Length of output: 360 Design fragility: The method is package-private and currently safe—all call paths from Consider adding an assertion or log when unwrapping fails to catch accidental misuse: final CompletionListener completionListener = Listeners.unwrap(listener);
if (listener != null && completionListener == null) {
Log.w(TAG, "Warning: addPendingPresence received unwrapped callback");
// or assert: assert completionListener != null;
} |
||
|
|
@@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr | |
| ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name); | ||
| message.presence = new PresenceMessage[] { msg }; | ||
| ConnectionManager connectionManager = ably.connection.connectionManager; | ||
| connectionManager.send(message, ably.options.queueMessages, listener); | ||
| connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener)); | ||
| break; | ||
| default: | ||
| throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001)); | ||
|
|
@@ -892,7 +894,7 @@ private void sendQueuedMessages() { | |
| pendingPresence.clear(); | ||
|
|
||
| try { | ||
| connectionManager.send(message, queueMessages, listener); | ||
| connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener)); | ||
| } catch(AblyException e) { | ||
| Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e); | ||
| if(listener != null) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.