Skip to content

Commit

Permalink
Minor fixes and enhancements in UnionQuery handling (apache#17483)
Browse files Browse the repository at this point in the history
* plan consistently with either UnionDataSource or UnionQuery for decoupled mode
* expose errors
* move decoupled related setting from PlannerConfig to QueryContexts
  • Loading branch information
kgyrtkirk authored Nov 28, 2024
1 parent ddbb985 commit c1ef38b
Show file tree
Hide file tree
Showing 23 changed files with 388 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ protected QueryTestBuilder testBuilder()
*/
@Test
@Override
public void testUnionIsUnplannable()
public void testUnionDifferentColumnOrder()
{
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires union between two tables and column names queried for each table are different Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."
);

}

@Disabled("Ignored till MSQ can plan UNION ALL with any operand")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,4 +661,13 @@ public String toString()
"context=" + context +
'}';
}

public boolean isDecoupledMode()
{
String value = getString(
QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED
);
return QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED.equals(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -105,6 +106,10 @@ public class QueryContexts
// SQL statement resource specific keys
public static final String CTX_EXECUTION_MODE = "executionMode";

public static final String CTX_NATIVE_QUERY_SQL_PLANNING_MODE = "plannerStrategy";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";

// Defaults
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,9 @@ public String toString()
"dataSources=" + dataSources +
'}';
}

public static boolean isCompatibleDataSource(DataSource dataSource)
{
return (dataSource instanceof TableDataSource || dataSource instanceof InlineDataSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,13 @@ public boolean isSkipEmptyBuckets()
@Override
public RowSignature getResultRowSignature(Finalization finalization)
{
final Finalization finalization1 = finalization;
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
String timestampResultField = getTimestampResultField();
if (StringUtils.isNotEmpty(timestampResultField)) {
builder.add(timestampResultField, ColumnType.LONG);
}
builder.addAggregators(aggregatorSpecs, finalization1);
builder.addAggregators(aggregatorSpecs, finalization);
builder.addPostAggregators(postAggregatorSpecs);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Sequence<Object> run(QueryPlus<Object> queryPlus, ResponseContext respons
Sequence run = runner.run(queryPlus.withQuery(q), responseContext);
seqs.add(run);
}

return Sequences.concat(seqs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,28 @@ public void testDefaultEnableQueryDebugging()
assertTrue(QueryContext.of(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true)).isDebug());
}

@Test
public void testIsDecoupled()
{
assertFalse(QueryContext.empty().isDecoupledMode());
assertTrue(
QueryContext.of(
ImmutableMap.of(
QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
)
).isDecoupledMode()
);
assertFalse(
QueryContext.of(
ImmutableMap.of(
QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
"garbage"
)
).isDecoupledMode()
);
}

// This test is a bit silly. It is retained because another test uses the
// LegacyContextQuery test.
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.NoopDataSource;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -62,6 +64,17 @@ public void test_constructor_empty()
new UnionDataSource(Collections.emptyList());
}

@Test
public void test_isCompatible()
{
TableDataSource tableDataSource = new TableDataSource("foo");
InlineDataSource inlineDataSource = InlineDataSource.fromIterable(Collections.emptyList(), RowSignature.empty());

Assert.assertTrue(UnionDataSource.isCompatibleDataSource(tableDataSource));
Assert.assertTrue(UnionDataSource.isCompatibleDataSource(inlineDataSource));
Assert.assertFalse(UnionDataSource.isCompatibleDataSource(new NoopDataSource()));
}

@Test
public void test_getTableNames()
{
Expand Down Expand Up @@ -131,7 +144,7 @@ public void test_withChildren_sameNumber()
//noinspection unchecked
Assert.assertEquals(
new UnionDataSource(newDataSources),
unionDataSource.withChildren((List) newDataSources)
unionDataSource.withChildren(newDataSources)
);
}

Expand Down
9 changes: 4 additions & 5 deletions sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
Expand Down Expand Up @@ -239,11 +243,6 @@
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ public class PlannerConfig
public static final String CTX_KEY_USE_NATIVE_QUERY_EXPLAIN = "useNativeQueryExplain";
public static final String CTX_KEY_FORCE_EXPRESSION_VIRTUAL_COLUMNS = "forceExpressionVirtualColumns";
public static final String CTX_MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
public static final String CTX_NATIVE_QUERY_SQL_PLANNING_MODE = "plannerStrategy";
public static final int NUM_FILTER_NOT_USED = -1;

@JsonProperty
private int maxTopNLimit = 100_000;

Expand Down Expand Up @@ -75,9 +73,7 @@ public class PlannerConfig
private int maxNumericInFilters = NUM_FILTER_NOT_USED;

@JsonProperty
private String nativeQuerySqlPlanningMode = NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or DECOUPLED
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";
private String nativeQuerySqlPlanningMode = QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or DECOUPLED

private boolean serializeComplexValues = true;

Expand Down Expand Up @@ -383,7 +379,7 @@ public Builder withOverrides(final Map<String, Object> queryContext)
maxNumericInFilters);
nativeQuerySqlPlanningMode = QueryContexts.parseString(
queryContext,
CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
nativeQuerySqlPlanningMode
);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
Expand Down Expand Up @@ -508,6 +509,9 @@ public String getPlanningError()
*/
public void setPlanningError(String formatText, Object... arguments)
{
if (queryContext().isDecoupledMode()) {
throw InvalidSqlInput.exception(formatText, arguments);
}
planningError = StringUtils.nonStrictFormat(formatText, arguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.calcite.tools.Frameworks;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
Expand Down Expand Up @@ -196,7 +197,7 @@ public SqlConformance conformance()
}
});

if (PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
if (QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
.equals(plannerConfig().getNativeQuerySqlPlanningMode())
) {
frameworkConfigBuilder.costFactory(new DruidVolcanoCost.Factory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -540,7 +541,7 @@ protected PlannerResult planWithDruidConvention() throws ValidationException

if (plannerContext.getPlannerConfig()
.getNativeQuerySqlPlanningMode()
.equals(PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
.equals(QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
) {
RelNode newRoot = parameterized;
newRoot = planner.transform(
Expand Down
Loading

0 comments on commit c1ef38b

Please sign in to comment.