Iceberg/Comet integration POC #9841
base: main
Conversation
...k/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnReader.java (outdated; resolved)
I think this is the right direction to take. I did an initial high-level pass. Looking forward to having a Comet release soon.
...k/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnReader.java (outdated; resolved)
...k/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnReader.java (outdated; resolved)
spark/v3.4/build.gradle
Outdated
}

compileOnly "org.apache.comet:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.1.0-SNAPSHOT"
I assume this library will only contain the reader, not the operators.
Right. This only contains the reader.
Does it need to be Spark version dependent? Just wondering.
We are currently doing some experiments to see if we can provide a Spark-version-independent jar.
+1 for exploring that.
...ain/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnarBatchReader.java (outdated; resolved)
...ain/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnarBatchReader.java (outdated; resolved)
...ain/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergColumnarBatchReader.java (outdated; resolved)
...rk/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java (outdated; resolved)
...in/java/org/apache/iceberg/spark/data/vectorized/comet/CometIcebergPositionColumnReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java (outdated; resolved)
...v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnBatchLoader.java (outdated; resolved)
....4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometColumnReader.java (outdated; resolved)
build.gradle
Outdated
@@ -45,6 +45,7 @@ buildscript {
}
}

String sparkMajorVersion = '3.4'
I hope we can soon have a snapshot of the Comet jar that is independent of Spark to clean up deps here. We can't have the parquet module depend on a jar with any Spark deps.
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

@SuppressWarnings("checkstyle:VisibilityModifier")
These changes would require a bit more time to review. I'll do that tomorrow. I think we would want to restructure the original implementation a bit. Not a concern for now.
We would want to structure this a bit differently. Let me think more.
...rk/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java (outdated; resolved)
@aokolnychyi I have addressed the comments. Could you please take one more look when you have a moment? Thanks a lot!
Will check today.
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java (resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java (outdated; resolved)
...k/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java (outdated; resolved)
....4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometColumnReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BatchReadConf.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java (resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java (outdated; resolved)
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BatchReadConf.java (outdated; resolved)
@huaxingao Hi, is the Comet Parquet reader able to support page skipping / use page indexes? E.g., see #193, the initial issue for the Iceberg Parquet reader.
@cornelcreanga The Comet Parquet reader doesn't support page skipping yet.
hey @huaxingao
@PaulLiang1 Thank you for your interest! We are currently working on a binary release of DataFusion Comet. Once the binary release is available, I will proceed with this PR.
@huaxingao Thanks
@PaulLiang1 Thanks! I'll check with my colleague tomorrow to find out where we are in the binary release process.
@PaulLiang1 We are pretty close to this and will have a binary release for Comet soon.
Got it, thanks for letting me know. Please feel free to let us know if there is anything we could help on. Thanks!
* @param rowStartPosInBatch The starting position of the row in the batch.
* @param hasIsDeletedColumn Indicates whether the columnar batch includes _deleted column.
*/
public static void applyDeletesToColumnarBatch(
I think we inherit the existing loading logic, which is too complicated. First, we mix the isDeleted and rowIdMapping cases. Second, we create the ColumnarBatch prior to having all column vectors (e.g. the isDeleted array).
What if we add methods like this to the util class instead? Only one of them will be needed in a query, right? We either mark records as removed or hide them.
public static Pair<int[], Integer> buildRowIdMapping(
    ColumnVector[] vectors,
    DeleteFilter<InternalRow> deletes,
    long rowStartPosInBatch,
    int batchSize) {
  if (deletes == null) {
    return null;
  }

  PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
  Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
  ColumnarBatchRow row = new ColumnarBatchRow(vectors);
  int[] rowIdMapping = new int[batchSize];
  int liveRowId = 0;

  for (int rowId = 0; rowId < batchSize; rowId++) {
    long pos = rowStartPosInBatch + rowId;
    row.rowId = rowId;
    if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
      deletes.incrementDeleteCount();
    } else {
      rowIdMapping[liveRowId] = rowId;
      liveRowId++;
    }
  }

  return liveRowId == batchSize ? null : Pair.of(rowIdMapping, liveRowId);
}

public static boolean[] buildIsDeleted(
    ColumnVector[] vectors,
    DeleteFilter<InternalRow> deletes,
    long rowStartPosInBatch,
    int batchSize) {
  boolean[] isDeleted = new boolean[batchSize];

  if (deletes == null) {
    return isDeleted;
  }

  PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
  Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
  ColumnarBatchRow row = new ColumnarBatchRow(vectors);

  for (int rowId = 0; rowId < batchSize; rowId++) {
    long pos = rowStartPosInBatch + rowId;
    row.rowId = rowId;
    isDeleted[rowId] = isDeleted(pos, row, deletedPositions, eqDeleteFilter);
  }

  return isDeleted;
}

// use separate if statements to reduce the chance of speculative execution for equality tests
private static boolean isDeleted(
    long pos,
    InternalRow row,
    PositionDeleteIndex deletedPositions,
    Predicate<InternalRow> eqDeleteFilter) {
  if (deletedPositions != null && deletedPositions.isDeleted(pos)) {
    return true;
  }

  if (!eqDeleteFilter.test(row)) {
    return true;
  }

  return false;
}
Then our loading logic can look like:
- Initialize the vector array.
- Load all data vectors (leaving metadata vectors as null).
- If you need to discard deleted records, call buildRowIdMapping and either wrap loaded data vectors into other vectors or mutate them in place via setRowIdMapping.
- If you need to mark deleted records, call buildIsDeleted to compute the flags.
- Load all metadata vectors (we will have the is_deleted array fully populated now).
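Putting those steps together, here is a minimal sketch of what that loading flow could look like. This is illustrative only: loadDataVectors, loadMetadataVectors, and wrapWithRowIdMapping are hypothetical helpers standing in for the existing loading code, not methods proposed in this PR.

// Minimal sketch of the proposed loading flow; loadDataVectors,
// loadMetadataVectors, and wrapWithRowIdMapping are hypothetical helpers.
ColumnarBatch loadBatch(
    DeleteFilter<InternalRow> deletes,
    long rowStartPosInBatch,
    int batchSize,
    boolean hasIsDeletedColumn) {
  // Initialize the vector array and load all data vectors
  // (metadata vectors stay null for now).
  ColumnVector[] vectors = loadDataVectors(batchSize);

  int numLiveRows = batchSize;
  if (hasIsDeletedColumn) {
    // Mark deleted records: compute the flags, then load the metadata
    // vectors with the fully populated is_deleted array.
    boolean[] isDeleted = buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
    loadMetadataVectors(vectors, isDeleted);
  } else {
    // Hide deleted records: remap live row ids and shrink the batch.
    Pair<int[], Integer> mapping =
        buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
    if (mapping != null) {
      wrapWithRowIdMapping(vectors, mapping.first());
      numLiveRows = mapping.second();
    }
  }

  return new ColumnarBatch(vectors, numLiveRows);
}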
This PR shows how I will integrate Comet with Iceberg. The PR doesn't compile yet because Comet hasn't been released, but it shows the ideas for how we are going to change the Iceberg code to integrate Comet. Also, Comet doesn't have Spark 3.5 support yet, so I am doing this on 3.4; we will add 3.5 support in Comet.
In VectorizedSparkParquetReaders.buildReader, if the Comet library is available, a CometIcebergColumnarBatchReader will be created, which will use the Comet batch reader to read data. We can also add a property later to control whether we want to use Comet or not.
The logic in CometIcebergVectorizedReaderBuilder is very similar to VectorizedReaderBuilder. It builds Comet column readers instead of Iceberg column readers.
The delete logic in CometIcebergColumnarBatchReader is exactly the same as the one in ColumnarBatchReader. I will extract the common code and put it in a base class.
The main motivation of this PR is to improve performance using native execution. Comet's Parquet reader is a hybrid implementation: IO and decompression are done in the JVM, while decoding is done natively. There is some performance gain from native decoding, but the gain is not much. However, by switching to the Comet Parquet reader, Comet will recognize that this is a Comet scan and will convert the Spark physical plan into a Comet plan for native execution. The major performance gain will come from this native execution.
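As a rough illustration of the dispatch described above, the reader selection in VectorizedSparkParquetReaders.buildReader could look something like the sketch below. The useComet flag and the buildCometReader/buildIcebergReader helpers are assumptions for illustration, not the final API; the PR mentions that the actual switch (e.g. a Spark SQL property) would be added later.

// Sketch only: useComet, buildCometReader, and buildIcebergReader are
// illustrative stand-ins. The real switch could be a Spark SQL property,
// as mentioned above.
public static VectorizedReader<ColumnarBatch> buildReader(
    Schema expectedSchema, MessageType fileSchema, boolean useComet) {
  if (useComet) {
    // CometIcebergVectorizedReaderBuilder builds Comet column readers,
    // mirroring VectorizedReaderBuilder.
    return buildCometReader(expectedSchema, fileSchema);
  } else {
    return buildIcebergReader(expectedSchema, fileSchema);
  }
}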