From de54a0867306c54655bb101fecb14d7264ed62de Mon Sep 17 00:00:00 2001 From: big face cat <731030576@qq.com> Date: Fri, 27 Dec 2024 09:28:40 +0800 Subject: [PATCH] Flink: Avoid RANGE mode broken chain when write parallelism changes (#11702) Co-authored-by: huyuanfeng --- .../apache/iceberg/flink/sink/FlinkSink.java | 14 ++++++- .../TestFlinkIcebergSinkDistributionMode.java | 41 +++++++++++-------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 18f3557beeff..4acd32e13c76 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -667,8 +668,17 @@ private DataStream distributeDataStream( return shuffleStream .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r) - .filter(StatisticsOrRecord::hasRecord) - .map(StatisticsOrRecord::record); + .flatMap( + (FlatMapFunction) + (statisticsOrRecord, out) -> { + if (statisticsOrRecord.hasRecord()) { + out.collect(statisticsOrRecord.record()); + } + }) + // Set the parallelism same as writerParallelism to + // promote operator chaining with the downstream writer operator + .setParallelism(writerParallelism) + .returns(RowData.class); default: throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java index aa9a0291b38f..84fa48e38b70 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java @@ -82,13 +82,18 @@ public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBa @Parameter(index = 1) private boolean partitioned; - @Parameters(name = "parallelism = {0}, partitioned = {1}") + @Parameter(index = 2) + private int writeParallelism; + + @Parameters(name = "parallelism = {0}, partitioned = {1}, writeParallelism = {2}") public static Object[][] parameters() { return new Object[][] { - {1, true}, - {1, false}, - {2, true}, - {2, false} + {1, true, 1}, + {1, false, 1}, + {2, true, 2}, + {2, false, 2}, + {1, true, 2}, + {1, false, 2}, }; } @@ -110,7 +115,7 @@ public void before() throws IOException { MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG) .enableCheckpointing(100) .setParallelism(parallelism) - .setMaxParallelism(parallelism); + .setMaxParallelism(Math.max(parallelism, writeParallelism)); this.tableLoader = CATALOG_EXTENSION.tableLoader(); } @@ -180,7 +185,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() { FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism) + .writeParallelism(writeParallelism) .setAll(newProps); assertThatThrownBy(builder::append) @@ -206,7 +211,7 @@ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exceptio FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism); + .writeParallelism(writeParallelism); // Range distribution requires either sort order or partition spec defined assertThatThrownBy(builder::append) @@ -233,7 +238,7 @@ public void 
testRangeDistributionWithoutSortOrderPartitioned() throws Exception FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism); + .writeParallelism(writeParallelism); // sort based on partition columns builder.append(); @@ -307,7 +312,7 @@ public void testRangeDistributionWithSortOrder() throws Exception { FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism) + .writeParallelism(writeParallelism) .rangeDistributionStatisticsType(StatisticsType.Map) .append(); env.execute(getClass().getSimpleName()); @@ -343,9 +348,9 @@ public void testRangeDistributionWithSortOrder() throws Exception { List addedDataFiles = Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); // each writer task should only write one file for non-partition sort column - assertThat(addedDataFiles).hasSize(parallelism); + assertThat(addedDataFiles).hasSize(writeParallelism); // verify there is no overlap in min-max stats range - if (parallelism == 2) { + if (parallelism > 1) { assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); } } @@ -368,7 +373,7 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception { FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism) + .writeParallelism(writeParallelism) .rangeDistributionStatisticsType(StatisticsType.Sketch) .append(); env.execute(getClass().getSimpleName()); @@ -399,9 +404,9 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception { List addedDataFiles = Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); // each writer task should only write one file for non-partition sort column - assertThat(addedDataFiles).hasSize(parallelism); + assertThat(addedDataFiles).hasSize(writeParallelism); // verify there is no overlap in min-max stats range - 
if (parallelism == 2) { + if (writeParallelism > 1) { assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); } } @@ -437,7 +442,7 @@ public void testRangeDistributionStatisticsMigration() throws Exception { FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) .table(table) .tableLoader(tableLoader) - .writeParallelism(parallelism) + .writeParallelism(writeParallelism) .rangeDistributionStatisticsType(StatisticsType.Auto) .append(); env.execute(getClass().getSimpleName()); @@ -469,9 +474,9 @@ public void testRangeDistributionStatisticsMigration() throws Exception { Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator()); // each writer task should only write one file for non-partition sort column // sometimes - assertThat(addedDataFiles).hasSize(parallelism); + assertThat(addedDataFiles).hasSize(writeParallelism); // verify there is no overlap in min-max stats range - if (parallelism == 2) { + if (writeParallelism > 1) { assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1)); } }