Add new test for handoff API (#16492)
* Add new test for handoff API

* Add new method

* fix test

* Update test
georgew5656 authored May 28, 2024
1 parent 21f725f commit f7013e0
Showing 5 changed files with 265 additions and 1 deletion.
@@ -160,6 +160,36 @@ public TaskStatusPlus getTaskStatus(String taskID)
}
}

public StatusResponseHolder handoffTaskGroupEarly(
String dataSource,
String taskGroups
)
{
try {
LOG.info("handing off %s %s", dataSource, taskGroups);
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(StringUtils.format(
"%ssupervisor/%s/taskGroups/handoff",
getIndexerURL(),
StringUtils.urlEncode(dataSource)
))).setContent(
"application/json",
StringUtils.toUtf8(taskGroups)
),
StatusResponseHandler.getInstance()
).get();
LOG.info("Handoff early response code " + response.getStatus().getCode());
LOG.info("Handoff early response " + response.getContent());
return response;
}
catch (ISE e) {
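// Rethrow Druid's ISE unchanged so callers see the original error; anything else is wrapped below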
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
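
For reference, this helper issues POST {indexerURL}supervisor/{dataSource}/taskGroups/handoff with a JSON body listing the task group IDs to hand off. A minimal usage sketch follows, assuming client is an instance of this test client, jsonMapper is a configured Jackson ObjectMapper, and the datasource name is hypothetical (jsonMapper.writeValueAsString throws a checked exception, so real callers handle or declare it):

// Sketch: ask the supervisor for a hypothetical datasource to hand off task group 0 early.
// The body shape {"taskGroupIds": [0]} matches what the new integration test sends.
StatusResponseHolder response = client.handoffTaskGroupEarly(
    "wikipedia_stream_test",
    jsonMapper.writeValueAsString(ImmutableMap.of("taskGroupIds", ImmutableList.of(0)))
);
if (response.getStatus().getCode() != 200) {
  throw new ISE("Early handoff failed with status[%s]", response.getStatus());
}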

public List<TaskResponseObject> getAllTasks()
{
return getTasks("tasks");
@@ -32,21 +32,24 @@ public class TaskResponseObject
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskState status;
private final Long duration;

@JsonCreator
private TaskResponseObject(
@JsonProperty("id") String id,
@JsonProperty("type") String type,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("status") TaskState status
@JsonProperty("status") TaskState status,
@JsonProperty("duration") Long duration
)
{
this.id = id;
this.type = type;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
this.duration = duration;
}

@JsonProperty
@@ -78,4 +81,10 @@ public TaskState getStatus()
{
return status;
}

@JsonProperty
public Long getDuration()
{
return duration;
}
}
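
For reference, this POJO now maps task-list entries shaped roughly like the hypothetical payload below; duration is reported in milliseconds, and all values are illustrative only:

{
  "id": "index_kafka_wikipedia_abc123",
  "type": "index_kafka",
  "createdTime": "2024-05-28T00:00:00.000Z",
  "queueInsertionTime": "2024-05-28T00:00:01.000Z",
  "status": "SUCCESS",
  "duration": 482000
}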
@@ -31,6 +31,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.testing.IntegrationTestingConfig;
@@ -75,13 +76,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
protected static final long CYCLE_PADDING_MS = 100;
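// 10 minutes, in line with taskDuration PT600S in the long-duration supervisor spec template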
private static final int LONG_DURATION_SUPERVISOR_MILLIS = 600 * 1000;

private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
"supervisor_with_idle_behaviour_enabled_spec_template.json";

private static final String SUPERVISOR_LONG_DURATION_TEMPLATE_FILE =
"supervisor_with_long_duration.json";

protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
@@ -90,6 +95,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);

protected static final String SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_LONG_DURATION_TEMPLATE_FILE);

protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@@ -230,6 +238,113 @@ protected void doTestIndexDataStableState(
}
}

protected void doTestIndexDataHandoffEarly(
@Nullable Boolean transactionEnabled
) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");

// Start generating half of the data
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND - secondsToGenerateFirstRound;
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);

// Make sure we consume the data written
long numWrittenHalf = numWritten;
ITRetryUtil.retryUntilTrue(
() ->
numWrittenHalf == this.queryHelper.countRows(
generatedTestConfig.getFullDatasourceName(),
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
"dataSource[%s] consumed [%,d] events, expected [%,d]",
generatedTestConfig.getFullDatasourceName(),
this.queryHelper.countRows(
generatedTestConfig.getFullDatasourceName(),
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
numWritten
)
);

// Trigger early handoff
StatusResponseHolder response = indexer.handoffTaskGroupEarly(
generatedTestConfig.getFullDatasourceName(),
jsonMapper.writeValueAsString(
ImmutableMap.of(
"taskGroupIds", ImmutableList.of(0)
)
)
);
Assert.assertEquals(response.getStatus().getCode(), 200);

// Load the rest of the data
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);

// Make sure we consume the rest of the data
long numWrittenAll = numWritten;
ITRetryUtil.retryUntilTrue(
() ->
numWrittenAll == this.queryHelper.countRows(
generatedTestConfig.getFullDatasourceName(),
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
"dataSource[%s] consumed [%,d] events, expected [%,d]",
generatedTestConfig.getFullDatasourceName(),
this.queryHelper.countRows(
generatedTestConfig.getFullDatasourceName(),
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
numWritten
)
);

// Wait for the early handoff task to complete and check its duration
ITRetryUtil.retryUntilTrue(
() -> (!indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).isEmpty()),
"Waiting for Task Completion"
);

List<TaskResponseObject> completedTasks = indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName());
Assert.assertEquals(
    completedTasks.stream()
                  .filter(task -> task.getDuration() < LONG_DURATION_SUPERVISOR_MILLIS)
                  .count(),
    1
);
}
}

void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.tests.indexer;

import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Test(groups = TestNGGroup.KAFKA_INDEX)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceStopTasksEarlyTest extends AbstractKafkaIndexingServiceTest
{
@Override
public String getTestNamePrefix()
{
return "kafka_stop_tasks_early";
}

@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}


// This test does not actually check whether the tasks stopped early, since the API does
// not make any guarantees about handoff. Instead, it makes sure the handoff API can be
// called and that the tasks eventually catch up with new data.
@Test
public void testStopTasksEarly() throws Exception
{
doTestIndexDataHandoffEarly(false);
}
}
@@ -0,0 +1,57 @@
{
"type": "%%STREAM_TYPE%%",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": %%PARSER%%,
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": %%DIMENSIONS%%,
"dimensionExclusions": [],
"spatialDimensions": []
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "%%STREAM_TYPE%%",
"intermediatePersistPeriod": "PT30S",
"maxRowsPerSegment": 5000000,
"maxRowsInMemory": 500000
},
"ioConfig": {
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT600S",
"%%USE_EARLIEST_KEY%%": true,
"inputFormat" : %%INPUT_FORMAT%%
}
}
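
Note that taskDuration PT600S in this template lines up with LONG_DURATION_SUPERVISOR_MILLIS (600 * 1000 ms) in AbstractStreamIndexingTest; the early-handoff duration assertion depends on these two values staying in sync.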
