Skip to content

Commit

Permalink
wip better exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
emil-bar committed Jan 14, 2025
1 parent 6a3eea9 commit e5f2092
Show file tree
Hide file tree
Showing 28 changed files with 297 additions and 280 deletions.
111 changes: 75 additions & 36 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.SelectClause;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.JoxScopeExecutionException;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.ThrowingFunction;
import com.softwaremill.jox.structured.UnsupervisedScope;

class GroupByImpl<T, V, U> {
Expand Down Expand Up @@ -48,10 +51,10 @@ public ChildDone(V v) {

private final Flow<T> parent;
private final int parallelism;
private final Function<T, V> predicate;
private final ThrowingFunction<T, V> predicate;
private final Flow.ChildFlowTransformer<T, V, U> childFlowTransform;

public GroupByImpl(Flow<T> parent, int parallelism, Function<T, V> predicate, Flow.ChildFlowTransformer<T, V, U> childFlowTransform) {
public GroupByImpl(Flow<T> parent, int parallelism, ThrowingFunction<T, V> predicate, Flow.ChildFlowTransformer<T, V, U> childFlowTransform) {
this.parent = parent;
this.parallelism = parallelism;
this.predicate = predicate;
Expand Down
24 changes: 3 additions & 21 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowAlsoToTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,7 @@ void alsoToTap_shouldSendToBothSinksWhenOtherIsFaster() throws Exception {
Flow<Integer> flow = Flows
.fromValues(1, 2, 3)
.alsoToTap(other)
.tap(v -> {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
.tap(_ -> Thread.sleep(50));

// when & then
assertEquals(List.of(1, 2, 3), flow.runToList());
Expand Down Expand Up @@ -193,13 +187,7 @@ void alsoTapTo_shouldNotFailTheFlowWhenTheOtherSinkFails() throws Exception {
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.tap(_ -> Thread.sleep(10))
.alsoToTap(other)
.runToList();

Expand All @@ -225,13 +213,7 @@ void alsoTapTo_shouldNotCloseTheFlowWhenTheOtherSinkCloses() throws Exception {
List<Integer> result = Flows
.iterate(1, i -> i + 1)
.take(10)
.tap(v -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.tap(_ -> Thread.sleep(10))
.alsoToTap(other)
.runToList();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.softwaremill.jox.flows;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;

public class FlowCompleteCallbacksTest {
@Test
Expand All @@ -27,7 +29,7 @@ void ensureOnCompleteRunsInCaseOfError() {
//given
AtomicBoolean didRun = new AtomicBoolean(false);
Flow<Integer> f = Flows.fromValues(1, 2, 3)
.tap(i -> {throw new RuntimeException();})
.tap(_ -> {throw new RuntimeException();})
.onComplete(() -> didRun.set(true));
assertFalse(didRun.get());

Expand Down Expand Up @@ -57,7 +59,7 @@ void ensureOnDoneDoesNotRunInCaseOfError() {
// given
AtomicBoolean didRun = new AtomicBoolean(false);
Flow<Integer> f = Flows.fromValues(1, 2, 3)
.tap(i -> {throw new RuntimeException();})
.tap(_ -> {throw new RuntimeException();})
.onDone(() -> didRun.set(true));
assertFalse(didRun.get());

Expand All @@ -73,7 +75,7 @@ void ensureOnErrorDoesNotRunInCaseOfSuccess() throws Exception {
// given
AtomicBoolean didRun = new AtomicBoolean(false);
Flow<Integer> f = Flows.fromValues(1, 2, 3)
.onError(e -> didRun.set(true));
.onError(_ -> didRun.set(true));
assertFalse(didRun.get());

// when
Expand All @@ -88,8 +90,8 @@ void ensureOnErrorRunsInCaseOfError() {
// given
AtomicBoolean didRun = new AtomicBoolean(false);
Flow<Integer> f = Flows.fromValues(1, 2, 3)
.tap(i -> {throw new RuntimeException();})
.onError(e -> didRun.set(true));
.tap(_ -> {throw new RuntimeException();})
.onError(_ -> didRun.set(true));
assertFalse(didRun.get());

// when
Expand Down
78 changes: 31 additions & 47 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import com.softwaremill.jox.Channel;
Expand Down Expand Up @@ -55,7 +54,7 @@ void shouldEmitGroupedElementsAndIncludeRemainingValuesWhenFlowCloses() throws E
}

@Test
void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed() throws InterruptedException, ExecutionException {
void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed() throws InterruptedException {
Scopes.unsupervised(scope -> {
// given
RuntimeException failure = new RuntimeException();
Expand Down Expand Up @@ -85,7 +84,7 @@ void shouldEmitGroupedElementsWithCustomCostFunction() throws Exception {
}

@Test
void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws ExecutionException, InterruptedException {
void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws InterruptedException {
Scopes.unsupervised(scope -> {
// when
ChannelClosedException exception = assertThrows(ChannelClosedException.class, () ->
Expand All @@ -101,7 +100,7 @@ void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws ExecutionExc
}

@Test
void groupedWeighted_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException, ExecutionException {
void groupedWeighted_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException {
Scopes.unsupervised(scope -> {
// given
RuntimeException failure = new RuntimeException();
Expand All @@ -120,7 +119,7 @@ void groupedWeighted_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() thr
}

@Test
void groupedWithin_shouldGroupFirstBatchOfElementsDueToLimitAndSecondBatchDueToTimeout() throws ExecutionException, InterruptedException {
void groupedWithin_shouldGroupFirstBatchOfElementsDueToLimitAndSecondBatchDueToTimeout() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var c = Channel.<Integer>newUnlimitedChannel();
Expand All @@ -139,7 +138,7 @@ void groupedWithin_shouldGroupFirstBatchOfElementsDueToLimitAndSecondBatchDueToT
// when
var elementsWithEmittedTimeOffset = Flows.fromSource(c)
.groupedWithin(3, Duration.ofMillis(100))
.map(s -> new AbstractMap.SimpleEntry<>(s, Duration.ofNanos(System.nanoTime() - start)))
.map(s -> Map.entry(s, Duration.ofNanos(System.nanoTime() - start)))
.runToList();

// then
Expand All @@ -156,7 +155,7 @@ void groupedWithin_shouldGroupFirstBatchOfElementsDueToLimitAndSecondBatchDueToT
}

@Test
void groupedWithin_shouldGroupFirstBatchOfElementsDueToTimeoutAndSecondBatchDueToLimit() throws ExecutionException, InterruptedException {
void groupedWithin_shouldGroupFirstBatchOfElementsDueToTimeoutAndSecondBatchDueToLimit() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var c = Channel.<Integer>newUnlimitedChannel();
Expand Down Expand Up @@ -196,7 +195,7 @@ void groupedWithin_shouldGroupFirstBatchOfElementsDueToTimeoutAndSecondBatchDueT
}

@Test
void groupedWithin_shouldWakeUpOnNewElementAndSendItImmediatelyAfterFirstBatchIsSentAndChannelGoesToTimeoutMode() throws ExecutionException, InterruptedException {
void groupedWithin_shouldWakeUpOnNewElementAndSendItImmediatelyAfterFirstBatchIsSentAndChannelGoesToTimeoutMode() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var c = Channel.<Integer>newUnlimitedChannel();
Expand Down Expand Up @@ -236,7 +235,7 @@ void groupedWithin_shouldWakeUpOnNewElementAndSendItImmediatelyAfterFirstBatchIs
}

@Test
void groupedWithin_shouldSendTheGroupOnlyOnceWhenTheChannelIsClosed() throws ExecutionException, InterruptedException {
void groupedWithin_shouldSendTheGroupOnlyOnceWhenTheChannelIsClosed() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var c = Channel.<Integer>newUnlimitedChannel();
Expand All @@ -258,7 +257,7 @@ void groupedWithin_shouldSendTheGroupOnlyOnceWhenTheChannelIsClosed() throws Exe
}

@Test
void groupedWithin_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws ExecutionException, InterruptedException {
void groupedWithin_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var failure = new RuntimeException();
Expand All @@ -269,13 +268,13 @@ void groupedWithin_shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throw
ChannelError result = (ChannelError) flow.runToChannel(scope).receiveOrClosed();

// then
assertEquals(failure, result.cause().getCause().getCause());
assertEquals(failure, result.cause().getCause());
return null;
});
}

@Test
void groupedWeightedWithin_shouldGroupElementsOnTimeoutInFirstBatchAndConsiderMaxWeightInRemainingBatches() throws ExecutionException, InterruptedException {
void groupedWeightedWithin_shouldGroupElementsOnTimeoutInFirstBatchAndConsiderMaxWeightInRemainingBatches() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var c = Channel.<Integer>withScopedBufferSize();
Expand Down Expand Up @@ -304,7 +303,7 @@ void groupedWeightedWithin_shouldGroupElementsOnTimeoutInFirstBatchAndConsiderMa
}

@Test
void groupedWeightedWithin_shouldReturnFailedSourceWhenCostFunctionThrowsException() throws ExecutionException, InterruptedException {
void groupedWeightedWithin_shouldReturnFailedSourceWhenCostFunctionThrowsException() throws InterruptedException {
Scopes.supervised(scope -> {
// given
Flow<List<Integer>> flow = Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7)
Expand All @@ -315,13 +314,13 @@ void groupedWeightedWithin_shouldReturnFailedSourceWhenCostFunctionThrowsExcepti
.forEach(_ -> {}));

// then
assertInstanceOf(ArithmeticException.class, exception.getCause().getCause().getCause());
assertInstanceOf(ArithmeticException.class, exception.getCause().getCause());
return null;
});
}

@Test
void groupedWeightedWithin_shouldReturnFailedSourceWhenOriginalSourceIsFailed() throws ExecutionException, InterruptedException {
void groupedWeightedWithin_shouldReturnFailedSourceWhenOriginalSourceIsFailed() throws InterruptedException {
Scopes.supervised(scope -> {
// given
var failure = new RuntimeException();
Expand All @@ -334,7 +333,7 @@ void groupedWeightedWithin_shouldReturnFailedSourceWhenOriginalSourceIsFailed()

// then
assertEquals(ChannelError.class, result.getClass());
assertEquals(failure, ((ChannelError) result).cause().getCause().getCause());
assertEquals(failure, ((ChannelError) result).cause().getCause());
return null;
});
}
Expand Down Expand Up @@ -498,12 +497,8 @@ void groupBy_shouldGroupWhenChildProcessingIsSlow() throws Exception {
// when
List<Integer> result = Flows.fromIterable(input)
.groupBy(1, _ -> 0, _ -> f -> f.tap(_ -> {
try {
// the number of elements exceeds the buffer, the parent will get blocked
sleep(Duration.ofMillis(10));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// the number of elements exceeds the buffer, the parent will get blocked
sleep(Duration.ofMillis(10));
}))
.runToList();

Expand All @@ -513,59 +508,48 @@ void groupBy_shouldGroupWhenChildProcessingIsSlow() throws Exception {

@Test
void groupBy_shouldPropagateErrorsFromChildFlows() {
ExecutionException exception = assertThrows(ExecutionException.class, () -> {
ChannelErrorException exception = assertThrows(ChannelErrorException.class, () -> {
Flows.fromValues(10, 11, 12, 13, 20, 23, 33, 30)
.groupBy(10, i -> i % 10, _ -> f -> f.tap(i -> {
if (i == 13) throw new RuntimeException("boom!");
}))
.runToList();
});
assertEquals("boom!", exception.getCause().getCause().getMessage());
assertEquals("boom!", exception.getCause().getMessage());
}

@Test
void groupBy_shouldPropagateErrorsFromChildFlowsWhenParentIsBlockedOnSending() {
ExecutionException exception = assertThrows(ExecutionException.class, () -> {
ChannelErrorException exception = assertThrows(ChannelErrorException.class, () -> {
Flows.fromValues(IntStream.rangeClosed(1, 100).boxed().toArray(Integer[]::new))
.groupBy(1, _ -> 0, _ -> f -> f.tap(_ -> {
try {
sleep(Duration.ofMillis(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sleep(Duration.ofMillis(100));
throw new RuntimeException("boom!");
}))
.runToList();
});
assertEquals("boom!", exception.getCause().getCause().getMessage());
assertEquals("boom!", exception.getCause().getMessage());
}

@Test
void groupBy_shouldPropagateRuntimeExceptionErrorsFromParentFlows() {
ExecutionException exception = assertThrows(ExecutionException.class, () -> {
ChannelErrorException exception = assertThrows(ChannelErrorException.class, () -> {
Flows.fromValues(10, 11, 12, 13, 20, 23, 33, 30)
.concat(Flows.failed(new RuntimeException("boom!")))
.groupBy(10, i -> i % 10, _ -> f -> f)
.runToList();
});
assertEquals("boom!", exception.getCause().getCause().getMessage());
assertEquals("boom!", exception.getCause().getMessage());
}

@Test
void groupBy_shouldThrowExecutionExceptionWithIllegalStateExceptionWhenChildStreamIsCompletedByUserProvidedTransformation() {
ExecutionException exception = assertThrows(ExecutionException.class, () -> {
Flows.fromValues(10, 20, 30)
.tap(_ -> {
try {
sleep(Duration.ofMillis(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.groupBy(10, i -> i % 10, _ -> f -> f.take(1))
.runToList();
});
assertInstanceOf(IllegalStateException.class, exception.getCause());
void groupBy_shouldThrowIllegalStateExceptionWhenChildStreamIsCompletedByUserProvidedTransformation() {
assertThrows(IllegalStateException.class, () ->
Flows.fromValues(10, 20, 30)
.tap(_ -> sleep(Duration.ofMillis(100)))
.groupBy(10, i -> i % 10, _ -> f -> f.take(1))
.runToList()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand All @@ -27,7 +26,7 @@
public class FlowIOTest {

@Test
void returnEmptyInputStreamForEmptySource() throws ExecutionException, InterruptedException {
void returnEmptyInputStreamForEmptySource() throws InterruptedException {
Scopes.unsupervised(scope -> {
Flow.ByteFlow source = Flows.<byte[]>empty().toByteFlow();
try (InputStream stream = source.runToInputStream(scope)) {
Expand All @@ -38,7 +37,7 @@ void returnEmptyInputStreamForEmptySource() throws ExecutionException, Interrupt
}

@Test
void returnInputStreamForSimpleSource() throws ExecutionException, InterruptedException {
void returnInputStreamForSimpleSource() throws InterruptedException {
Scopes.unsupervised(scope -> {
var source = Flows.fromByteArrays("chunk1".getBytes(), "chunk2".getBytes());
try (InputStream stream = source.runToInputStream(scope)) {
Expand All @@ -49,7 +48,7 @@ void returnInputStreamForSimpleSource() throws ExecutionException, InterruptedEx
}

@Test
void correctlyTrackAvailableBytes() throws ExecutionException, InterruptedException {
void correctlyTrackAvailableBytes() throws InterruptedException {
Scopes.unsupervised(scope -> {
var source = Flows.fromByteArrays("chunk1".getBytes(), "chunk2".getBytes());
try (InputStream stream = source.runToInputStream(scope)) {
Expand Down
Loading

0 comments on commit e5f2092

Please sign in to comment.