Track IngestionState more accurately in realtime tasks. (apache#16934)
Previously, SeekableStreamIndexTaskRunner set ingestion state to
COMPLETED when it finished reading data from Kafka. This is incorrect.
After the changes in this patch, the transitions go as follows (see the sketch after this list):

1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka,
   while it is building its final set of segments to publish.

2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing,
   while waiting for handoff.

3) The task transitions to COMPLETED immediately before exiting, when
   truly done.
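In code terms, the new ordering looks roughly like the minimal Java sketch below. This is not the actual SeekableStreamIndexTaskRunner logic; the `SketchState` enum and all helper methods (`readFromStream`, `publishFinalSegments`, `waitForHandoff`, `writeCompletionReport`) are hypothetical stand-ins used only to show where each state change happens.

```java
// Minimal sketch of the new state ordering described above. The enum and helper
// methods are illustrative stand-ins, not the real Druid classes or APIs.
enum SketchState
{
  NOT_STARTED, BUILD_SEGMENTS, SEGMENT_AVAILABILITY_WAIT, COMPLETED
}

class IngestionStateFlowSketch
{
  private SketchState state = SketchState.NOT_STARTED;

  void run()
  {
    state = SketchState.BUILD_SEGMENTS;
    readFromStream();        // consume records from Kafka/Kinesis

    // Previously the state flipped to COMPLETED right here. Now it stays in
    // BUILD_SEGMENTS while the final set of segments is built and published.
    publishFinalSegments();

    state = SketchState.SEGMENT_AVAILABILITY_WAIT;
    waitForHandoff();        // wait for the published segments to become available

    state = SketchState.COMPLETED;   // truly done, immediately before exiting
    writeCompletionReport();
  }

  // Hypothetical placeholders for the real work done by the task runner.
  private void readFromStream() {}
  private void publishFinalSegments() {}
  private void waitForHandoff() {}
  private void writeCompletionReport() {}
}
```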
gianm authored Aug 22, 2024
1 parent 7256953 commit a83125e
Showing 4 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/ingestion/tasks.md
@@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible states are:
- `NOT_STARTED`: The task has not begun reading any rows
- `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning
- `BUILD_SEGMENTS`: The task is processing rows to construct segments
- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available
- `COMPLETED`: The task has finished its work

Only batch tasks have the `DETERMINE_PARTITIONS` phase. Realtime tasks such as those created by the Kafka Indexing Service do not have this phase.
@@ -52,6 +52,7 @@
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1617,6 +1618,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1697,6 +1702,10 @@ public void testMultipleParseExceptionsFailure() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -3057,9 +3066,13 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
newDataSchemaMetadata()
);

// Verify unparseable data
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify unparseable data
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);

@@ -3190,9 +3203,14 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());

// Verify ingestion state and error message
final IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

// Verify there is no unparseable data in the report since we have 0 saved parse exceptions
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);

Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
}
@@ -3231,6 +3249,12 @@ public void testCompletionReportPartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
}
@@ -3279,6 +3303,12 @@ public void testCompletionReportMultiplePartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
}
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1186,6 +1187,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1272,6 +1277,10 @@ public void testMultipleParseExceptionsFailure() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -768,7 +768,6 @@ public void onFailure(Throwable t)
}
}
}
ingestionState = IngestionState.COMPLETED;
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
@@ -835,6 +834,7 @@ public void onFailure(Throwable t)
// failed to persist sequences. It might also return null if handoff failed, but was recoverable.
// See publishAndRegisterHandoff() for details.
List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
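// Segments have been published at this point; the task now waits for handoff
// before it can be considered fully done.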
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
@@ -928,6 +928,7 @@ public void onFailure(Throwable t)
}
}

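// All work, including handoff, is finished; mark the task COMPLETED immediately
// before writing the completion report and exiting.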
ingestionState = IngestionState.COMPLETED;
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs));
return TaskStatus.success(task.getId());
}
