Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix more flakey tests #949

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -1021,6 +1022,19 @@ public void detach_success_callback_detaching() throws AblyException {
}
}

class RealtimeChannelTestDetachingListener implements ChannelStateListener
{
public AtomicBoolean detachedReceived = new AtomicBoolean(false);

@Override
public void onChannelStateChanged(ChannelStateChange stateChange) {
System.out.println(stateChange.current == ChannelState.detached);
if (stateChange.current == ChannelState.detached) {
detachedReceived.set(true);
}
}
}


/**
* When client attaches to a channel in detaching state, verify that attach call will be done after detach
Expand Down Expand Up @@ -1052,9 +1066,12 @@ public void attach_when_channel_in_detaching_state() throws AblyException {

//block detached so we can ensure that we are in detaching state but unblock immediately after assertion
transportFactory.blockReceiveProcessingAndQueueBlockedMessages(message -> message.action == ProtocolMessage.Action.detached);
/* detach */
final Helpers.CompletionWaiter detachCompletionWaiter = new Helpers.CompletionWaiter();
channel.detach(detachCompletionWaiter);

// Attach a listener for the channel state and start the detach process
RealtimeChannelTestDetachingListener detachingListener = new RealtimeChannelTestDetachingListener();
channel.on(detachingListener);
channel.detach();

assertEquals("Verify detaching state reached", ChannelState.detaching, channel.state);

//now we can send an attach as we previously blocked detaching
Expand All @@ -1065,9 +1082,16 @@ public void attach_when_channel_in_detaching_state() throws AblyException {
//unblock and let the queued messages arrive
transportFactory.allowReceiveProcessing(message -> true);

detachCompletionWaiter.waitFor();
assertThat(detachCompletionWaiter.success, is(true));
assertThat(channel.state, is(ChannelState.detached));
// Wait to receive the detached state
long timeNow = System.currentTimeMillis();
do {
if (System.currentTimeMillis() > timeNow + 5000) {
fail("Channel did not enter detached state: " + detachingListener.detachedReceived);
return;
}

} while (!detachingListener.detachedReceived.get());

//verify reattach - after detach
attachCompletionWaiter.waitFor();
assertThat(attachCompletionWaiter.success,is(true));
Expand Down Expand Up @@ -1536,13 +1560,12 @@ public void channel_resume_lost_continuity() throws AblyException {

/* prepare channels */
Channel attachedChannel = ably.channels.get(attachedChannelName);
ChannelWaiter attachedChannelWaiter = new ChannelWaiter(attachedChannel);
ChannelWaiter preAttachChannelWaiter = new ChannelWaiter(attachedChannel);
attachedChannel.attach();
attachedChannelWaiter.waitFor(ChannelState.attached);
preAttachChannelWaiter.waitFor(ChannelState.attached);

Channel suspendedChannel = ably.channels.get(suspendedChannelName);
suspendedChannel.state = ChannelState.suspended;
ChannelWaiter suspendedChannelWaiter = new ChannelWaiter(suspendedChannel);

final boolean[] suspendedStateReached = new boolean[2];
final boolean[] attachingStateReached = new boolean[2];
Expand Down Expand Up @@ -1584,6 +1607,17 @@ public void onChannelStateChanged(ChannelStateChange stateChange) {
}
});

/*
We need to set up the waiters after the above state listeners, as listeners
without filters are run in the order registered.

This ensures that the various state changes have been recorded by the above
listeners, before the waiters are notified of a state change, thus allowing
the main test to continue.
*/
ChannelWaiter attachedChannelWaiter = new ChannelWaiter(attachedChannel);
ChannelWaiter suspendedChannelWaiter = new ChannelWaiter(suspendedChannel);

/* disconnect, and sabotage the resume */
String originalConnectionId = ably.connection.id;
ably.connection.key = "_____!ably___test_fake-key____";
Expand Down