diff --git a/query/engine_test.go b/query/engine_test.go index ba1294f67..2f2a1dc34 100644 --- a/query/engine_test.go +++ b/query/engine_test.go @@ -12,83 +12,66 @@ import ( "github.com/polarsignals/frostdb/dynparquet" schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1" "github.com/polarsignals/frostdb/query/logicalplan" - "github.com/polarsignals/frostdb/query/physicalplan" ) func TestUniqueAggregation(t *testing.T) { - tests := map[string]struct { - execOptions []physicalplan.Option - }{ - "no concurrency": { - execOptions: []physicalplan.Option{ - physicalplan.WithConcurrency(1), - }, - }, - "default": { - execOptions: []physicalplan.Option{}, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer mem.AssertSize(t, 0) + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) - schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{ - Name: "test", - Columns: []*schemapb.Column{{ - Name: "example", - StorageLayout: &schemapb.StorageLayout{ - Type: schemapb.StorageLayout_TYPE_INT64, - }, - }, { - Name: "timestamp", - StorageLayout: &schemapb.StorageLayout{ - Type: schemapb.StorageLayout_TYPE_INT64, - }, - }}, - }) - require.NoError(t, err) + schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{ + Name: "test", + Columns: []*schemapb.Column{{ + Name: "example", + StorageLayout: &schemapb.StorageLayout{ + Type: schemapb.StorageLayout_TYPE_INT64, + }, + }, { + Name: "timestamp", + StorageLayout: &schemapb.StorageLayout{ + Type: schemapb.StorageLayout_TYPE_INT64, + }, + }}, + }) + require.NoError(t, err) - rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{ - Name: "example", - Type: arrow.PrimitiveTypes.Int64, - }, { - Name: "timestamp", - Type: arrow.PrimitiveTypes.Int64, - }}, nil)) - defer rb.Release() + rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{ + Name: "example", + Type: arrow.PrimitiveTypes.Int64, + }, { + Name: "timestamp", + Type: arrow.PrimitiveTypes.Int64, + }}, nil)) + defer rb.Release() - rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil) - rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil) + rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil) + rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil) - r := rb.NewRecord() - defer r.Release() + r := rb.NewRecord() + defer r.Release() - ran := false - err = NewEngine(mem, &FakeTableProvider{ - Tables: map[string]logicalplan.TableReader{ - "test": &FakeTableReader{ - FrostdbSchema: schema, - Records: []arrow.Record{r}, - }, - }, - }, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test"). - Aggregate( - []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))}, - []logicalplan.Expr{logicalplan.Col("timestamp")}, - ). - Execute(context.Background(), func(ctx context.Context, r arrow.Record) error { - require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values()) - require.True(t, r.Column(1).(*array.Int64).IsNull(0)) - require.True(t, r.Column(1).(*array.Int64).IsValid(1)) - require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1)) - ran = true - return nil - }) - require.NoError(t, err) - require.True(t, ran) + ran := false + err = NewEngine(mem, &FakeTableProvider{ + Tables: map[string]logicalplan.TableReader{ + "test": &FakeTableReader{ + FrostdbSchema: schema, + Records: []arrow.Record{r}, + }, + }, + }).ScanTable("test"). + Aggregate( + []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))}, + []logicalplan.Expr{logicalplan.Col("timestamp")}, + ). + Execute(context.Background(), func(ctx context.Context, r arrow.Record) error { + require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values()) + require.True(t, r.Column(1).(*array.Int64).IsNull(0)) + require.True(t, r.Column(1).(*array.Int64).IsValid(1)) + require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1)) + ran = true + return nil }) - } + require.NoError(t, err) + require.True(t, ran) } func TestAndAggregation(t *testing.T) { diff --git a/query/physicalplan/physicalplan.go b/query/physicalplan/physicalplan.go index 0b718dd37..787d39d16 100644 --- a/query/physicalplan/physicalplan.go +++ b/query/physicalplan/physicalplan.go @@ -17,7 +17,8 @@ import ( "github.com/polarsignals/frostdb/recovery" ) -var defaultConcurrency = runtime.GOMAXPROCS(0) +// TODO: Make this smarter. +var concurrencyHardcoded = runtime.GOMAXPROCS(0) type PhysicalPlan interface { Callback(ctx context.Context, r arrow.Record) error @@ -254,29 +255,22 @@ func (p *noopOperator) Draw() *Diagram { return p.next.Draw() } -type ExecOptions struct { +type execOptions struct { orderedAggregations bool overrideInput []PhysicalPlan readMode logicalplan.ReadMode - concurrency int } -func NewExecOptions() ExecOptions { - return ExecOptions{ - concurrency: defaultConcurrency, - } -} - -type Option func(o *ExecOptions) +type Option func(o *execOptions) func WithReadMode(m logicalplan.ReadMode) Option { - return func(o *ExecOptions) { + return func(o *execOptions) { o.readMode = m } } func WithOrderedAggregations() Option { - return func(o *ExecOptions) { + return func(o *execOptions) { o.orderedAggregations = true } } @@ -284,17 +278,11 @@ func WithOrderedAggregations() Option { // WithOverrideInput can be used to provide an input stage on top of which the // Build function can build the physical plan. func WithOverrideInput(input []PhysicalPlan) Option { - return func(o *ExecOptions) { + return func(o *execOptions) { o.overrideInput = input } } -func WithConcurrency(concurrency int) Option { - return func(o *ExecOptions) { - o.concurrency = concurrency - } -} - func Build( ctx context.Context, pool memory.Allocator, @@ -306,7 +294,7 @@ func Build( _, span := tracer.Start(ctx, "PhysicalPlan/Build") defer span.End() - execOpts := NewExecOptions() + execOpts := execOptions{} for _, o := range options { o(&execOpts) } @@ -330,7 +318,7 @@ func Build( // Create noop operators since we don't know what to push the scan // results to. In a following node visit, these noops will have // SetNext called on them and push to the correct operator. - plans := make([]PhysicalPlan, execOpts.concurrency) + plans := make([]PhysicalPlan, concurrencyHardcoded) for i := range plans { plans[i] = &noopOperator{} } @@ -345,7 +333,7 @@ func Build( // Create noop operators since we don't know what to push the scan // results to. In a following node visit, these noops will have // SetNext called on them and push to the correct operator. - plans := make([]PhysicalPlan, execOpts.concurrency) + plans := make([]PhysicalPlan, concurrencyHardcoded) for i := range plans { plans[i] = &noopOperator{} } @@ -447,12 +435,13 @@ func Build( ordered = false } var sync PhysicalPlan - // These aggregate operators need to be synchronized. - // NOTE: that in the case of concurrency 1 we still add a syncronizer because the Aggregation operator expects a final aggregation to be performed. - if ordered && len(plan.Aggregation.GroupExprs) > 0 { - sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs) - } else { - sync = Synchronize(len(prev)) + if len(prev) > 1 { + // These aggregate operators need to be synchronized. + if ordered && len(plan.Aggregation.GroupExprs) > 0 { + sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs) + } else { + sync = Synchronize(len(prev)) + } } seed := maphash.MakeSeed() for i := 0; i < len(prev); i++ { @@ -511,7 +500,7 @@ func Build( } func shouldPlanOrderedAggregate( - execOpts ExecOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation, + execOpts execOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation, ) (bool, error) { if !execOpts.orderedAggregations { // Ordered aggregations disabled.