From 4a14bb7f6f92fc32798f1b4555b70fdb787e4242 Mon Sep 17 00:00:00 2001 From: Caroline Desprat Date: Mon, 6 Nov 2023 14:29:10 +0000 Subject: [PATCH] refactor: add batchSearchId to exception message --- .../icij/datashare/tasks/BatchSearchLoop.java | 16 ++++++++-------- .../icij/datashare/tasks/BatchSearchRunner.java | 12 ++++++++++-- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchLoop.java b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchLoop.java index 38a4402f3..a17382496 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchLoop.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchLoop.java @@ -13,7 +13,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -51,10 +50,11 @@ public BatchSearchLoop(BatchSearchRepository batchSearchRepository, BlockingQueu public void run() { logger.info("Datashare running in batch mode. Waiting batch from ds:batchsearch:queue ({})", batchSearchQueue.getClass()); - String currentBatchId = null; waitForMainLoopCalled.countDown(); loopThread = Thread.currentThread(); - while (!POISON.equals(currentBatchId) && !exitAsked) { + String currentBatchId = null; + + do { try { currentBatchId = batchSearchQueue.poll(60, TimeUnit.SECONDS); if (currentBatchId != null && !POISON.equals(currentBatchId)) { @@ -72,17 +72,17 @@ public void run() { } catch (JooqBatchSearchRepository.BatchNotFoundException notFound) { logger.warn("batch was not executed : {}", notFound.toString()); } catch (BatchSearchRunner.CancelException cancelEx) { - Objects.requireNonNull(currentBatchId,"BatchSearch Id cannot be null"); - logger.info("cancelling batch search {}", currentBatchId); - batchSearchQueue.offer(currentBatchId); - repository.reset(currentBatchId); + String cancelledBatchSearch = cancelEx.getMessage(); + logger.info("cancelling batch search {}", cancelledBatchSearch); + batchSearchQueue.offer(cancelledBatchSearch); + repository.reset(cancelledBatchSearch); } catch (SearchException sex) { logger.error("exception while running batch " + currentBatchId, sex); repository.setState(currentBatchId, sex); } catch (InterruptedException e) { logger.warn("main loop interrupted"); } - } + } while (!POISON.equals(currentBatchId) && !exitAsked); logger.info("exiting main loop"); } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java index e8a5f095c..b7e3cf789 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java @@ -96,7 +96,7 @@ public Integer call() throws SearchException { long beforeScrollLoop = DatashareTime.getInstance().currentTimeMillis(); while (docsToProcess.size() != 0 && numberOfResults < MAX_BATCH_RESULT_SIZE - MAX_SCROLL_SIZE) { if (cancelAsked) { - throw new CancelException(); + throw new CancelException(batchSearch.uuid); } resultConsumer.apply(batchSearch.uuid, query, (List) docsToProcess); if (DatashareTime.getInstance().currentTimeMillis() - beforeScrollLoop < maxTimeSeconds * 1000L) { @@ -142,5 +142,13 @@ public void cancel() { logger.warn("batch search interrupted during cancel check status for {}", batchSearch.uuid); } } - public static class CancelException extends RuntimeException { } + public static class CancelException extends RuntimeException { + public CancelException() { + } + + public CancelException(String batchSearchId) { + super(batchSearchId); + } + + } }