diff --git a/kinesis/pom.xml b/kinesis/pom.xml index fa42780..ae1c213 100644 --- a/kinesis/pom.xml +++ b/kinesis/pom.xml @@ -14,9 +14,9 @@ jar - 1.7.5 + 1.13.2 0.12.3 - 1.11.128 + 1.11.670 com.expedia.www.haystack.kinesis.span.collector.App ${project.artifactId}-${project.version} diff --git a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/LocalKafkaConsumer.scala b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/LocalKafkaConsumer.scala index 3f59a95..8f1bb79 100644 --- a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/LocalKafkaConsumer.scala +++ b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/LocalKafkaConsumer.scala @@ -78,7 +78,7 @@ trait LocalKafkaConsumer { } } - if(records.size < minExpectedCount) throw new RuntimeException("Fail to read the expected records from kafka") + if(records.size < minExpectedCount) throw new RuntimeException(s"Fail to read the expected records (${records.size}) from kafka") records.toList } diff --git a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/tests/KinesisSpanCollectorSpec.scala b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/tests/KinesisSpanCollectorSpec.scala index 0a6cb34..a3c6ad1 100644 --- a/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/tests/KinesisSpanCollectorSpec.scala +++ b/kinesis/src/test/scala/com/expedia/www/haystack/kinesis/span/collector/integration/tests/KinesisSpanCollectorSpec.scala @@ -44,7 +44,7 @@ class KinesisSpanCollectorSpec extends IntegrationTestSpec { produceRecordsToKinesis(List(spanBytes, spanBytes)) Then("it should be pushed to kafka") - readRecordsFromKafka(0, 1.second).headOption + readRecordsFromKafka(0, 10.second).headOption } "read valid spans from kinesis and store individual spans in kafka" in { @@ -63,7 +63,7 @@ class KinesisSpanCollectorSpec extends IntegrationTestSpec { produceRecordsToKinesis(List(span_1, span_2, span_3, span_4)) Then("it should be pushed to kafka with partition key as its trace id") - val records = readRecordsFromKafka(4, 5.seconds) + val records = readRecordsFromKafka(4, 10.seconds) val externalrecords = readRecordsFromExternalKafka(0, 10.seconds) externalrecords.size shouldEqual 0 records.size shouldEqual 4 @@ -97,7 +97,7 @@ class KinesisSpanCollectorSpec extends IntegrationTestSpec { produceRecordsToKinesis(List(span_1, span_2, span_3, span_4)) Then("it should be pushed to default kafka and external kafka with partition key as its trace id") - val records = readRecordsFromKafka(4, 5.seconds) + val records = readRecordsFromKafka(4, 10.seconds) val numConsumers = ProjectConfiguration.externalKafkaConfig().size val externalrecords = readRecordsFromExternalKafka(4 * numConsumers, (10 * numConsumers).seconds) externalrecords.size should equal(4) @@ -121,7 +121,7 @@ class KinesisSpanCollectorSpec extends IntegrationTestSpec { produceRecordsToKinesis(List(span_1)) Then("the appropriate span decorator plugin should be loaded using spi") - val records = readRecordsFromKafka(1, 5.seconds) + val records = readRecordsFromKafka(1, 10.seconds) records should not be empty val spans = records.map(Span.parseFrom)