From 62d8476a5dec459e32366c698e35965c4e73bdcb Mon Sep 17 00:00:00 2001 From: artemananiev <33361937+artemananiev@users.noreply.github.com> Date: Mon, 26 Feb 2024 12:30:07 -0800 Subject: [PATCH] fix: 11746: Backport the fix for #11304 to release 0.47 (#11747) Signed-off-by: Artem Ananev --- .../reconnect/ConcurrentBlockingIterator.java | 14 +++++------ .../ConcurrentBlockingIteratorTest.java | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIterator.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIterator.java index fb4c96574331..9f73fd628219 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIterator.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIterator.java @@ -109,17 +109,15 @@ public boolean hasNext() { // if closed and buffer.poll != null || !closed final long waitMillis = maxWaitTimeUnit.toMillis(maxWaitTime); final long timeOutWhenMillisAre = System.currentTimeMillis() + waitMillis; - while ((next = buffer.poll()) == null) { - if (closed.get()) { - return false; - } else { - if (System.currentTimeMillis() > timeOutWhenMillisAre) { - throw new RuntimeException(new TimeoutException("Timed out trying to read from buffer")); - } + boolean isOpen = !closed.get(); + while (((next = buffer.poll()) == null) && isOpen) { + if (System.currentTimeMillis() > timeOutWhenMillisAre) { + throw new RuntimeException(new TimeoutException("Timed out trying to read from buffer")); } + isOpen = !closed.get(); } - return true; + return next != null; } /** diff --git a/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIteratorTest.java b/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIteratorTest.java index 52a26b2077f1..7074c0d827cc 100644 --- a/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIteratorTest.java +++ b/platform-sdk/swirlds-virtualmap/src/test/java/com/swirlds/virtualmap/internal/reconnect/ConcurrentBlockingIteratorTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; public class ConcurrentBlockingIteratorTest { @@ -217,6 +218,28 @@ void multiThreadIteration() throws ExecutionException, InterruptedException { } } + @RepeatedTest(1000) + @DisplayName("yield all elements even after closed") + void supplyBeforeClose() throws InterruptedException { + final ConcurrentBlockingIterator iterator = new ConcurrentBlockingIterator<>(4, 10, SECONDS); + final Thread supplier = new Thread(() -> { + try { + iterator.supply(1); + iterator.supply(2); + iterator.close(); + } catch (final InterruptedException e) { + fail("Supplier interrupted"); + } + }); + supplier.start(); + assertTrue(iterator.hasNext(), "Iterator must have more than zero elements"); + assertEquals(1, iterator.next()); + assertTrue(iterator.hasNext(), "Iterator must have more than one element"); + assertEquals(2, iterator.next()); + assertFalse(iterator.hasNext(), "Iterator must not have more than two elements"); + supplier.join(); + } + private Future> thrownByCall(Runnable lambda) { return threadPool.submit(() -> { try {