From 90ead3033a2afb57bfc61fdd2e2f1073c93d328a Mon Sep 17 00:00:00 2001 From: Henning Poettker Date: Sun, 19 May 2024 19:29:31 +0200 Subject: [PATCH] Keep heap lean during remote polling --- .../MessageChannelPartitionHandler.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java index a5cc624196..4e31e06a0b 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2009-2023 the original author or authors. + * Copyright 2009-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -251,24 +251,22 @@ protected Set doHandle(StepExecution managerStepExecution, private Set pollReplies(final StepExecution managerStepExecution, final Set split) throws Exception { - final Set result = new HashSet<>(split.size()); + Set partitionStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet()); Callable> callback = () -> { - Set currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet()); JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId()); - jobExecution.getStepExecutions() + Set finishedStopExecutions = jobExecution.getStepExecutions() .stream() - .filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId())) - .filter(stepExecution -> !result.contains(stepExecution)) + .filter(stepExecution -> partitionStepExecutionIds.contains(stepExecution.getId())) .filter(stepExecution -> !stepExecution.getStatus().isRunning()) - .forEach(result::add); + .collect(Collectors.toSet()); if (logger.isDebugEnabled()) { logger.debug(String.format("Currently waiting on %s partitions to finish", split.size())); } - if (result.size() == split.size()) { - return result; + if (finishedStopExecutions.size() == split.size()) { + return finishedStopExecutions; } else { return null;