Skip to content

Commit

Permalink
Compute range partitionsSpec using effective maxRowsPerSegment (a…
Browse files Browse the repository at this point in the history
…pache#16987)

In the compaction config, a range type partitionsSpec supports setting one of maxRowsPerSegment and targetRowsPerSegment. When compaction is run with the native engine, while maxRowsPerSegment = x results in segments of size x, targetRowsPerSegment = y results in segments of size 1.5 * y.

MSQ only supports rowsPerSegment = x as part of its tuning config, the resulting segment size being approx. x -- which is in line with maxRowsPerSegment behaviour in native compaction.

This PR makes the following changes:

use effective maxRowsPerSegment to pass as rowsPerSegment parameter for MSQ
persist rowsPerSegment as maxRowsPerSegment in lastCompactionState for MSQ
Use effective maxRowsPerSegment-based range spec in CompactionStatus check for both Native and MSQ.
  • Loading branch information
gargvishesh authored Sep 9, 2024
1 parent b7a21a9 commit 37d4174
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1603,9 +1603,10 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
if (shardSpec != null) {
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
// Effective maxRowsPerSegment is propagated as rowsPerSegment in MSQ
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
tuningConfig.getRowsPerSegment(),
partitionDimensions,
false
);
Expand All @@ -1623,9 +1624,10 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
)));
}
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
// Effective maxRowsPerSegment is propagated as rowsPerSegment in MSQ
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
tuningConfig.getRowsPerSegment(),
clusterBy.getColumns()
.stream()
.map(KeyColumn::columnName).collect(Collectors.toList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -286,19 +285,10 @@ private static MSQTuningConfig buildMSQTuningConfig(CompactionTask compactionTas

private static Integer getRowsPerSegment(CompactionTask compactionTask)
{
Integer rowsPerSegment = PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
if (compactionTask.getTuningConfig() != null) {
PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
rowsPerSegment = partitionsSpec.getMaxRowsPerSegment();
} else if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
DimensionRangePartitionsSpec dimensionRangePartitionsSpec = (DimensionRangePartitionsSpec) partitionsSpec;
rowsPerSegment = dimensionRangePartitionsSpec.getTargetRowsPerSegment() != null
? dimensionRangePartitionsSpec.getTargetRowsPerSegment()
: dimensionRangePartitionsSpec.getMaxRowsPerSegment();
}
if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
return compactionTask.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
}
return rowsPerSegment;
return PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
}

private static RowSignature getRowSignature(DataSchema dataSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2643,8 +2643,8 @@ private CompactionState expectedCompactionState(
);
} else {
partitionsSpec = new DimensionRangePartitionsSpec(
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
null,
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
partitionDimensions,
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class MSQCompactionRunnerTest

private static final String TIMESTAMP_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
private static final int TARGET_ROWS_PER_SEGMENT = 100000;
private static final int MAX_ROWS_PER_SEGMENT = 150000;
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
private static List<String> PARTITION_DIMENSIONS;
Expand Down Expand Up @@ -286,7 +287,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce
new MSQTuningConfig(
1,
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
TARGET_ROWS_PER_SEGMENT,
MAX_ROWS_PER_SEGMENT,
null,
createIndexSpec()
),
Expand Down Expand Up @@ -326,7 +327,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);

CompactionTask taskCreatedWithTransformSpec = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
new DimensionRangePartitionsSpec(null, MAX_ROWS_PER_SEGMENT, PARTITION_DIMENSIONS, false),
dimFilter,
Collections.emptyMap(),
null,
Expand Down Expand Up @@ -364,7 +365,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
new MSQTuningConfig(
1,
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
TARGET_ROWS_PER_SEGMENT,
MAX_ROWS_PER_SEGMENT,
null,
createIndexSpec()
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,16 +692,26 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine eng

LOG.info("Auto compaction test with range partitioning");

final DimensionRangePartitionsSpec rangePartitionsSpec = new DimensionRangePartitionsSpec(
final DimensionRangePartitionsSpec inputRangePartitionsSpec = new DimensionRangePartitionsSpec(
5,
null,
ImmutableList.of("city"),
false
);
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
DimensionRangePartitionsSpec expectedRangePartitionsSpec = inputRangePartitionsSpec;
if (engine == CompactionEngine.MSQ) {
// Range spec is transformed to its effective maxRowsPerSegment equivalent in MSQ
expectedRangePartitionsSpec = new DimensionRangePartitionsSpec(
null,
7,
ImmutableList.of("city"),
false
);
}
submitCompactionConfig(inputRangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
verifySegmentsCompacted(expectedRangePartitionsSpec, 2);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,27 @@ static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuni
partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else if (partitionsSpecFromTuningConfig instanceof DimensionRangePartitionsSpec) {
return getEffectiveRangePartitionsSpec((DimensionRangePartitionsSpec) partitionsSpecFromTuningConfig);
} else {
return partitionsSpecFromTuningConfig;
}
}

/**
* Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but
* effectively translates to the same maxRowsPerSegment.
*/
static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRangePartitionsSpec partitionsSpec)
{
return new DimensionRangePartitionsSpec(
null,
partitionsSpec.getMaxRowsPerSegment(),
partitionsSpec.getPartitionDimensions(),
partitionsSpec.isAssumeGrouped()
);
}

/**
* Evaluates {@link #CHECKS} to determine the compaction status.
*/
Expand Down Expand Up @@ -286,10 +302,14 @@ private CompactionStatus allCandidatesHaveSameCompactionState()

private CompactionStatus partitionsSpecIsUpToDate()
{
PartitionsSpec existingPartionsSpec = lastCompactionState.getPartitionsSpec();
if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) {
existingPartionsSpec = getEffectiveRangePartitionsSpec((DimensionRangePartitionsSpec) existingPartionsSpec);
}
return CompactionStatus.completeIfEqual(
"partitionsSpec",
findPartitionsSpecFromConfig(tuningConfig),
lastCompactionState.getPartitionsSpec(),
existingPartionsSpec,
CompactionStatus::asString
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testFindPartitionsSpecWhenGivenIsHashed()
}

@Test
public void testFindPartitionsSpecWhenGivenIsRange()
public void testFindPartitionsSpecWhenGivenIsRangeWithMaxRows()
{
final PartitionsSpec partitionsSpec =
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
Expand All @@ -174,6 +174,19 @@ public void testFindPartitionsSpecWhenGivenIsRange()
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsRangeWithTargetRows()
{
final PartitionsSpec partitionsSpec =
new DimensionRangePartitionsSpec(10000, null, Collections.singletonList("dim"), false);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
new DimensionRangePartitionsSpec(null, 15000, Collections.singletonList("dim"), false),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testStatusWhenLastCompactionStateIsNull()
{
Expand Down

0 comments on commit 37d4174

Please sign in to comment.