diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index 1cb7af037529..ac3ef234a75b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -40,7 +40,7 @@ import java.util.List; import java.util.function.Function; -public abstract class BaseLeafFrameProcessor implements FrameProcessor +public abstract class BaseLeafFrameProcessor implements FrameProcessor { private final ReadableInput baseInput; private final ResourceHolder outputChannelHolder; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index ae154b383824..222b9d4a0bbb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.msq.querykit; import com.fasterxml.jackson.databind.ObjectMapper; @@ -9,7 +28,6 @@ import org.apache.druid.frame.channel.WritableFrameChannel; import org.apache.druid.frame.processor.ReturnOrAwait; import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.frame.segment.FrameSegment; import org.apache.druid.frame.util.SettableLongVirtualColumn; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; @@ -23,30 +41,24 @@ import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; -import org.apache.druid.query.operator.SegmentToRowsAndColumnsOperator; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns; -import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.SimpleSettableOffset; -import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor @@ -96,6 +108,7 @@ public WindowOperatorQueryFrameProcessor( this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns); } + // deep storage @Override protected ReturnOrAwait runWithSegment(SegmentWithDescriptor segment) throws IOException @@ -118,63 +131,80 @@ protected ReturnOrAwait runWithInputChannel(ReadableFrameChannel inputChan // Read the frames from the channel // convert to FrameRowsAndColumns - if (inputChannel.canRead()) { - Frame f = inputChannel.read(); - - // the frame here is row based - // frame rows and columns need columnar. discuss with Eric - // Action item: need to implement a new rows and columns that accept a row-based frame - - // Create a frame rows and columns what would - RowBasedFrameRowAndColumns frameRowsAndColumns = new RowBasedFrameRowAndColumns(f, inputFrameReader.signature()); - Operator op = getOperator(frameRowsAndColumns); - // - //Operator op = new SegmentToRowsAndColumnsOperator(frameSegment); - // On the operator created above add the operators present in the query that we want to chain - - for ( OperatorFactory of : query.getOperators()) { - op = of.wrap(op); - } - - // Let the operator run - // the results that come in the receiver must be pushed to the outout channel - // need to transform the output rows and columns back to frame - Operator.go(op, new Operator.Receiver() - { - @Override - public Operator.Signal push(RowsAndColumns rac) - { - //outputFrameChannel.output(rac.toFrame()); - LazilyDecoratedRowsAndColumns tmpldrc = new LazilyDecoratedRowsAndColumns(rac, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); - Pair pairFrames = tmpldrc.naiveMaterializeToRowFrames(rac); - Frame f = Frame.wrap(pairFrames.lhs); - try { - Iterables.getOnlyElement(outputChannels()).write(new FrameWithPartition(f, FrameWithPartition.NO_PARTITION)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - return Operator.Signal.GO; - } - - @Override - public void completed() - { - - } - }); - - } else if (inputChannel.isFinished()) { - return ReturnOrAwait.returnObject(Unit.instance()); - } else { - return ReturnOrAwait.awaitAll(inputChannels().size()); - } + if (inputChannel.canRead()) { + Frame f = inputChannel.read(); + + // the frame here is row based + // frame rows and columns need columnar. discuss with Eric + // Action item: need to implement a new rows and columns that accept a row-based frame + + // Create a frame rows and columns what would + RowBasedFrameRowAndColumns frameRowsAndColumns = new RowBasedFrameRowAndColumns(f, inputFrameReader.signature()); + Operator op = getOperator(frameRowsAndColumns); + // + //Operator op = new SegmentToRowsAndColumnsOperator(frameSegment); + // On the operator created above add the operators present in the query that we want to chain + + for (OperatorFactory of : query.getOperators()) { + op = of.wrap(op); + } + + // Let the operator run + // the results that come in the receiver must be pushed to the outout channel + // need to transform the output rows and columns back to frame + Operator.go(op, new Operator.Receiver() + { + @Override + public Operator.Signal push(RowsAndColumns rac) + { + //outputFrameChannel.output(rac.toFrame()); + LazilyDecoratedRowsAndColumns tmpldrc = new LazilyDecoratedRowsAndColumns( + rac, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); + Pair pairFrames = tmpldrc.naiveMaterializeToRowFrames(rac); + Frame f = Frame.wrap(pairFrames.lhs); + try { + Iterables.getOnlyElement(outputChannels()) + .write(new FrameWithPartition(f, FrameWithPartition.NO_PARTITION)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return Operator.Signal.GO; + } + + @Override + public void completed() + { + + } + }); + + } else if (inputChannel.isFinished()) { + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.awaitAll(inputChannels().size()); + } return ReturnOrAwait.runAgain(); } private static Operator getOperator(RowBasedFrameRowAndColumns frameRowsAndColumns) { - LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns(frameRowsAndColumns, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); + LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns( + frameRowsAndColumns, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); // Create an operator on top of the created rows and columns Operator op = new Operator() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index 71267185e28d..0afd863460af 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.msq.querykit; import com.fasterxml.jackson.annotation.JsonCreator; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 347fb0a54dad..5028ada05a77 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.msq.querykit; import com.fasterxml.jackson.databind.ObjectMapper; @@ -17,6 +36,7 @@ public WindowOperatorQueryKit(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } + @Override public QueryDefinition makeQueryDefinition( String queryId, @@ -53,7 +73,7 @@ public QueryDefinition makeQueryDefinition( // Create a new stage which takes in the subquery as an input queryDefBuilder.add( StageDefinition.builder(firstStageNumber) - .inputs(new StageInputSpec(firstStageNumber-1)) + .inputs(new StageInputSpec(firstStageNumber - 1)) .signature(rowSignature) .maxWorkerCount(maxWorkerCount) .shuffleSpec(null) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index ba2a4b66c239..b0013f00b2d8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -53,6 +53,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; @@ -65,8 +66,14 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.operator.window.WindowFrame; +import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor; +import org.apache.druid.query.operator.window.WindowOperatorFactory; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; @@ -130,12 +137,12 @@ public class MSQSelectTest extends MSQTestBase public static Collection data() { Object[][] data = new Object[][]{ - {DEFAULT, DEFAULT_MSQ_CONTEXT} -// {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, -// {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT}, -// {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, -// {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT}, -// {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT} + {DEFAULT, DEFAULT_MSQ_CONTEXT}, + {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, + {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT}, + {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, + {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT}, + {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT} }; return Arrays.asList(data); @@ -621,29 +628,47 @@ public void testWindowOnFoo() .add("cc", ColumnType.DOUBLE) .build(); + final Query groupByQuery = GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration + .eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec( + "m1", + "d0", + ColumnType.FLOAT + ) + )) + .setContext(context) + .build(); + + + final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0); + final AggregatorFactory[] theAggs = { + new DoubleSumAggregatorFactory("w0", "d0") + }; + WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs); + + final WindowOperatorQuery query = new WindowOperatorQuery( + new QueryDataSource(groupByQuery), + new LegacySegmentSpec(Intervals.ETERNITY), + context, + RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), + ImmutableList.of( + new NaivePartitioningOperatorFactory(ImmutableList.of("d0")), + new WindowOperatorFactory(proc) + ), + null + ); testSelectQuery() - .setSql("select m1,SUM(m1) OVER() cc from foo group by m1") + .setSql("select m1,SUM(m1) OVER(PARTITION BY m1) cc from foo group by m1") .setExpectedMSQSpec(MSQSpec.builder() - .query(GroupByQuery.builder() - .setDataSource(CalciteTests.DATASOURCE1) - .setInterval(querySegmentSpec(Filtration - .eternity())) - .setGranularity(Granularities.ALL) - .setDimensions(dimensions( - new DefaultDimensionSpec( - "cnt", - "d0", - ColumnType.LONG - ) - )) - .setAggregatorSpecs(aggregators(new CountAggregatorFactory( - "a0"))) - .setContext(context) - .build()) + .query(query) .columnMappings( new ColumnMappings(ImmutableList.of( - new ColumnMapping("d0", "cnt"), - new ColumnMapping("a0", "cnt1") + new ColumnMapping("d0", "m1"), + new ColumnMapping("w0", "cc") ) )) .tuningConfig(MSQTuningConfig.defaultConfig()) @@ -679,6 +704,90 @@ public void testWindowOnFoo() .verifyResults(); } + @Test + public void testWindowOnFooWithEmptyOver() + { + RowSignature rowSignature = RowSignature.builder() + .add("m1", ColumnType.FLOAT) + .add("cc", ColumnType.DOUBLE) + .build(); + + final Query groupByQuery = GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration + .eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec( + "m1", + "d0", + ColumnType.FLOAT + ) + )) + .setContext(context) + .build(); + + + final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0); + final AggregatorFactory[] theAggs = { + new DoubleSumAggregatorFactory("w0", "d0") + }; + WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs); + + final WindowOperatorQuery query = new WindowOperatorQuery( + new QueryDataSource(groupByQuery), + new LegacySegmentSpec(Intervals.ETERNITY), + context, + RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(), + ImmutableList.of( + new NaivePartitioningOperatorFactory(ImmutableList.of()), + new WindowOperatorFactory(proc) + ), + null + ); + testSelectQuery() + .setSql("select m1,SUM(m1) OVER() cc from foo group by m1") + .setExpectedMSQSpec(MSQSpec.builder() + .query(query) + .columnMappings( + new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "m1"), + new ColumnMapping("w0", "cc") + ) + )) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build()) + .setExpectedRowSignature(rowSignature) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1.0f, 21.0}, + new Object[]{2.0f, 21.0}, + new Object[]{3.0f, 21.0}, + new Object[]{4.0f, 21.0}, + new Object[]{5.0f, 21.0}, + new Object[]{6.0f, 21.0} + )) + .setQueryContext(context) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .verifyResults(); + } + @Test public void testSelectWithLimit() { diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java index a67cb519407f..22d61ca8f2a1 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Objects; public class WindowFramedAggregateProcessor implements Processor { @@ -96,4 +97,25 @@ public String toString() ", aggregations=" + Arrays.toString(aggregations) + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WindowFramedAggregateProcessor that = (WindowFramedAggregateProcessor) o; + return Objects.equals(frame, that.frame) && Arrays.equals(aggregations, that.aggregations); + } + + @Override + public int hashCode() + { + int result = Objects.hash(frame); + result = 31 * result + Arrays.hashCode(aggregations); + return result; + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowAndColumns.java index 06f386161d93..7b7937ebb84b 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowAndColumns.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.query.rowsandcols.concrete; import org.apache.druid.frame.Frame; @@ -12,15 +31,15 @@ import javax.annotation.Nullable; import java.util.Collection; -import java.util.LinkedHashMap; public class RowBasedFrameRowAndColumns implements RowsAndColumns { private final Frame frame; private final RowSignature signature; - public RowBasedFrameRowAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.ROW_BASED.ensureType(frame);; + public RowBasedFrameRowAndColumns(Frame frame, RowSignature signature) + { + this.frame = FrameType.ROW_BASED.ensureType(frame); this.signature = signature; }