From 97f7c1c44c557640123b515d3310a7f4df5e471e Mon Sep 17 00:00:00 2001 From: evgeny Date: Thu, 5 Dec 2024 23:04:15 +0000 Subject: [PATCH] [ECO-5163] fix: duplicated messages because of duplicated attach message --- .../io/ably/lib/realtime/ChannelBase.java | 10 +++++ .../realtime/RealtimeConnectFailTest.java | 3 +- .../test/realtime/RealtimeMessageTest.java | 42 +++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b84ba7dc0..9e0c9974c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo()); } + // (RTL4i) + if (connectionManager.getConnectionState().state == ConnectionState.connecting + || connectionManager.getConnectionState().state == ConnectionState.disconnected) { + if (listener != null) { + on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed)); + } + setState(ChannelState.attaching, null); + return; + } + /* send attach request and pending state */ Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request"); ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 02a1d07d5..d9c6d5e58 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -75,7 +75,8 @@ public void connect_fail_notfound_error() throws AblyException { public void connect_fail_authorized_error() throws AblyException { AblyRealtime ably = null; try { - ClientOptions opts = createOptions(testVars.appId + ".invalid_key_id:invalid_key_value"); + String keyId = testVars.keys[0].keyName.split("\\.")[1]; + ClientOptions opts = createOptions(testVars.appId + "." + keyId + ":invalid_key_value"); ably = new AblyRealtime(opts); ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java index 2d00524f1..b46caaa49 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java @@ -13,12 +13,14 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; +import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageExtras; import io.ably.lib.types.Param; @@ -1010,4 +1012,44 @@ public void should_have_serial_action_createdAt() throws AblyException { assertNull(msgComplete.waitFor(1, 10_000)); } } + + @Test + public void should_not_duplicate_messages() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + String testChannelName = "my-channel" + System.currentTimeMillis(); + try (AblyRest rest = new AblyRest(opts)) { + final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName); + + Message[] messages = new Message[] { + new Message("name", "message 1"), + new Message("name", "message 2"), + new Message("name", "message 3"), + }; + + channel.publish(messages); + } + + try (AblyRealtime realtime = new AblyRealtime(opts)) { + final ChannelOptions options = new ChannelOptions(); + options.params = new HashMap<>(); + options.params.put("rewind", "10"); + final Channel channel = realtime.channels.get(testChannelName, options); + final CompletionWaiter completionWaiter = new CompletionWaiter(); + final AtomicInteger counter = new AtomicInteger(); + + channel.subscribe(message -> { + int value = counter.incrementAndGet(); + if (value == 3) completionWaiter.onSuccess(); + }); + + completionWaiter.waitFor(); + + assertEquals("Should be exactly 3 messages", 3, counter.get()); + + Thread.sleep(1500); + + assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get()); + } + } + }