Skip to content

Commit

Permalink
MSQ: Remove unnecessary snapshot deserialization code. (#16116)
Browse files Browse the repository at this point in the history
Since #13205, a special deserializer module has no longer been necessary
to read key collector snapshots. This patch removes the unnecessary code.
  • Loading branch information
gianm authored Mar 18, 2024
1 parent 7d307df commit 36bc94c
Show file tree
Hide file tree
Showing 8 changed files with 9 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -783,18 +783,10 @@ public void updatePartialKeyStatisticsInformation(
addToKernelManipulationQueue(
queryKernel -> {
final StageId stageId = queryKernel.getStageId(stageNumber);

// We need a specially-decorated ObjectMapper to deserialize key statistics.
final StageDefinition stageDef = queryKernel.getStageDefinition(stageId);
final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot(
context.jsonMapper(),
stageDef.getShuffleSpec().clusterBy(),
stageDef.getShuffleSpec().doesAggregate()
);

final PartialKeyStatisticsInformation partialKeyStatisticsInformation;

try {
partialKeyStatisticsInformation = mapper.convertValue(
partialKeyStatisticsInformation = context.jsonMapper().convertValue(
partialKeyStatisticsInformationObject,
PartialKeyStatisticsInformation.class
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

package org.apache.druid.msq.exec;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.guice.MultiStageQuery;
Expand All @@ -39,10 +37,6 @@
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.statistics.KeyCollectorFactory;
import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule;
import org.apache.druid.msq.statistics.KeyCollectors;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.storage.NilStorageConnector;
Expand Down Expand Up @@ -125,24 +119,6 @@ public static long primaryTimestampFromObjectForInsert(final Object timestamp)
}
}

/**
 * Produces a copy of the supplied ObjectMapper with a {@link KeyCollectorSnapshotDeserializerModule} registered on
 * it, so the copy can deserialize the {@link KeyCollectorSnapshot} type that matches the standard key collector
 * factory for the given clustering. Registration happens on the copy only.
 *
 * @param mapper    mapper to copy
 * @param clusterBy clustering passed to {@link KeyCollectors#makeStandardFactory}
 * @param aggregate aggregation flag passed to {@link KeyCollectors#makeStandardFactory}
 *
 * @return the decorated copy
 */
static ObjectMapper decorateObjectMapperForKeyCollectorSnapshot(
    final ObjectMapper mapper,
    final ClusterBy clusterBy,
    final boolean aggregate
)
{
  // The factory is built first, exactly as before, so any failure there surfaces before the mapper is copied.
  final KeyCollectorFactory<?, ?> factory = KeyCollectors.makeStandardFactory(clusterBy, aggregate);
  final ObjectMapper decorated = mapper.copy();
  return decorated.registerModule(new KeyCollectorSnapshotDeserializerModule(factory));
}

/**
* Returns the host:port from a {@link DruidNode}. Convenience method to make it easier to construct
* {@link MSQErrorReport} instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@

package org.apache.druid.msq.statistics;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.druid.frame.key.RowKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;

Expand Down Expand Up @@ -53,46 +48,6 @@ public DelegateOrMinKeyCollector<TDelegate> newKeyCollector()
return new DelegateOrMinKeyCollector<>(comparator, delegateFactory.newKeyCollector(), null);
}

/**
 * Returns a deserializer for {@link DelegateOrMinKeyCollectorSnapshot} that walks the JSON object field by field.
 * The {@link DelegateOrMinKeyCollectorSnapshot#FIELD_SNAPSHOT} field is handed to the delegate factory's own
 * snapshot deserializer, and the {@link DelegateOrMinKeyCollectorSnapshot#FIELD_MIN_KEY} field is read as a
 * {@link RowKey}. Any other field is ignored, including fields whose value is a nested object or array.
 */
@Override
public JsonDeserializer<DelegateOrMinKeyCollectorSnapshot<TSnapshot>> snapshotDeserializer()
{
  final JsonDeserializer<TSnapshot> delegateDeserializer = delegateFactory.snapshotDeserializer();

  return new JsonDeserializer<DelegateOrMinKeyCollectorSnapshot<TSnapshot>>()
  {
    @Override
    public DelegateOrMinKeyCollectorSnapshot<TSnapshot> deserialize(JsonParser jp, DeserializationContext ctxt)
        throws IOException
    {
      TSnapshot delegateSnapshot = null;
      RowKey minKey = null;

      if (!jp.isExpectedStartObjectToken()) {
        ctxt.reportWrongTokenException(this, JsonToken.START_OBJECT, null);
      }

      JsonToken token;

      while ((token = jp.nextToken()) != JsonToken.END_OBJECT) {
        if (token != JsonToken.FIELD_NAME) {
          ctxt.reportWrongTokenException(this, JsonToken.FIELD_NAME, null);
        }

        final String fieldName = jp.getText();

        // Advance from the field name to the first token of its value.
        jp.nextToken();

        if (DelegateOrMinKeyCollectorSnapshot.FIELD_SNAPSHOT.equals(fieldName)) {
          delegateSnapshot = delegateDeserializer.deserialize(jp, ctxt);
        } else if (DelegateOrMinKeyCollectorSnapshot.FIELD_MIN_KEY.equals(fieldName)) {
          minKey = jp.readValueAs(RowKey.class);
        } else {
          // Unrecognized field: if its value is an object or array, consume the whole structure. Otherwise the
          // nested tokens would be misread as fields of this object, and a nested END_OBJECT would end the loop
          // early. skipChildren() does nothing for scalar values.
          jp.skipChildren();
        }
      }

      return new DelegateOrMinKeyCollectorSnapshot<>(delegateSnapshot, minKey);
    }
  };
}

@Override
public DelegateOrMinKeyCollectorSnapshot<TSnapshot> toSnapshot(final DelegateOrMinKeyCollector<TDelegate> collector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

package org.apache.druid.msq.statistics;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import it.unimi.dsi.fastutil.objects.Object2LongRBTreeMap;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.RowKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.stream.Collectors;

Expand All @@ -51,19 +47,6 @@ public DistinctKeyCollector newKeyCollector()
return new DistinctKeyCollector(comparator);
}

/**
 * Returns a deserializer that reads a {@link DistinctKeySnapshot} by delegating to
 * {@link JsonParser#readValueAs(Class)}.
 */
@Override
public JsonDeserializer<DistinctKeySnapshot> snapshotDeserializer()
{
  final JsonDeserializer<DistinctKeySnapshot> deserializer = new JsonDeserializer<DistinctKeySnapshot>()
  {
    @Override
    public DistinctKeySnapshot deserialize(final JsonParser parser, final DeserializationContext context)
        throws IOException
    {
      final DistinctKeySnapshot snapshot = parser.readValueAs(DistinctKeySnapshot.class);
      return snapshot;
    }
  };

  return deserializer;
}

@Override
public DistinctKeySnapshot toSnapshot(final DistinctKeyCollector collector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,13 @@

package org.apache.druid.msq.statistics;

import com.fasterxml.jackson.databind.JsonDeserializer;

public interface KeyCollectorFactory<TCollector extends KeyCollector<TCollector>, TSnapshot extends KeyCollectorSnapshot>
{
/**
* Create a new {@link KeyCollector}
*/
TCollector newKeyCollector();

/**
* Fetches the deserializer that can be used to deserialize the snapshots created by the KeyCollectors corresponding
* to this factory
*/
JsonDeserializer<TSnapshot> snapshotDeserializer();

/**
* Serializes a {@link KeyCollector} to a {@link KeyCollectorSnapshot}
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package org.apache.druid.msq.statistics;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ByteArrayUtil;
Expand All @@ -31,7 +28,6 @@
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.StringUtils;

import java.io.IOException;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Comparator;
Expand Down Expand Up @@ -61,20 +57,6 @@ public QuantilesSketchKeyCollector newKeyCollector()
return new QuantilesSketchKeyCollector(comparator, ItemsSketch.getInstance(byte[].class, SKETCH_INITIAL_K, comparator), 0);
}

/**
 * Returns a deserializer that reads a {@link QuantilesSketchKeyCollectorSnapshot} by delegating to
 * {@link JsonParser#readValueAs(Class)}.
 */
@Override
public JsonDeserializer<QuantilesSketchKeyCollectorSnapshot> snapshotDeserializer()
{
  final JsonDeserializer<QuantilesSketchKeyCollectorSnapshot> deserializer =
      new JsonDeserializer<QuantilesSketchKeyCollectorSnapshot>()
      {
        @Override
        public QuantilesSketchKeyCollectorSnapshot deserialize(
            final JsonParser parser,
            final DeserializationContext context
        ) throws IOException
        {
          final QuantilesSketchKeyCollectorSnapshot snapshot =
              parser.readValueAs(QuantilesSketchKeyCollectorSnapshot.class);
          return snapshot;
        }
      };

  return deserializer;
}

@Override
public QuantilesSketchKeyCollectorSnapshot toSnapshot(QuantilesSketchKeyCollector collector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void test_clusterByX_unique()
);
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -187,7 +187,7 @@ public void test_clusterByX_everyKeyAppearsTwice()
);
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -245,7 +245,7 @@ public void test_clusterByX_everyKeyAppearsTwice_withAggregation()
);
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -309,7 +309,7 @@ public void test_clusterByXYbucketByX_threeX_uniqueY()
}
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -380,7 +380,7 @@ public void test_clusterByXYbucketByX_maxX_uniqueY()
}
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -446,7 +446,7 @@ public void test_clusterByXYbucketByX_maxX_lowCardinalityY_withAggregation()
}
}

verifySnapshotSerialization(testName, collector, aggregate);
verifySnapshotSerialization(testName, collector);
}
);
}
Expand Down Expand Up @@ -945,21 +945,11 @@ private static long trackedRows(final ClusterByStatisticsCollectorImpl collector

private static void verifySnapshotSerialization(
final String testName,
final ClusterByStatisticsCollector collector,
final boolean aggregate
final ClusterByStatisticsCollector collector
)
{
try {
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
jsonMapper.registerModule(
new KeyCollectorSnapshotDeserializerModule(
KeyCollectors.makeStandardFactory(
collector.getClusterBy(),
aggregate
)
)
);

final ClusterByStatisticsSnapshot snapshot = collector.snapshot();
final ClusterByStatisticsSnapshot snapshot2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(snapshot),
Expand Down

0 comments on commit 36bc94c

Please sign in to comment.