diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 2f02c716a85c..8167b9b64e12 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -160,6 +160,36 @@ public TaskStatusPlus getTaskStatus(String taskID) } } + public StatusResponseHolder handoffTaskGroupEarly( + String dataSource, + String taskGroups + ) + { + try { + LOG.info("handing off %s %s", dataSource, taskGroups); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(StringUtils.format( + "%ssupervisor/%s/taskGroups/handoff", + getIndexerURL(), + StringUtils.urlEncode(dataSource) + ))).setContent( + "application/json", + StringUtils.toUtf8(taskGroups) + ), + StatusResponseHandler.getInstance() + ).get(); + LOG.info("Handoff early response code " + response.getStatus().getCode()); + LOG.info("Handoff early response " + response.getContent()); + return response; + } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + public List getAllTasks() { return getTasks("tasks"); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java index 3062a6fa04ba..c7907ae6fa5d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java @@ -32,6 +32,7 @@ public class TaskResponseObject private final DateTime createdTime; private final DateTime queueInsertionTime; private final TaskState status; + private final Long duration; @JsonCreator private TaskResponseObject( @@ -39,7 +40,8 @@ private TaskResponseObject( @JsonProperty("type") String type, @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, - @JsonProperty("status") TaskState status + @JsonProperty("status") TaskState status, + @JsonProperty("duration") Long duration ) { this.id = id; @@ -47,6 +49,7 @@ private TaskResponseObject( this.createdTime = createdTime; this.queueInsertionTime = queueInsertionTime; this.status = status; + this.duration = duration; } @JsonProperty @@ -78,4 +81,10 @@ public TaskState getStatus() { return status; } + + @JsonProperty + public Long getDuration() + { + return duration; + } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 9ea1b5c402ef..e20c2ea20617 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.testing.IntegrationTestingConfig; @@ -75,6 +76,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; private static final int STREAM_SHARD_COUNT = 2; protected static final long CYCLE_PADDING_MS = 100; + private static final int LONG_DURATION_SUPERVISOR_MILLIS = 600 * 1000; private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; @@ -82,6 +84,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE = "supervisor_with_idle_behaviour_enabled_spec_template.json"; + private static final String SUPERVISOR_LONG_DURATION_TEMPLATE_FILE = + "supervisor_with_long_duration.json"; + protected static final String DATA_RESOURCE_ROOT = "/stream/data"; protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE); @@ -90,6 +95,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH = String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE); + protected static final String SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH = + String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_LONG_DURATION_TEMPLATE_FILE); + protected static final String SERIALIZER_SPEC_DIR = "serializer"; protected static final String INPUT_FORMAT_SPEC_DIR = "input_format"; protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser"; @@ -230,6 +238,113 @@ protected void doTestIndexDataStableState( } } + protected void doTestIndexDataHandoffEarly( + @Nullable Boolean transactionEnabled + ) throws Exception + { + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH) + ); + try ( + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + + // Start generating half of the data + int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; + int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + long numWritten = streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateFirstRound, + FIRST_EVENT_TIME + ); + + // Make sure we consume the data written + long numWrittenHalf = numWritten; + ITRetryUtil.retryUntilTrue( + () -> + numWrittenHalf == this.queryHelper.countRows( + generatedTestConfig.getFullDatasourceName(), + Intervals.ETERNITY, + name -> new LongSumAggregatorFactory(name, "count") + ), + StringUtils.format( + "dataSource[%s] consumed [%,d] events, expected [%,d]", + generatedTestConfig.getFullDatasourceName(), + this.queryHelper.countRows( + generatedTestConfig.getFullDatasourceName(), + Intervals.ETERNITY, + name -> new LongSumAggregatorFactory(name, "count") + ), + numWritten + ) + ); + + // Trigger early handoff + StatusResponseHolder response = indexer.handoffTaskGroupEarly( + generatedTestConfig.getFullDatasourceName(), + jsonMapper.writeValueAsString( + ImmutableMap.of( + "taskGroupIds", ImmutableList.of(0) + ) + ) + ); + Assert.assertEquals(response.getStatus().getCode(), 200); + + // Load the rest of the data + numWritten += streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateRemaining, + FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound) + ); + + // Make sure we consume the rest of the data + long numWrittenAll = numWritten; + ITRetryUtil.retryUntilTrue( + () -> + numWrittenAll == this.queryHelper.countRows( + generatedTestConfig.getFullDatasourceName(), + Intervals.ETERNITY, + name -> new LongSumAggregatorFactory(name, "count") + ), + StringUtils.format( + "dataSource[%s] consumed [%,d] events, expected [%,d]", + generatedTestConfig.getFullDatasourceName(), + this.queryHelper.countRows( + generatedTestConfig.getFullDatasourceName(), + Intervals.ETERNITY, + name -> new LongSumAggregatorFactory(name, "count") + ), + numWritten + ) + ); + + // Wait for the early handoff task to complete and cheeck its duration + ITRetryUtil.retryUntilTrue( + () -> (!indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).isEmpty()), + "Waiting for Task Completion" + ); + + List completedTasks = indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()); + Assert.assertEquals(completedTasks.stream().filter(taskResponseObject -> taskResponseObject.getDuration() < LONG_DURATION_SUPERVISOR_MILLIS).count(), 1); + } + } + void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception { testIndexWithLosingNodeHelper( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java new file mode 100644 index 000000000000..e729680159da --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.KAFKA_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceStopTasksEarlyTest extends AbstractKafkaIndexingServiceTest +{ + @Override + public String getTestNamePrefix() + { + return "kafka_stop_tasks_early"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + + // This test does not actually check whether the tasks stopped early since the API does + // not make any guarantees about handoff. Instead it makes sure the handoff API can be called + // and that the tasks will eventually catch up with new data. + @Test + public void testStopTasksEarly() throws Exception + { + doTestIndexDataHandoffEarly(false); + } +} diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json new file mode 100644 index 000000000000..180b5a5e24c2 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json @@ -0,0 +1,57 @@ +{ + "type": "%%STREAM_TYPE%%", + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "parser": %%PARSER%%, + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": %%DIMENSIONS%%, + "dimensionExclusions": [], + "spatialDimensions": [] + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "MINUTE", + "queryGranularity": "NONE" + } + }, + "tuningConfig": { + "type": "%%STREAM_TYPE%%", + "intermediatePersistPeriod": "PT30S", + "maxRowsPerSegment": 5000000, + "maxRowsInMemory": 500000 + }, + "ioConfig": { + "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%", + "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%, + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT600S", + "%%USE_EARLIEST_KEY%%": true, + "inputFormat" : %%INPUT_FORMAT%% + } +}