generic block compressed complex columns (apache#16863)
changes:
* Adds new `CompressedComplexColumn`, `CompressedComplexColumnSerializer`, and `CompressedComplexColumnSupplier`, based on the `CompressedVariableSizedBlobColumn` used by JSON columns.
* Adds `IndexSpec.complexMetricCompression`, which specifies compression for the generic compressed complex column. Defaults to uncompressed because compressed columns are not backwards compatible.
* Adds a new overload of `ComplexMetricSerde.getSerializer` that accepts an `IndexSpec` argument when creating a serializer. The old signature is marked `@Deprecated` and now has a default implementation that returns `null`; if a serde overrides it to return a non-null value, the default implementation of the new overload uses that serializer. Otherwise, the default implementation of the new method uses a `CompressedComplexColumnSerializer` if `IndexSpec.complexMetricCompression` is not null/none/uncompressed, and a `LargeColumnSupportedComplexColumnSerializer` otherwise.
* Replaces the duplicated generic implementations of `ComplexMetricSerde.getSerializer` and `ComplexMetricSerde.deserializeColumn` with default implementations on `ComplexMetricSerde`, instead of copies in each serde. The default implementation of `deserializeColumn` checks whether the first byte indicates that the new compression was used, and otherwise uses the `GenericIndexed`-based supplier.
* Complex columns with custom serializers/deserializers are unaffected and can continue to use their own formats, including specialized compression. The new code only provides generic implementations built around `ObjectStrategy`.
* Adds `ObjectStrategy.readRetainsBufferReference` so that `CompressedComplexColumn` copies on read only if required.
* Passes a `copyValueOnRead` flag down to `CompressedBlockReader` to avoid duplicating the buffer when the value needs to be copied anyway.
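
The selection rules described in the list above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this commit: the class `ComplexSerdeSketch`, its `Compression` enum, the string results, the helper names, and the marker byte value are all hypothetical stand-ins, and the link between `readRetainsBufferReference` and `copyValueOnRead` is one reading of the description rather than something confirmed here.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: every name below is a hypothetical stand-in, not Druid's real API.
final class ComplexSerdeSketch
{
  enum Compression { LZ4, LZF, ZSTD, UNCOMPRESSED, NONE }

  // Assumed marker value; the real leading byte of the compressed format is not shown here.
  static final byte COMPRESSED_MARKER = 0x7F;

  // Mirrors the described default getSerializer: a non-null result from the deprecated
  // method wins; otherwise the choice depends on the configured compression.
  static String chooseSerializer(String legacySerializer, Compression compression)
  {
    if (legacySerializer != null) {
      return legacySerializer;
    }
    if (compression == null || compression == Compression.NONE || compression == Compression.UNCOMPRESSED) {
      return "LargeColumnSupportedComplexColumnSerializer";
    }
    return "CompressedComplexColumnSerializer";
  }

  // Mirrors the described default deserializeColumn: peek at the first byte without
  // advancing the buffer, then pick the matching supplier.
  static String chooseSupplier(ByteBuffer buffer)
  {
    byte first = buffer.get(buffer.position());
    return first == COMPRESSED_MARKER ? "CompressedComplexColumnSupplier" : "GenericIndexed-based supplier";
  }

  // Copy-on-read sketch: when the value must be copied anyway, copy straight out of the
  // shared block and skip the duplicate(); otherwise hand back a view over the block.
  static ByteBuffer readValue(ByteBuffer block, int offset, int length, boolean copyValueOnRead)
  {
    if (copyValueOnRead) {
      byte[] bytes = new byte[length];
      for (int i = 0; i < length; i++) {
        bytes[i] = block.get(offset + i);
      }
      return ByteBuffer.wrap(bytes);
    }
    ByteBuffer view = block.duplicate();
    view.limit(offset + length);
    view.position(offset);
    return view.slice();
  }

  public static void main(String[] args)
  {
    System.out.println(chooseSerializer(null, Compression.ZSTD));
    System.out.println(chooseSerializer(null, Compression.UNCOMPRESSED));
    System.out.println(chooseSupplier(ByteBuffer.wrap(new byte[]{COMPRESSED_MARKER, 0})));
    ByteBuffer block = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
    System.out.println(readValue(block, 1, 3, true).remaining());
  }
}
```

In this sketch, a strategy whose deserialized object keeps pointing into the buffer would pass `copyValueOnRead = true`, so later reuse of the shared block cannot corrupt the value; a strategy that fully materializes its object (as the `readRetainsBufferReference() == false` overrides in this commit suggest) can read from the view directly.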
clintropolis authored Aug 27, 2024
1 parent ed3dbd6 commit f8301a3
Showing 100 changed files with 1,155 additions and 614 deletions.
1 change: 1 addition & 0 deletions docs/ingestion/ingestion-spec.md
@@ -520,6 +520,7 @@ For information on defining an `indexSpec` in a query context, see [SQL-based in
|stringDictionaryEncoding|Encoding format for string value dictionaries used by STRING and [COMPLEX<json>](../querying/nested-columns.md) columns. To enable front coding, set `stringDictionaryEncoding.type` to `frontCoded`. Optionally, you can specify the `bucketSize` and `formatVersion` properties. See [Front coding](#front-coding) for more information.|`{"type":"utf8"}`|
|metricCompression|Compression format for primitive type metric columns. Options are `lz4`, `lzf`, `zstd`, `uncompressed`, or `none` (which is more efficient than `uncompressed`, but not supported by older versions of Druid).|`lz4`|
|longEncoding|Encoding format for long-typed columns. Applies regardless of whether they are dimensions or metrics. Options are `auto` or `longs`. `auto` encodes the values using an offset or a lookup table, depending on column cardinality, and stores them with variable size. `longs` stores each value as-is in 8 bytes.|`longs`|
|complexMetricCompression|Compression format for complex type metric columns. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`. Options other than `uncompressed` are not compatible with Druid versions older than 31. This setting applies only to complex metrics that do not have specialized column formats.|`uncompressed`|
|jsonCompression|Compression format to use for nested column raw data. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`.|`lz4`|

#### Front coding
@@ -69,6 +69,7 @@ public String getTypeName()
}

@Override
@Nullable
public CompressedBigDecimal getRowValue(int rowNum)
{
int s = scale.get(rowNum);
@@ -96,7 +97,8 @@ public ColumnValueSelector makeColumnValueSelector(final ReadableOffset offset)
{
return new ObjectColumnSelector<CompressedBigDecimal>()
{
@Override @Nullable
@Override
@Nullable
public CompressedBigDecimal getObject()
{
return getRowValue(offset.getOffset());
@@ -20,6 +20,7 @@
package org.apache.druid.compressedbigdecimal;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
@@ -73,7 +74,8 @@ public CompressedBigDecimal extractValue(InputRow inputRow, String metricName)
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
builder.setComplexColumnSupplier(
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer));
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer)
);
}

/* (non-Javadoc)
@@ -83,7 +85,8 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
@Override
public CompressedBigDecimalLongColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column
String column,
IndexSpec indexSpec
)
{
return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);
@@ -91,4 +91,10 @@ public byte[] toBytes(CompressedBigDecimal val)

return buf.array();
}

@Override
public boolean readRetainsBufferReference()
{
return false;
}
}
@@ -22,17 +22,9 @@
import com.datadoghq.sketch.ddsketch.DDSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.nio.ByteBuffer;


public class DDSketchComplexMetricSerde extends ComplexMetricSerde
@@ -84,31 +76,9 @@ public Object extractValue(final InputRow inputRow, final String metricName)
};
}

@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<DDSketch> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}

@Override
public ObjectStrategy<DDSketch> getObjectStrategy()
{
return STRATEGY;
}

@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}

}
@@ -70,4 +70,10 @@ public int compare(DDSketch o1, DDSketch o2)
{
return DDSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}

@Override
public boolean readRetainsBufferReference()
{
return false;
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.ddsketch;

import org.junit.Assert;
import org.junit.Test;

public class DDSketchObjectStrategyTest
{
@Test
public void testReadRetainsBufferReference()
{
DDSketchObjectStrategy strategy = new DDSketchObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}
@@ -21,17 +21,9 @@

import org.apache.druid.data.input.InputRow;
import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.nio.ByteBuffer;

public class MomentSketchComplexMetricSerde extends ComplexMetricSerde
{
@@ -62,31 +54,9 @@ public Object extractValue(final InputRow inputRow, final String metricName)
};
}

@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<MomentSketchWrapper> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}

@Override
public ObjectStrategy<MomentSketchWrapper> getObjectStrategy()
{
return STRATEGY;
}

@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}

}
@@ -59,4 +59,10 @@ public int compare(MomentSketchWrapper o1, MomentSketchWrapper o2)
{
return MomentSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}

@Override
public boolean readRetainsBufferReference()
{
return false;
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.momentsketch;

import org.junit.Assert;
import org.junit.Test;

public class MomentSketchObjectStrategyTest
{
@Test
public void testReadRetainsBufferReference()
{
MomentSketchObjectStrategy strategy = new MomentSketchObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}