Skip to content

Commit

Permalink
[ECO-5163] fix: duplicated messages because of duplicated attach message
Browse files Browse the repository at this point in the history
  • Loading branch information
ttypic committed Dec 6, 2024
1 parent 6c0ffcf commit 97f7c1c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());
}

// (RTL4i)
if (connectionManager.getConnectionState().state == ConnectionState.connecting
|| connectionManager.getConnectionState().state == ConnectionState.disconnected) {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
}
setState(ChannelState.attaching, null);
return;
}

/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void connect_fail_notfound_error() throws AblyException {
public void connect_fail_authorized_error() throws AblyException {
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.appId + ".invalid_key_id:invalid_key_value");
String keyId = testVars.keys[0].keyName.split("\\.")[1];
ClientOptions opts = createOptions(testVars.appId + "." + keyId + ":invalid_key_value");
ably = new AblyRealtime(opts);
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.MessageAction;
import io.ably.lib.types.MessageExtras;
import io.ably.lib.types.Param;
Expand Down Expand Up @@ -1010,4 +1012,44 @@ public void should_have_serial_action_createdAt() throws AblyException {
assertNull(msgComplete.waitFor(1, 10_000));
}
}

@Test
public void should_not_duplicate_messages() throws Exception {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
String testChannelName = "my-channel" + System.currentTimeMillis();
try (AblyRest rest = new AblyRest(opts)) {
final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName);

Message[] messages = new Message[] {
new Message("name", "message 1"),
new Message("name", "message 2"),
new Message("name", "message 3"),
};

channel.publish(messages);
}

try (AblyRealtime realtime = new AblyRealtime(opts)) {
final ChannelOptions options = new ChannelOptions();
options.params = new HashMap<>();
options.params.put("rewind", "10");
final Channel channel = realtime.channels.get(testChannelName, options);
final CompletionWaiter completionWaiter = new CompletionWaiter();
final AtomicInteger counter = new AtomicInteger();

channel.subscribe(message -> {
int value = counter.incrementAndGet();
if (value == 3) completionWaiter.onSuccess();
});

completionWaiter.waitFor();

assertEquals("Should be exactly 3 messages", 3, counter.get());

Thread.sleep(1500);

assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get());
}
}

}

0 comments on commit 97f7c1c

Please sign in to comment.