From b14f578f3abae90eadab3152c35c1232ac4fd35e Mon Sep 17 00:00:00 2001 From: UmmulkiramR Date: Tue, 25 Apr 2023 13:02:08 -0400 Subject: [PATCH 1/5] new feature code changes - Added a mutation gql api, added kafka configurations, modified gql schema. --- pom.xml | 6 ++ .../config/kafka/AnalysisMessage.java | 65 +++++++++++++++ .../songsearch/config/kafka/KafkaConfig.java | 53 ++++++++++++ .../songsearch/config/kafka/KafkaSender.java | 16 ++++ .../songsearch/graphql/GraphQLProvider.java | 9 ++ .../graphql/StartAutomationMutation.java | 83 +++++++++++++++++++ .../songsearch/service/AnalysisService.java | 21 ++++- src/main/resources/schema.graphql | 8 ++ 8 files changed, 260 insertions(+), 1 deletion(-) create mode 100644 src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java create mode 100644 src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java create mode 100644 src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java create mode 100644 src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java diff --git a/pom.xml b/pom.xml index 8a35219..aa7bc50 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,12 @@ test + + + org.springframework.kafka + spring-kafka + + com.graphql-java diff --git a/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java new file mode 100644 index 0000000..9d63b07 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2018. Ontario Institute for Cancer Research + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package bio.overture.songsearch.config.kafka; + +import bio.overture.songsearch.model.Analysis; +import bio.overture.songsearch.model.enums.AnalysisState; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; + +import static lombok.AccessLevel.PRIVATE; + +@Value +// Note: although the AllArgs and NoArgs combination below seems odd, +// it allows Jackson to deserialize to an immutable object without using any additional annotations. +@AllArgsConstructor +@NoArgsConstructor(force = true, access = PRIVATE) +public class AnalysisMessage { + + @NonNull private final String analysisId; + @NonNull private final String studyId; + @NonNull private final String state; + @NonNull private final String songServerId; + @NonNull private final Analysis analysis; + + public static AnalysisMessage createAnalysisMessage( + bio.overture.songsearch.model.Analysis analysis, String songServerId) { + + return new AnalysisMessage( + analysis.getAnalysisId(), + analysis.getStudyId(), + analysis.getAnalysisState().toString(), + analysis.getRepositories().get(0).getCode(), + new Analysis(analysis.getAnalysisId(), new AnalysisType(analysis.getAnalysisType()), analysis.getAnalysisState(), analysis.getStudyId() )); + } + + @Value + @AllArgsConstructor + private static class Analysis{ + String analysisId; + AnalysisType analysisType; + AnalysisState analysisState; + String studyId; + } + + @Data + @AllArgsConstructor + public static class AnalysisType { + String name; + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java new file mode 100644 index 0000000..dd99f62 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java @@ -0,0 +1,53 @@ +package bio.overture.songsearch.config.kafka; + + +import lombok.val; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + + +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.template.default-topic}") + private String defaultTopic; + + @Bean + public Map producerConfigs() { + val props = new HashMap(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + val template = new KafkaTemplate<>(producerFactory()); + template.setDefaultTopic(defaultTopic); + return template; + } + + @Bean + public KafkaSender sender() { + return new KafkaSender(); + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java new file mode 100644 index 0000000..b1471ea --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java @@ -0,0 +1,16 @@ +package bio.overture.songsearch.config.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; + +@Slf4j +public class KafkaSender { + @Autowired + private KafkaTemplate kafkaTemplate; + + public void send(String payload, String key) { + log.debug("sending payload='{}' to topic='{}'", payload, kafkaTemplate.getDefaultTopic()); + kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), key, payload); + } +} diff --git a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java index ad4130c..86970eb 100644 --- a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java +++ b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java @@ -52,6 +52,8 @@ public class GraphQLProvider { private final AnalysisDataFetcher analysisDataFetcher; private final FileDataFetcher fileDataFetcher; private final EntityDataFetcher entityDataFetcher; + + private final StartAutomationMutation startAutomationMutation; private final AuthProperties authProperties; private GraphQL graphQL; private GraphQLSchema graphQLSchema; @@ -61,10 +63,12 @@ public GraphQLProvider( AnalysisDataFetcher analysisDataFetcher, FileDataFetcher fileDataFetcher, EntityDataFetcher entityDataFetcher, + StartAutomationMutation startAutomationMutation, AuthProperties authProperties) { this.analysisDataFetcher = analysisDataFetcher; this.fileDataFetcher = fileDataFetcher; this.entityDataFetcher = entityDataFetcher; + this.startAutomationMutation=startAutomationMutation; this.authProperties = authProperties; } @@ -147,6 +151,11 @@ private RuntimeWiring buildWiring() { .dataFetcher( "sampleMatchedAnalysesForDonor", analysisDataFetcher.getSampleMatchedAnalysesForDonorFetcher())) + .type( + newTypeWiring("Mutation") + .dataFetcher("startAutomation", startAutomationMutation.startAutomationResolver()) + ) + .build(); } diff --git a/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java new file mode 100644 index 0000000..2a8b471 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2021 The Ontario Institute for Cancer Research. All rights reserved + * + * This program and the accompanying materials are made available under the terms of the GNU Affero General Public License v3.0. + * You should have received a copy of the GNU Affero General Public License along with + * this program. If not, see . + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package bio.overture.songsearch.graphql; + +import bio.overture.songsearch.model.Analysis; +import bio.overture.songsearch.model.SearchResult; +import bio.overture.songsearch.model.Sort; +import bio.overture.songsearch.service.AnalysisService; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import graphql.schema.DataFetcher; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +import static bio.overture.songsearch.utils.JacksonUtils.convertValue; +import static java.util.stream.Collectors.toUnmodifiableList; + + +@Component +@Slf4j +public class StartAutomationMutation { + + private final AnalysisService analysisService; + + @Autowired + public StartAutomationMutation(AnalysisService analysisService) { + this.analysisService = analysisService; + } + + + public DataFetcher startAutomationResolver() { + return env -> { + val args = env.getArguments(); + + val filter = ImmutableMap.builder(); + val page = ImmutableMap.builder(); + val sorts = ImmutableList.builder(); + + if (args != null) { + if (args.get("filter") != null) filter.putAll((Map) args.get("filter")); + if (args.get("page") != null) page.putAll((Map) args.get("page")); + if (args.get("sorts") != null) { + val rawSorts = (List) args.get("sorts"); + sorts.addAll( + rawSorts.stream() + .map(sort -> convertValue(sort, Sort.class)) + .collect(toUnmodifiableList())); + } + } + + Analysis analysis = analysisService.getAnalysisById(env.getArguments().get("analysisId").toString()); + log.debug("Analysis fetched: " + analysis); + + analysisService.sendAnalysisMessage(analysis); + log.debug("Message sent to kafka queue"); + return analysis; + }; + } + + +} diff --git a/src/main/java/bio/overture/songsearch/service/AnalysisService.java b/src/main/java/bio/overture/songsearch/service/AnalysisService.java index 14b687f..a2cbd11 100644 --- a/src/main/java/bio/overture/songsearch/service/AnalysisService.java +++ b/src/main/java/bio/overture/songsearch/service/AnalysisService.java @@ -21,14 +21,17 @@ import static bio.overture.songsearch.config.constants.EsDefaults.ES_PAGE_DEFAULT_FROM; import static bio.overture.songsearch.config.constants.EsDefaults.ES_PAGE_DEFAULT_SIZE; import static bio.overture.songsearch.config.constants.SearchFields.*; +import static bio.overture.songsearch.config.kafka.AnalysisMessage.createAnalysisMessage; import static bio.overture.songsearch.model.enums.AnalysisState.PUBLISHED; import static bio.overture.songsearch.model.enums.SpecimenType.NORMAL; import static bio.overture.songsearch.model.enums.SpecimenType.TUMOUR; import static java.util.stream.Collectors.toUnmodifiableList; import static java.util.stream.Stream.empty; +import bio.overture.songsearch.config.kafka.KafkaSender; import bio.overture.songsearch.model.*; import bio.overture.songsearch.repository.AnalysisRepository; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import java.util.*; import java.util.stream.Stream; @@ -45,12 +48,19 @@ public class AnalysisService { private final AnalysisRepository analysisRepository; + private String songServerId; + private final KafkaSender sender; + @Autowired - public AnalysisService(AnalysisRepository analysisRepository) { + public AnalysisService(AnalysisRepository analysisRepository, + @NonNull KafkaSender sender) { + this.analysisRepository = analysisRepository; + this.sender = sender; } + private static Analysis hitToAnalysis(SearchHit hit) { val sourceMap = hit.getSourceAsMap(); return Analysis.parse(sourceMap); @@ -214,4 +224,13 @@ private static class FlatDonorSample { this.sampleType = sample.getSampleType(); } } + + @SneakyThrows + public void sendAnalysisMessage(Analysis analysis) { + val message = createAnalysisMessage(analysis, songServerId); + System.out.println("Message payload: "+message); + System.out.println("message after mapping: "+new ObjectMapper().writeValueAsString(message)); + sender.send(new ObjectMapper().writeValueAsString(message), message.getAnalysisId()); + } + } diff --git a/src/main/resources/schema.graphql b/src/main/resources/schema.graphql index 0dc6a00..8bafa8e 100644 --- a/src/main/resources/schema.graphql +++ b/src/main/resources/schema.graphql @@ -251,3 +251,11 @@ extend type Query { """ sampleMatchedAnalysesForDonor(req: SampleMatchedAnalysesForDonorReq!): [SampleMatchedAnalysisPair] } + +type Mutation { + startAutomation(analysisId: String!): MutationResponse +} + +type MutationResponse { + analysisId: ID +} \ No newline at end of file From c5528ec0f7d712914d390ffbcaa0c397f4e61e1b Mon Sep 17 00:00:00 2001 From: UmmulkiramR Date: Tue, 25 Apr 2023 15:40:54 -0400 Subject: [PATCH 2/5] new feature code changes - Added a mutation gql api, added kafka configurations, modified gql schema. --- .../bio/overture/songsearch/service/AnalysisService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/bio/overture/songsearch/service/AnalysisService.java b/src/main/java/bio/overture/songsearch/service/AnalysisService.java index a2cbd11..aa0d701 100644 --- a/src/main/java/bio/overture/songsearch/service/AnalysisService.java +++ b/src/main/java/bio/overture/songsearch/service/AnalysisService.java @@ -38,6 +38,7 @@ import lombok.NonNull; import lombok.SneakyThrows; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import lombok.val; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.search.SearchHit; @@ -45,6 +46,7 @@ import org.springframework.stereotype.Service; @Service +@Slf4j public class AnalysisService { private final AnalysisRepository analysisRepository; @@ -228,8 +230,7 @@ private static class FlatDonorSample { @SneakyThrows public void sendAnalysisMessage(Analysis analysis) { val message = createAnalysisMessage(analysis, songServerId); - System.out.println("Message payload: "+message); - System.out.println("message after mapping: "+new ObjectMapper().writeValueAsString(message)); + log.debug("Message payload:: "+new ObjectMapper().writeValueAsString(message)); sender.send(new ObjectMapper().writeValueAsString(message), message.getAnalysisId()); } From db46e73793cc2684b5f1b4d0a38ab88b339be2f3 Mon Sep 17 00:00:00 2001 From: UmmulkiramR Date: Thu, 4 May 2023 13:49:46 -0400 Subject: [PATCH 3/5] to make kafka profile specific --- .../songsearch/config/kafka/DefaultSender.java | 11 +++++++++++ .../overture/songsearch/config/kafka/KafkaConfig.java | 2 ++ .../overture/songsearch/config/kafka/KafkaSender.java | 6 +++++- .../bio/overture/songsearch/config/kafka/Sender.java | 5 +++++ 4 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java create mode 100644 src/main/java/bio/overture/songsearch/config/kafka/Sender.java diff --git a/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java new file mode 100644 index 0000000..baa9eba --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java @@ -0,0 +1,11 @@ +package bio.overture.songsearch.config.kafka; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DefaultSender implements Sender{ + + public void send(String payload, String key) { + log.debug("key: "+key); + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java index dd99f62..565524c 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @@ -16,6 +17,7 @@ @Configuration +@Profile("kafka") public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java index b1471ea..d0aae71 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java @@ -2,15 +2,19 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaTemplate; @Slf4j -public class KafkaSender { +@Profile("kafka") +public class KafkaSender implements Sender { @Autowired private KafkaTemplate kafkaTemplate; + public void send(String payload, String key) { log.debug("sending payload='{}' to topic='{}'", payload, kafkaTemplate.getDefaultTopic()); kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), key, payload); } + } diff --git a/src/main/java/bio/overture/songsearch/config/kafka/Sender.java b/src/main/java/bio/overture/songsearch/config/kafka/Sender.java new file mode 100644 index 0000000..c77afb5 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/Sender.java @@ -0,0 +1,5 @@ +package bio.overture.songsearch.config.kafka; + +public interface Sender { + void send(String payload, String key); +} From 8c0092a0fa50cd0e325129387c41204a1103fa3a Mon Sep 17 00:00:00 2001 From: UmmulkiramR Date: Fri, 5 May 2023 08:14:42 -0400 Subject: [PATCH 4/5] Refactored code to rectify failing tests --- .../config/kafka/AnalysisMessage.java | 21 +++++----- .../config/kafka/DefaultSender.java | 6 ++- .../songsearch/config/kafka/KafkaConfig.java | 8 ++-- .../songsearch/config/kafka/KafkaSender.java | 5 +-- .../songsearch/graphql/GraphQLProvider.java | 6 +-- .../graphql/StartAutomationMutation.java | 40 +++++++++++++------ .../repository/AnalysisRepository.java | 4 +- .../songsearch/service/AnalysisService.java | 19 +-------- 8 files changed, 52 insertions(+), 57 deletions(-) diff --git a/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java index 9d63b07..a898a86 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java @@ -17,13 +17,10 @@ package bio.overture.songsearch.config.kafka; -import bio.overture.songsearch.model.Analysis; +import static lombok.AccessLevel.PRIVATE; import bio.overture.songsearch.model.enums.AnalysisState; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.*; -import static lombok.AccessLevel.PRIVATE; - @Value // Note: although the AllArgs and NoArgs combination below seems odd, // it allows Jackson to deserialize to an immutable object without using any additional annotations. @@ -45,16 +42,20 @@ public static AnalysisMessage createAnalysisMessage( analysis.getStudyId(), analysis.getAnalysisState().toString(), analysis.getRepositories().get(0).getCode(), - new Analysis(analysis.getAnalysisId(), new AnalysisType(analysis.getAnalysisType()), analysis.getAnalysisState(), analysis.getStudyId() )); + new Analysis( + analysis.getAnalysisId(), + new AnalysisType(analysis.getAnalysisType()), + analysis.getAnalysisState(), + analysis.getStudyId())); } @Value @AllArgsConstructor - private static class Analysis{ - String analysisId; - AnalysisType analysisType; - AnalysisState analysisState; - String studyId; + private static class Analysis { + String analysisId; + AnalysisType analysisType; + AnalysisState analysisState; + String studyId; } @Data diff --git a/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java index baa9eba..8e055b9 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java @@ -1,11 +1,13 @@ package bio.overture.songsearch.config.kafka; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; @Slf4j -public class DefaultSender implements Sender{ +@Component +public class DefaultSender implements Sender { public void send(String payload, String key) { - log.debug("key: "+key); + log.debug("key: " + key); } } diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java index 565524c..c2b3762 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java @@ -1,6 +1,7 @@ package bio.overture.songsearch.config.kafka; - +import java.util.HashMap; +import java.util.Map; import lombok.val; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -12,10 +13,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import java.util.HashMap; -import java.util.Map; - - @Configuration @Profile("kafka") public class KafkaConfig { @@ -45,6 +42,7 @@ public ProducerFactory producerFactory() { public KafkaTemplate kafkaTemplate() { val template = new KafkaTemplate<>(producerFactory()); template.setDefaultTopic(defaultTopic); + System.out.println("defaultTopic: " + defaultTopic); return template; } diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java index d0aae71..b5ebbeb 100644 --- a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java @@ -8,13 +8,10 @@ @Slf4j @Profile("kafka") public class KafkaSender implements Sender { - @Autowired - private KafkaTemplate kafkaTemplate; - + @Autowired private KafkaTemplate kafkaTemplate; public void send(String payload, String key) { log.debug("sending payload='{}' to topic='{}'", payload, kafkaTemplate.getDefaultTopic()); kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), key, payload); } - } diff --git a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java index 86970eb..8f1a312 100644 --- a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java +++ b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java @@ -68,7 +68,7 @@ public GraphQLProvider( this.analysisDataFetcher = analysisDataFetcher; this.fileDataFetcher = fileDataFetcher; this.entityDataFetcher = entityDataFetcher; - this.startAutomationMutation=startAutomationMutation; + this.startAutomationMutation = startAutomationMutation; this.authProperties = authProperties; } @@ -153,9 +153,7 @@ private RuntimeWiring buildWiring() { analysisDataFetcher.getSampleMatchedAnalysesForDonorFetcher())) .type( newTypeWiring("Mutation") - .dataFetcher("startAutomation", startAutomationMutation.startAutomationResolver()) - ) - + .dataFetcher("startAutomation", startAutomationMutation.startAutomationResolver())) .build(); } diff --git a/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java index 2a8b471..ebfef55 100644 --- a/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java +++ b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java @@ -18,38 +18,46 @@ package bio.overture.songsearch.graphql; +import static bio.overture.songsearch.config.kafka.AnalysisMessage.createAnalysisMessage; +import static bio.overture.songsearch.utils.JacksonUtils.convertValue; +import static java.util.stream.Collectors.toUnmodifiableList; + +import bio.overture.songsearch.config.kafka.Sender; import bio.overture.songsearch.model.Analysis; -import bio.overture.songsearch.model.SearchResult; import bio.overture.songsearch.model.Sort; import bio.overture.songsearch.service.AnalysisService; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import graphql.schema.DataFetcher; +import java.util.List; +import java.util.Map; + import lombok.NonNull; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.Map; - -import static bio.overture.songsearch.utils.JacksonUtils.convertValue; -import static java.util.stream.Collectors.toUnmodifiableList; - - @Component @Slf4j public class StartAutomationMutation { private final AnalysisService analysisService; + @Value("${songServerId") + private String songServerId; + + private final Sender sender; + @Autowired - public StartAutomationMutation(AnalysisService analysisService) { + public StartAutomationMutation(AnalysisService analysisService, @NonNull Sender sender) { this.analysisService = analysisService; + this.sender = sender; } - public DataFetcher startAutomationResolver() { return env -> { val args = env.getArguments(); @@ -70,14 +78,20 @@ public DataFetcher startAutomationResolver() { } } - Analysis analysis = analysisService.getAnalysisById(env.getArguments().get("analysisId").toString()); + Analysis analysis = + analysisService.getAnalysisById(env.getArguments().get("analysisId").toString()); log.debug("Analysis fetched: " + analysis); - analysisService.sendAnalysisMessage(analysis); + sendAnalysisMessage(analysis); log.debug("Message sent to kafka queue"); return analysis; }; } - + @SneakyThrows + public void sendAnalysisMessage(Analysis analysis) { + val message = createAnalysisMessage(analysis, songServerId); + log.debug("Message payload:: " + new ObjectMapper().writeValueAsString(message)); + sender.send(new ObjectMapper().writeValueAsString(message), message.getAnalysisId()); + } } diff --git a/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java b/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java index ebbadf0..7584f07 100644 --- a/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java +++ b/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java @@ -84,7 +84,9 @@ private static Map>> argumentPa REPOSITORY_CODE, value -> new NestedQueryBuilder( - "repositories", new TermQueryBuilder("repositories.code", value), ScoreMode.None)) + "repositories", + new TermQueryBuilder("repositories.code", value), + ScoreMode.None)) .put( DONOR_ID, value -> diff --git a/src/main/java/bio/overture/songsearch/service/AnalysisService.java b/src/main/java/bio/overture/songsearch/service/AnalysisService.java index aa0d701..b1501d0 100644 --- a/src/main/java/bio/overture/songsearch/service/AnalysisService.java +++ b/src/main/java/bio/overture/songsearch/service/AnalysisService.java @@ -21,17 +21,14 @@ import static bio.overture.songsearch.config.constants.EsDefaults.ES_PAGE_DEFAULT_FROM; import static bio.overture.songsearch.config.constants.EsDefaults.ES_PAGE_DEFAULT_SIZE; import static bio.overture.songsearch.config.constants.SearchFields.*; -import static bio.overture.songsearch.config.kafka.AnalysisMessage.createAnalysisMessage; import static bio.overture.songsearch.model.enums.AnalysisState.PUBLISHED; import static bio.overture.songsearch.model.enums.SpecimenType.NORMAL; import static bio.overture.songsearch.model.enums.SpecimenType.TUMOUR; import static java.util.stream.Collectors.toUnmodifiableList; import static java.util.stream.Stream.empty; -import bio.overture.songsearch.config.kafka.KafkaSender; import bio.overture.songsearch.model.*; import bio.overture.songsearch.repository.AnalysisRepository; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import java.util.*; import java.util.stream.Stream; @@ -50,19 +47,13 @@ public class AnalysisService { private final AnalysisRepository analysisRepository; - private String songServerId; - private final KafkaSender sender; - @Autowired - public AnalysisService(AnalysisRepository analysisRepository, - @NonNull KafkaSender sender) { + public AnalysisService(AnalysisRepository analysisRepository) { this.analysisRepository = analysisRepository; - this.sender = sender; } - private static Analysis hitToAnalysis(SearchHit hit) { val sourceMap = hit.getSourceAsMap(); return Analysis.parse(sourceMap); @@ -226,12 +217,4 @@ private static class FlatDonorSample { this.sampleType = sample.getSampleType(); } } - - @SneakyThrows - public void sendAnalysisMessage(Analysis analysis) { - val message = createAnalysisMessage(analysis, songServerId); - log.debug("Message payload:: "+new ObjectMapper().writeValueAsString(message)); - sender.send(new ObjectMapper().writeValueAsString(message), message.getAnalysisId()); - } - } From 4805c58abc04d5cff4ee634b712722d59ad4dd99 Mon Sep 17 00:00:00 2001 From: UmmulkiramR Date: Fri, 5 May 2023 08:16:23 -0400 Subject: [PATCH 5/5] Added a kafka placeholder to avoid tests from failing --- src/main/resources/application.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 732b52e..6088f1d 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -34,3 +34,15 @@ auth: - RDPC-DEV.READ queryAndMutation: - RDPC-DEV.WRITE +--- + +spring: + config: + activate: + on-profile: kafka + kafka: + bootstrap-servers: localhost:9092 + template: + default-topic: automation_trigger #song-analysis + +songServerId: submission-song.collab \ No newline at end of file