From ee38b6c6e42fa2f82bfb1f53a6ae3163720fedd3 Mon Sep 17 00:00:00 2001 From: jakubdziworski Date: Wed, 18 Jul 2018 00:43:22 +0200 Subject: [PATCH] Integration test --- build.sbt | 8 +- .../kmq/redelivery/IntegrationTest.scala | 88 +++++++++++++++++++ .../redelivery/infrastructure/KafkaSpec.scala | 29 ++++++ 3 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala create mode 100644 core/src/test/scala/com/softwaremill/kmq/redelivery/infrastructure/KafkaSpec.scala diff --git a/build.sbt b/build.sbt index eacebcf..f584636 100644 --- a/build.sbt +++ b/build.sbt @@ -52,9 +52,13 @@ lazy val core = (project in file("core")) libraryDependencies ++= List( "org.apache.kafka" % "kafka-clients" % "1.0.0", "com.typesafe.akka" %% "akka-actor" % "2.5.6", + "com.typesafe.akka" %% "akka-stream" % "2.5.6", "com.typesafe.scala-logging" %% "scala-logging" % "3.7.2", - "org.scalatest" %% "scalatest" % "3.0.4" % "test" - ) + "org.scalatest" %% "scalatest" % "3.0.4" % "test", + "com.typesafe.akka" %% "akka-testkit" % "2.5.6" % "test", + "com.typesafe.akka" %% "akka-stream-kafka" % "0.17" % "test", + "net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("javax.jms", "jms") +) ) lazy val exampleJava = (project in file("example-java")) diff --git a/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala b/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala new file mode 100644 index 0000000..0aa5bfa --- /dev/null +++ b/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala @@ -0,0 +1,88 @@ +package com.softwaremill.kmq.redelivery + +import java.time.Duration +import java.util.Random + +import akka.actor.ActorSystem +import akka.kafka.scaladsl.{Consumer, Producer} +import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions} +import akka.stream.ActorMaterializer +import akka.testkit.TestKit +import 
com.softwaremill.kmq._
import com.softwaremill.kmq.redelivery.infrastructure.KafkaSpec
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.collection.mutable.ArrayBuffer

/**
 * End-to-end test of KMQ redelivery against an embedded Kafka broker (via [[KafkaSpec]]).
 *
 * The pipeline consumes messages from the queue topic, writes a "start" marker per message,
 * randomly drops ~1 in 5 messages before the "end" marker is written (simulating a processing
 * failure), and asserts that the redelivery tracker re-sends the unacknowledged messages until
 * every message has been processed at least once.
 */
class IntegrationTest extends TestKit(ActorSystem("test-system")) with FlatSpecLike with KafkaSpec
  with BeforeAndAfterAll with Eventually with Matchers {

  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  "KMQ" should "resend message if not committed" in {
    val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}"
    val kmqConfig = new KmqConfig("queue", "markers", "kmq_client", "kmq_redelivery",
      Duration.ofSeconds(1).toMillis, 1000)

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServer)
      .withGroupId(kmqConfig.getMsgConsumerGroupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Markers are partitioned by the original message's key so the start/end markers for one
    // message always land in the same partition (required by the redelivery tracker).
    val markerProducerSettings = ProducerSettings(system,
      new MarkerKey.MarkerKeySerializer(), new MarkerValue.MarkerValueSerializer())
      .withBootstrapServers(bootstrapServer)
      .withProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[ParititionFromMarkerKey].getName)
    val markerProducer = markerProducerSettings.createKafkaProducer()

    val random = new Random()

    lazy val processedMessages = ArrayBuffer[String]()
    lazy val receivedMessages = ArrayBuffer[String]()

    val control = Consumer.committableSource(consumerSettings, Subscriptions.topics(kmqConfig.getMsgTopic)) // 1. get messages from topic
      .map { msg =>
        ProducerMessage.Message(
          new ProducerRecord[MarkerKey, MarkerValue](kmqConfig.getMarkerTopic, MarkerKey.fromRecord(msg.record),
            new StartMarker(kmqConfig.getMsgTimeoutMs)), msg)
      }
      .via(Producer.flow(markerProducerSettings, markerProducer)) // 2. write the "start" marker
      .map(_.message.passThrough)
      .mapAsync(1) { msg =>
        msg.committableOffset.commitScaladsl().map(_ => msg.record) // 3. commit the offset (this should be batched)
      }
      .map { msg =>
        receivedMessages += msg.value
        msg
      }
      .filter(_ => random.nextInt(5) != 0) // 4. simulate a failure: dropped messages never get an "end" marker and must be redelivered
      .map { processedMessage =>
        processedMessages += processedMessage.value
        new ProducerRecord[MarkerKey, MarkerValue](kmqConfig.getMarkerTopic,
          MarkerKey.fromRecord(processedMessage), EndMarker.INSTANCE)
      }
      .to(Producer.plainSink(markerProducerSettings, markerProducer)) // 5. write "end" markers
      .run()

    val redeliveryHook = RedeliveryTracker.start(new KafkaClients(bootstrapServer), kmqConfig)

    val messages = (0 to 20).map(_.toString)
    messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg))

    eventually {
      // Redelivery implies some messages were received more often than they were processed...
      receivedMessages.size should be > processedMessages.size
      // ...but eventually every message was processed at least once.
      processedMessages.sortBy(_.toInt).distinct shouldBe messages
    }(PatienceConfig(timeout = Span(15, Seconds)), implicitly)

    redeliveryHook.close()
    control.shutdown()
    markerProducer.close() // previously leaked: the shared marker producer was never closed
  }

  override def afterAll(): Unit = {
    // Tear down our own resources first, then run the stacked-trait teardown (mirror of setup).
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }
}
package com.softwaremill.kmq.redelivery.infrastructure

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.{BeforeAndAfterEach, Suite}

/**
 * Test mixin that starts a fresh embedded Kafka (and ZooKeeper) instance before each test and
 * stops it afterwards, with small helpers for producing/consuming string messages.
 */
trait KafkaSpec extends BeforeAndAfterEach { self: Suite =>

  // Fixed ports: Kafka on 9092, ZooKeeper on 2182.
  // NOTE(review): fixed ports mean suites mixing in this trait cannot run in parallel on one
  // machine; switch to dynamically assigned free ports if that becomes necessary.
  val testKafkaConfig = EmbeddedKafkaConfig(9092, 2182)
  private implicit val stringDeserializer = new StringDeserializer()

  /** Publishes a single string message to `topic` on the embedded broker. */
  def sendToKafka(topic: String, message: String): Unit = {
    EmbeddedKafka.publishStringMessageToKafka(topic, message)(testKafkaConfig)
  }

  /** Blocks until the first string message on `topic` is available and returns it. */
  def consumeFromKafka(topic: String): String = {
    EmbeddedKafka.consumeFirstStringMessageFrom(topic)(testKafkaConfig)
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
    EmbeddedKafka.start()(testKafkaConfig)
  }

  override def afterEach(): Unit = {
    // Tear down our own resources first, then the stacked-trait teardown — the mirror image of
    // beforeEach (the original called super.afterEach() before stopping the broker).
    EmbeddedKafka.stop()
    super.afterEach()
  }
}