Skip to content

Commit

Permalink
refactor: add batchSearchId to exception message
Browse files Browse the repository at this point in the history
  • Loading branch information
caro3801 committed Nov 6, 2023
1 parent d6007a8 commit 4a14bb7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -51,10 +50,11 @@ public BatchSearchLoop(BatchSearchRepository batchSearchRepository, BlockingQueu

public void run() {
logger.info("Datashare running in batch mode. Waiting batch from ds:batchsearch:queue ({})", batchSearchQueue.getClass());
String currentBatchId = null;
waitForMainLoopCalled.countDown();
loopThread = Thread.currentThread();
while (!POISON.equals(currentBatchId) && !exitAsked) {
String currentBatchId = null;

do {
try {
currentBatchId = batchSearchQueue.poll(60, TimeUnit.SECONDS);
if (currentBatchId != null && !POISON.equals(currentBatchId)) {
Expand All @@ -72,17 +72,17 @@ public void run() {
} catch (JooqBatchSearchRepository.BatchNotFoundException notFound) {
logger.warn("batch was not executed : {}", notFound.toString());
} catch (BatchSearchRunner.CancelException cancelEx) {
Objects.requireNonNull(currentBatchId,"BatchSearch Id cannot be null");
logger.info("cancelling batch search {}", currentBatchId);
batchSearchQueue.offer(currentBatchId);
repository.reset(currentBatchId);
String cancelledBatchSearch = cancelEx.getMessage();
logger.info("cancelling batch search {}", cancelledBatchSearch);
batchSearchQueue.offer(cancelledBatchSearch);
repository.reset(cancelledBatchSearch);
} catch (SearchException sex) {
logger.error("exception while running batch " + currentBatchId, sex);
repository.setState(currentBatchId, sex);
} catch (InterruptedException e) {
logger.warn("main loop interrupted");
}
}
} while (!POISON.equals(currentBatchId) && !exitAsked);
logger.info("exiting main loop");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Integer call() throws SearchException {
long beforeScrollLoop = DatashareTime.getInstance().currentTimeMillis();
while (docsToProcess.size() != 0 && numberOfResults < MAX_BATCH_RESULT_SIZE - MAX_SCROLL_SIZE) {
if (cancelAsked) {
throw new CancelException();
throw new CancelException(batchSearch.uuid);
}
resultConsumer.apply(batchSearch.uuid, query, (List<Document>) docsToProcess);
if (DatashareTime.getInstance().currentTimeMillis() - beforeScrollLoop < maxTimeSeconds * 1000L) {
Expand Down Expand Up @@ -142,5 +142,13 @@ public void cancel() {
logger.warn("batch search interrupted during cancel check status for {}", batchSearch.uuid);
}
}
public static class CancelException extends RuntimeException { }
public static class CancelException extends RuntimeException {
public CancelException() {
}

public CancelException(String batchSearchId) {
super(batchSearchId);
}

}
}

0 comments on commit 4a14bb7

Please sign in to comment.