Skip to content

Commit

Permalink
MSQ: Fix validation of time position in collations. (apache#16961)
Browse files Browse the repository at this point in the history
* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in apache#16849.

* Fix test. Better warning message.
  • Loading branch information
gianm authored Aug 27, 2024
1 parent 3b88b57 commit ed3dbd6
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,22 +479,29 @@ private static void validateSortOrderBeginsWithTimeIfRequired(
);
}
} else if (!rootCollation.getFieldCollations().isEmpty()) {
int timePosition = -1;
int timePositionInRow = -1;
for (int i = 0; i < fieldMappings.size(); i++) {
Entry<Integer, String> entry = fieldMappings.get(i);
if (ColumnHolder.TIME_COLUMN_NAME.equals(entry.getValue())) {
timePositionInRow = i;
break;
}
}

int timePositionInCollation = -1;
for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
final String fieldCollationName =
fieldMappings.get(rootCollation.getFieldCollations().get(i).getFieldIndex()).getValue();
if (ColumnHolder.TIME_COLUMN_NAME.equals(fieldCollationName)) {
timePosition = i;
if (rootCollation.getFieldCollations().get(i).getFieldIndex() == timePositionInRow) {
timePositionInCollation = i;
break;
}
}

if (timePosition > 0) {
if (timePositionInCollation > 0) {
throw InvalidSqlInput.exception(
"Sort order (CLUSTERED BY) cannot include[%s] in position[%d] unless context parameter[%s] "
+ "is set to[false]. %s",
ColumnHolder.TIME_COLUMN_NAME,
timePosition,
timePositionInCollation,
MultiStageQueryContext.CTX_FORCE_TIME_SORT,
DimensionsSpec.WARNING_NON_TIME_SORT_ORDER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,86 @@ public void testReplaceOnFooWithAllClusteredByDim(String contextName, Map<String
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map<String, Object> context)
{
// Tests [CLUSTERED BY LOWER(dim1)], i.e. an expression that is not actually stored.
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("m1", ColumnType.FLOAT)
.build();

DataSegment existingDataSegment0 = DataSegment.builder()
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
.size(50)
.version(MSQTestTaskActionClient.VERSION)
.dataSource("foo")
.build();

DataSegment existingDataSegment1 = DataSegment.builder()
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
.size(50)
.version(MSQTestTaskActionClient.VERSION)
.dataSource("foo")
.build();

Mockito.doCallRealMethod()
.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1))
.when(testTaskActionClient)
.submit(new RetrieveUsedSegmentsAction("foo", ImmutableList.of(Intervals.ETERNITY)));

testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+ "SELECT __time, dim1, m1 "
+ "FROM foo "
+ "PARTITIONED BY ALL "
+ "CLUSTERED BY LOWER(dim1)")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedSegments(
ImmutableSet.of(
SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
)
)
.setExpectedShardSpec(NumberedShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, NullHandling.sqlCompatible() ? "" : null, 1.0f},
new Object[]{946771200000L, "10.1", 2.0f},
new Object[]{946857600000L, "2", 3.0f},
new Object[]{978307200000L, "1", 4.0f},
new Object[]{978393600000L, "def", 5.0f},
new Object[]{978480000000L, "abc", 6.0f}
)
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(6),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
DimensionsSpec.builder()
.setDimensions(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new FloatDimensionSchema("m1")
)
)
.setDimensionExclusions(Collections.singletonList("__time"))
.build(),
GranularityType.ALL,
Intervals.ETERNITY
)
)
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("dim1", ColumnType.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public class DimensionsSpec
public static final String WARNING_NON_TIME_SORT_ORDER = StringUtils.format(
"Warning: support for segments not sorted by[%s] is experimental. Such segments are not readable by older "
+ "version of Druid, and certain queries cannot run on them. See "
+ "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before using this option.",
ColumnHolder.TIME_COLUMN_NAME
+ "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting for details before setting "
+ "%s to[false].",
ColumnHolder.TIME_COLUMN_NAME,
PARAMETER_FORCE_TIME_SORT
);

public static final boolean DEFAULT_FORCE_TIME_SORT = true;
Expand Down

0 comments on commit ed3dbd6

Please sign in to comment.