Skip to content

Commit

Permalink
don't inject +score unless coordinator requests it; this is a cleaner…
Browse files Browse the repository at this point in the history
… approach than ignoring it when serialization fails later
  • Loading branch information
jbellis committed Dec 12, 2024
1 parent c0de416 commit 2babc86
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
{
// TODO remove this when we no longer need to downgrade to replicas that don't know about synthetic columns,
// and the related code in
// - Columns.Serializer.encodeBitmap
// - UnfilteredSerializer.serializeRowBody)
// - StatementRestrictions.addOrderingRestrictions
// - StorageAttachedIndexSearcher.PrimaryKeyIterator constructor
public static final boolean ANN_USE_SYNTHETIC_SCORE = Boolean.parseBoolean(System.getProperty("cassandra.sai.ann_use_synthetic_score", "false"));

private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
Expand Down
8 changes: 0 additions & 8 deletions src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,7 @@ private static long encodeBitmap(Collection<ColumnMetadata> columns, Columns sup
for (ColumnMetadata column : columns)
{
if (iter.next(column) == null)
{
// We can't know for sure whether to add the synthetic score column because WildcardColumnFilter
// just says "yes" to everything; instead, we just skip it here.
// TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE.
if (column.isSynthetic())
continue;

throw new IllegalStateException(columns + " is not a subset of " + superset);
}

int currentIndex = iter.indexOfCurrent();
int count = currentIndex - expectIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,6 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da
// with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what
// happens if we don't do that.
ColumnMetadata column = si.next(cd.column());
// We can't know for sure whether to add the synthetic score column because WildcardColumnFilter
// just says "yes" to everything; instead, we just skip it here.
// TODO remove this with SelectStatement.ANN_USE_SYNTHETIC_SCORE.
if (column == null)
return;
assert column != null : cd.column.toString();

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ public TableMetadata metadata()
return command.metadata();
}

public ReadCommand command()
{
return command;
}

RowFilter.FilterElement filterOperation()
{
// NOTE: we cannot remove the order by filter expression here yet because it is used in the FilterTree class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
Expand Down Expand Up @@ -636,7 +637,7 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List<Primar
}
}
}
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys) : null;
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys, controller.command()) : null;
}
}

Expand All @@ -657,7 +658,7 @@ public static class PrimaryKeyIterator extends AbstractUnfilteredRowIterator
private boolean consumed = false;
private final Unfiltered row;

public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore)
public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore, ReadCommand command)
{
super(partition.metadata(),
partition.partitionKey(),
Expand All @@ -668,7 +669,24 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
partition.stats());

assert !primaryKeysWithScore.isEmpty();
if (!content.isRow() || !(primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore))
var isScoredRow = primaryKeysWithScore.get(0) instanceof PrimaryKeyWithScore;
if (!content.isRow() || !isScoredRow)
{
this.row = content;
return;
}

// When +score is added on the coordinator side, it's represented as a PrecomputedColumnFilter
// even in a 'SELECT *' because WCF is not capable of representing synthetic columns.
// This can be simplified when we remove ANN_USE_SYNTHETIC_SCORE
var tm = metadata();
var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
tm.name,
ColumnMetadata.SYNTHETIC_SCORE_ID,
FloatType.instance);
var isScoreFetched = !(command.columnFilter() instanceof ColumnFilter.WildCardColumnFilter)
&& !command.columnFilter().fetches(scoreColumn);
if (!isScoreFetched)
{
this.row = content;
return;
Expand All @@ -680,11 +698,6 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
columnData.addAll(originalRow.columnData());

// inject +score as a new column
var tm = metadata();
var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
tm.name,
ColumnMetadata.SYNTHETIC_SCORE_ID,
FloatType.instance);
var pkWithScore = (PrimaryKeyWithScore) primaryKeysWithScore.get(0);
columnData.add(BufferCell.live(scoreColumn,
FBUtilities.nowInSeconds(),
Expand Down

0 comments on commit 2babc86

Please sign in to comment.