From 428f58cf1530962490abf67a668a149ac068794a Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Wed, 11 Sep 2024 18:00:13 -0400 Subject: [PATCH] Support maxColumnsToMerge in supervisor tuningConfig (#17030) * support maxColumnsToMerge in supervisor specs * remove log line * fix style * add docs * fix unit tests --- docs/ingestion/supervisor.md | 1 + .../RabbitStreamIndexTaskTuningConfig.java | 18 +++++++++++++----- .../RabbitStreamSupervisorTuningConfig.java | 12 +++++++++--- .../RabbitStreamIndexTaskTuningConfigTest.java | 3 ++- .../supervisor/RabbitStreamSupervisorTest.java | 4 +++- .../kafka/KafkaIndexTaskTuningConfig.java | 18 ++++++++++++------ .../KafkaSupervisorTuningConfig.java | 10 +++++++--- .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../kafka/KafkaIndexTaskTuningConfigTest.java | 12 ++++++++++-- .../kafka/supervisor/KafkaSupervisorTest.java | 9 ++++++++- ...TestModifiedKafkaIndexTaskTuningConfig.java | 4 +++- .../kinesis/KinesisIndexTaskTuningConfig.java | 18 ++++++++++++------ .../KinesisSupervisorTuningConfig.java | 11 ++++++++--- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 + .../indexing/kinesis/KinesisIndexTaskTest.java | 3 ++- .../KinesisIndexTaskTuningConfigTest.java | 12 ++++++++++-- .../supervisor/KinesisSupervisorTest.java | 3 +++ ...stModifiedKinesisIndexTaskTuningConfig.java | 7 +++++-- .../SeekableStreamIndexTaskTuningConfig.java | 17 +++++++++++++++-- .../SeekableStreamSupervisorSpecTest.java | 1 + .../SeekableStreamSupervisorStateTest.java | 1 + 21 files changed, 127 insertions(+), 39 deletions(-) diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index d5293ae581f8..242adb3d58cd 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -214,6 +214,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka tuning co |`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a parsing exception occurs, containing information about the row where the error 
occurred.|No|`false`| |`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Setting `reportParseExceptions` overrides this limit.|No|unlimited| |`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps track of the most recent parse exceptions. `maxSavedParseExceptions` limits the number of saved exception instances. These saved exceptions are available after the task finishes in the [task completion report](../ingestion/tasks.md#task-reports). Setting `reportParseExceptions` overrides this limit.|No|0| +|`maxColumnsToMerge`|Integer|Limit on the number of segments to merge in a single phase when merging segments for publishing, expressed as the total number of columns present in the set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting.|No|-1| ## Start a supervisor diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java index 0086084c5706..288e28966a0c 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java @@ -74,7 +74,9 @@ public RabbitStreamIndexTaskTuningConfig( @Nullable Integer numPersistThreads, @Nullable Integer recordBufferSize, @Nullable Integer recordBufferOfferTimeout, - @Nullable Integer maxRecordsPerPoll) + @Nullable Integer maxRecordsPerPoll, + @Nullable Integer maxColumnsToMerge + ) { super( appendableIndexSpec, @@ -97,7 +99,8 @@ public 
RabbitStreamIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); this.recordBufferSize = recordBufferSize; @@ -130,7 +133,8 @@ private RabbitStreamIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, - @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll + @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { this( @@ -156,7 +160,9 @@ private RabbitStreamIndexTaskTuningConfig( numPersistThreads, recordBufferSize, recordBufferOfferTimeout, - maxRecordsPerPoll); + maxRecordsPerPoll, + maxColumnsToMerge + ); } @Nullable @@ -226,7 +232,8 @@ public RabbitStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) getNumPersistThreads(), getRecordBufferSizeConfigured(), getRecordBufferOfferTimeout(), - getMaxRecordsPerPollConfigured() + getMaxRecordsPerPollConfigured(), + getMaxColumnsToMerge() ); } @@ -253,6 +260,7 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", numPersistThreads=" + getNumPersistThreads() + ", maxRecordsPerPole=" + getMaxRecordsPerPollConfigured() + + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + '}'; } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java index c6db20eed91c..a2667026fffd 100644 --- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java @@ -68,6 +68,7 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -99,7 +100,9 @@ public RabbitStreamSupervisorTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll) + @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge + ) { super( appendableIndexSpec, @@ -124,7 +127,8 @@ public RabbitStreamSupervisorTuningConfig( numPersistThreads, recordBufferSize, recordBufferOfferTimeout, - maxRecordsPerPoll + maxRecordsPerPoll, + maxColumnsToMerge ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? 
chatRetries : DEFAULT_CHAT_RETRIES); @@ -210,6 +214,7 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", numPersistThreads=" + getNumPersistThreads() + ", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() + + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + '}'; } @@ -239,7 +244,8 @@ public RabbitStreamIndexTaskTuningConfig convertToTaskTuningConfig() getRecordBufferSizeConfigured(), getRecordBufferOfferTimeout(), getMaxRecordsPerPollConfigured(), - getNumPersistThreads() + getNumPersistThreads(), + getMaxColumnsToMerge() ); } } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java index 193b35b8af1e..d507199495a3 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java @@ -180,7 +180,8 @@ public void testtoString() throws Exception "maxParseExceptions=0, " + "maxSavedParseExceptions=0, " + "numPersistThreads=1, " + - "maxRecordsPerPoll=null}"; + "maxRecordsPerPoll=null, " + + "maxColumnsToMerge=-1}"; Assert.assertEquals(resStr, config.toString()); diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 5f31d70b75dd..a5b9b597afa5 100644 --- 
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -158,7 +158,9 @@ public void setupTest() null, null, null, - 100); + 100, + null + ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); serviceEmitter = new StubServiceEmitter("RabbitStreamSupervisorTest", "localhost"); EmittingLogger.registerEmitter(serviceEmitter); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index de4bc38fc7a5..b0b358da20bf 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -52,7 +52,8 @@ public KafkaIndexTaskTuningConfig( @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, - @Nullable Integer numPersistThreads + @Nullable Integer numPersistThreads, + @Nullable Integer maxColumnsToMerge ) { super( @@ -76,7 +77,8 @@ public KafkaIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); } @@ -100,7 +102,8 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads + 
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { this( @@ -123,7 +126,8 @@ private KafkaIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); } @@ -150,7 +154,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions(), - getNumPersistThreads() + getNumPersistThreads(), + getMaxColumnsToMerge() ); } @@ -177,7 +182,8 @@ public String toString() ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", numPersistThreads=" + getNumPersistThreads() + - '}'; + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index d32d694ad540..1e0b35874090 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -65,6 +65,7 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -93,7 +94,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("numPersistThreads") @Nullable Integer 
numPersistThreads, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { super( @@ -116,7 +118,8 @@ public KafkaSupervisorTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -229,7 +232,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions(), - getNumPersistThreads() + getNumPersistThreads(), + getMaxColumnsToMerge() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index f8c6b23aae90..dd04c2309f6e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2861,6 +2861,7 @@ private KafkaIndexTask createTask( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, + null, null ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 5c67f6e50212..20777b320a53 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -73,6 +73,7 @@ public void testSerdeWithDefaults() throws Exception 
Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertEquals(1, config.getNumPersistThreads()); + Assert.assertEquals(-1, config.getMaxColumnsToMerge()); } @Test @@ -123,6 +124,7 @@ public void testSerdeWithNonDefaults() throws Exception config.getIndexSpecForIntermediatePersists() ); Assert.assertEquals(2, config.getNumPersistThreads()); + Assert.assertEquals(-1, config.getMaxColumnsToMerge()); } @Test @@ -152,7 +154,8 @@ public void testConvert() null, null, null, - 2 + 2, + 5 ); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -168,6 +171,7 @@ public void testConvert() Assert.assertEquals(true, copy.isReportParseExceptions()); Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); Assert.assertEquals(2, copy.getNumPersistThreads()); + Assert.assertEquals(5, copy.getMaxColumnsToMerge()); } @Test @@ -193,7 +197,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException true, 42, 42, - 2 + 2, + -1 ); String serialized = mapper.writeValueAsString(base); @@ -219,6 +224,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); + Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); } @Test @@ -244,6 +250,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 42, 42, 2, + -1, "extra string" ); @@ -269,6 +276,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), 
deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); + Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 86275d10e318..127cb72efcb1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -346,7 +346,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - null + null, + null ); EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes(); @@ -497,6 +498,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, + null, null ), null @@ -4221,6 +4223,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ) ); @@ -4260,6 +4263,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -4413,6 +4417,7 @@ public void testSequenceNameDoesNotChangeWithTaskId() null, null, null, + null, null ) ); @@ -4888,6 +4893,7 @@ public SeekableStreamIndexTaskClient build( null, null, 10, + null, null ); @@ -5002,6 +5008,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 8e5ad243d2e5..a2b7228bb46a 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -56,6 +56,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("extra") String extra ) { @@ -79,7 +80,8 @@ public TestModifiedKafkaIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index a86541a04873..83b111e9e962 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -92,7 +92,8 @@ public KinesisIndexTaskTuningConfig( @Nullable Integer maxSavedParseExceptions, @Deprecated @Nullable Integer maxRecordsPerPoll, @Nullable Integer maxBytesPerPoll, - @Nullable Period intermediateHandoffPeriod + @Nullable Period intermediateHandoffPeriod, + @Nullable Integer maxColumnsToMerge ) { super( @@ -116,7 +117,8 @@ public KinesisIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - null + null, + maxColumnsToMerge ); this.recordBufferSize = recordBufferSize; 
this.recordBufferSizeBytes = recordBufferSizeBytes; @@ -161,7 +163,8 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll, @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { this( @@ -191,7 +194,8 @@ private KinesisIndexTaskTuningConfig( maxSavedParseExceptions, maxRecordsPerPoll, maxBytesPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + maxColumnsToMerge ); } @@ -294,7 +298,8 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), getMaxBytesPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getMaxColumnsToMerge() ); } @@ -363,6 +368,7 @@ public String toString() ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", maxBytesPerPoll=" + maxBytesPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - '}'; + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index a0a68c14bc0b..1a11f8d658b7 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -76,6 +76,7 @@ public static 
KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -113,7 +114,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, - @JsonProperty("useListShards") Boolean useListShards + @JsonProperty("useListShards") Boolean useListShards, + @JsonProperty("maxColumnsToMerge") Integer maxColumnsToMerge ) { super( @@ -143,7 +145,8 @@ public KinesisSupervisorTuningConfig( maxSavedParseExceptions, maxRecordsPerPoll, maxBytesPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + maxColumnsToMerge ); this.workerThreads = workerThreads; @@ -244,6 +247,7 @@ public String toString() ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", useListShards=" + isUseListShards() + + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + '}'; } @@ -277,7 +281,8 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), getMaxBytesPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getMaxColumnsToMerge() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 60d8f686a28c..ed2758ddaefd 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -78,6 +78,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, null ); private static 
final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 510eaa797e07..d69e43ca660c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2374,7 +2374,8 @@ private KinesisIndexTask createTask( maxSavedParseExceptions, maxRecordsPerPoll, maxBytesPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + null ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index b61c5cf2ae48..375e26e2ed28 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -134,6 +134,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(2, (int) config.getFetchThreads()); Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertEquals(-1, config.getMaxColumnsToMerge()); + } @Test @@ -166,7 +168,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 6000, 1_000_000, - new Period("P3D") + new Period("P3D"), + 1000 ); String serialized = mapper.writeValueAsString(base); @@ -197,6 +200,7 
@@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); Assert.assertEquals(base.getMaxBytesPerPollConfigured(), deserialized.getMaxBytesPerPollConfigured()); + Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); } @Test @@ -229,7 +233,8 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 500, 1_000_000, 6000, - new Period("P3D") + new Period("P3D"), + 1000 ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -257,6 +262,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); + Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); } @Test @@ -322,6 +328,7 @@ public void testConvert() null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -345,6 +352,7 @@ public void testConvert() Assert.assertTrue(copy.isResetOffsetAutomatically()); Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); + Assert.assertEquals(-1, copy.getMaxColumnsToMerge()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index e6ed27c9cecc..50b7203c629c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -207,6 +207,7 @@ public void setupTest() null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3980,6 +3981,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -5159,6 +5161,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index ac84d2105cd1..b85c3edc2a65 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -63,6 +63,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("extra") String extra ) { @@ -93,7 +94,8 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxSavedParseExceptions, maxRecordsPerPoll, maxBytesPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + 
maxColumnsToMerge ); this.extra = extra; } @@ -127,7 +129,8 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), base.getMaxBytesPerPollConfigured(), - base.getIntermediateHandoffPeriod() + base.getIntermediateHandoffPeriod(), + base.getMaxColumnsToMerge() ); this.extra = extra; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 34e4c5f0f2da..309336e1d532 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -41,6 +41,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT; private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE; private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis(); + private static final int DEFAULT_MAX_COLUMNS_TO_MERGE = -1; private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; @@ -66,6 +67,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private final int maxSavedParseExceptions; private final int numPersistThreads; + private final int maxColumnsToMerge; public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @@ -88,7 +90,8 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, - @Nullable Integer numPersistThreads + @Nullable Integer numPersistThreads, + @Nullable Integer 
maxColumnsToMerge ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -139,6 +142,7 @@ public SeekableStreamIndexTaskTuningConfig( : logParseExceptions; this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); + this.maxColumnsToMerge = maxColumnsToMerge == null ? DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge; } @Override @@ -289,6 +293,13 @@ public int getNumPersistThreads() return numPersistThreads; } + @Override + @JsonProperty + public int getMaxColumnsToMerge() + { + return maxColumnsToMerge; + } + @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @@ -315,6 +326,7 @@ public boolean equals(Object o) maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && numPersistThreads == that.numPersistThreads && + maxColumnsToMerge == that.maxColumnsToMerge && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -347,7 +359,8 @@ public int hashCode() logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + numPersistThreads, + maxColumnsToMerge ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3e0e46d7a033..4deee6ce9b8d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -522,6 +522,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + 
null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index daf85ac39c99..1f42ba7ce996 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2683,6 +2683,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) {