Skip to content

Commit

Permalink
separate step listeners for step writers
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Apr 30, 2024
1 parent 7b09548 commit a9186fb
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package uk.ac.ebi.eva.accession.clustering.batch.listeners;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.clustering.batch.io.ClusteringWriter;
import uk.ac.ebi.eva.accession.clustering.batch.io.RSSplitWriter;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERED_CLUSTERING_WRITER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.NON_CLUSTERED_CLUSTERING_WRITER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_SPLIT_WRITER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_SPLIT_WRITER_JOB_EXECUTION_SETTER;

@Configuration
public class JobExecutionSetter {
@Bean(NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER)
public StepExecutionListener getNonClusteredClusteringWriterJobExecutionSetter(
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER) ClusteringWriter nonClusteredClusteringWriter) {
return new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
nonClusteredClusteringWriter.setJobExecution(stepExecution.getJobExecution());
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
};
}

@Bean(CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER)
public StepExecutionListener getClusteredClusteringWriterJobExecutionSetter(
@Qualifier(CLUSTERED_CLUSTERING_WRITER) ClusteringWriter clusteredClusteringWriter) {
return new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
clusteredClusteringWriter.setJobExecution(stepExecution.getJobExecution());
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
};
}


@Bean(RS_SPLIT_WRITER_JOB_EXECUTION_SETTER)
public StepExecutionListener getRSSplitWriterJobExecutionSetter(
@Qualifier(RS_SPLIT_WRITER) RSSplitWriter rsSplitWriter) {
return new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
rsSplitWriter.setJobExecution(stepExecution.getJobExecution());
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
};
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,9 @@ public class BeanNames {

public static final String BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB = "BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB";

public static final String JOB_EXECUTION_LISTENER = "JOB_EXECUTION_LISTENER";
public static final String NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER = "NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER";

public static final String CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER = "CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER";

public static final String RS_SPLIT_WRITER_JOB_EXECUTION_SETTER = "RS_SPLIT_WRITER_JOB_EXECUTION_SETTER";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package uk.ac.ebi.eva.accession.clustering.configuration.batch.io;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package uk.ac.ebi.eva.accession.clustering.configuration.batch.io;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
Expand All @@ -29,7 +28,6 @@
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTER_UNCLUSTERED_VARIANTS_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;

Expand All @@ -46,7 +44,6 @@ public Job clusteringFromMongoJob(
@Qualifier(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP) Step clearRSMergeAndSplitCandidatesStep,
@Qualifier(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP) Step clusteringNonClusteredVariantsFromMongoStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(CLUSTER_UNCLUSTERED_VARIANTS_JOB)
.incrementer(new RunIdIncrementer())
Expand All @@ -55,7 +52,6 @@ public Job clusteringFromMongoJob(
.next(clearRSMergeAndSplitCandidatesStep)
.next(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.listener(jobExecutionListener)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import htsjdk.samtools.util.StringUtil;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
Expand Down Expand Up @@ -48,7 +47,6 @@
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_FROM_MONGO_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;

Expand Down Expand Up @@ -88,15 +86,13 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
// Back-propagate RS in the remapped assembly that were split or merged
@Qualifier(BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP)
Step backPropagateSplitMergedRSStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
StepBuilderFactory stepBuilderFactory,
JobBuilderFactory jobBuilderFactory,
InputParameters inputParameters) {
JobExecutionDecider jobExecutionDecider = isRemappedAssemblyPresent(inputParameters);
Step dummyStep = dummyStep(stepBuilderFactory);
return jobBuilderFactory.get(CLUSTERING_FROM_MONGO_JOB)
.incrementer(new RunIdIncrementer())
.listener(jobExecutionListener)
//We need the dummy step here because Spring won't conditionally start the first step
.start(dummyStep)
.next(jobExecutionDecider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_STEP;

Expand All @@ -37,13 +36,11 @@ public class StudyClusteringJobConfiguration {
@Bean(STUDY_CLUSTERING_JOB)
public Job studyClusteringJob(@Qualifier(STUDY_CLUSTERING_STEP) Step clusteringStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(STUDY_CLUSTERING_JOB)
.incrementer(new RunIdIncrementer())
.start(clusteringStep)
.next(accessioningShutdownStep)
.listener(jobExecutionListener)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

import java.util.List;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_SPLIT_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_MONGO_READER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.TARGET_SS_READER_FOR_NEW_BACKPROP_RS;
Expand Down Expand Up @@ -67,13 +70,15 @@ public Step clusteringClusteredVariantStepMongoReader(
@Qualifier(CLUSTERED_VARIANTS_MONGO_READER) ItemStreamReader<SubmittedVariantEntity> mongoReader,
@Qualifier(CLUSTERED_CLUSTERING_WRITER) ItemWriter<SubmittedVariantEntity> submittedVariantWriter,
@Qualifier(PROGRESS_LISTENER) StepExecutionListener progressListener,
@Qualifier(CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER) StepExecutionListener clusteredClusteringWriterJobExecutionSetter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP)
.<SubmittedVariantEntity, SubmittedVariantEntity>chunk(chunkSizeCompletionPolicy)
.reader(mongoReader)
.writer(submittedVariantWriter)
.listener(progressListener)
.listener(clusteredClusteringWriterJobExecutionSetter)
.build();
return step;
}
Expand Down Expand Up @@ -102,6 +107,7 @@ public Step processRSSplitCandidatesStep(
ItemReader<SubmittedVariantOperationEntity> rsSplitCandidatesReader,
@Qualifier(RS_SPLIT_WRITER) ItemWriter<SubmittedVariantOperationEntity> rsSplitWriter,
@Qualifier(PROGRESS_LISTENER) StepExecutionListener progressListener,
@Qualifier(RS_SPLIT_WRITER_JOB_EXECUTION_SETTER) StepExecutionListener rsSplitWriterJobExecutionSetter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(PROCESS_RS_SPLIT_CANDIDATES_STEP)
Expand All @@ -110,6 +116,7 @@ public Step processRSSplitCandidatesStep(
.reader(rsSplitCandidatesReader)
.writer(rsSplitWriter)
.listener(progressListener)
.listener(rsSplitWriterJobExecutionSetter)
.build();
return step;
}
Expand Down Expand Up @@ -147,13 +154,15 @@ public Step clusteringNonClusteredVariantStepMongoReader(
@Qualifier(NON_CLUSTERED_VARIANTS_MONGO_READER) ItemStreamReader<SubmittedVariantEntity> mongoReader,
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER) ItemWriter<SubmittedVariantEntity> submittedVariantWriter,
@Qualifier(PROGRESS_LISTENER) StepExecutionListener progressListener,
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER) StepExecutionListener nonClusteredClusteringWriterJobExecutionSetter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP)
.<SubmittedVariantEntity, SubmittedVariantEntity>chunk(chunkSizeCompletionPolicy)
.reader(mongoReader)
.writer(submittedVariantWriter)
.listener(progressListener)
.listener(nonClusteredClusteringWriterJobExecutionSetter)
.build();
return step;
}
Expand Down Expand Up @@ -200,13 +209,15 @@ public Step studyClusteringStep(
@Qualifier(STUDY_CLUSTERING_MONGO_READER) ItemStreamReader<SubmittedVariantEntity> mongoReader,
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER) ItemWriter<SubmittedVariantEntity> submittedVariantWriter,
@Qualifier(PROGRESS_LISTENER) StepExecutionListener progressListener,
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER) StepExecutionListener nonClusteredClusteringWriterJobExecutionSetter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(STUDY_CLUSTERING_STEP)
.<SubmittedVariantEntity, SubmittedVariantEntity>chunk(chunkSizeCompletionPolicy)
.reader(mongoReader)
.writer(submittedVariantWriter)
.listener(progressListener)
.listener(nonClusteredClusteringWriterJobExecutionSetter)
.build();
return step;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import uk.ac.ebi.eva.accession.core.model.eva.SubmittedVariantEntity;
import uk.ac.ebi.eva.commons.core.models.pipeline.Variant;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_FROM_VCF_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERED_CLUSTERING_WRITER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROGRESS_LISTENER;
Expand All @@ -48,6 +49,7 @@ public Step clusteringVariantsStep(
@Qualifier(VARIANT_TO_SUBMITTED_VARIANT_ENTITY_PROCESSOR) ItemProcessor<Variant, SubmittedVariantEntity> processor,
@Qualifier(CLUSTERED_CLUSTERING_WRITER) ItemWriter<SubmittedVariantEntity> submittedVariantWriter,
@Qualifier(PROGRESS_LISTENER) StepExecutionListener progressListener,
@Qualifier(CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER) StepExecutionListener clusteredClusteringWriterJobExecutionSetter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(CLUSTERING_FROM_VCF_STEP)
Expand All @@ -56,6 +58,7 @@ public Step clusteringVariantsStep(
.processor(processor)
.writer(submittedVariantWriter)
.listener(progressListener)
.listener(clusteredClusteringWriterJobExecutionSetter)
.build();
return step;
}
Expand Down
Loading

0 comments on commit a9186fb

Please sign in to comment.