Skip to content

Commit

Permalink
Flink: Avoid RANGE mode broken chain when write parallelism changes (#11702)
Browse files Browse the repository at this point in the history

Co-authored-by: huyuanfeng <[email protected]>
  • Loading branch information
huyuanfeng2018 and huyuanfeng authored Dec 27, 2024
1 parent fc3f705 commit de54a08
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -667,8 +668,17 @@ private DataStream<RowData> distributeDataStream(

return shuffleStream
.partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
.filter(StatisticsOrRecord::hasRecord)
.map(StatisticsOrRecord::record);
.flatMap(
(FlatMapFunction<StatisticsOrRecord, RowData>)
(statisticsOrRecord, out) -> {
if (statisticsOrRecord.hasRecord()) {
out.collect(statisticsOrRecord.record());
}
})
// Set the parallelism same as writerParallelism to
// promote operator chaining with the downstream writer operator
.setParallelism(writerParallelism)
.returns(RowData.class);

default:
throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,18 @@ public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBa
@Parameter(index = 1)
private boolean partitioned;

@Parameters(name = "parallelism = {0}, partitioned = {1}")
@Parameter(index = 2)
private int writeParallelism;

@Parameters(name = "parallelism = {0}, partitioned = {1}, writeParallelism = {2}")
public static Object[][] parameters() {
return new Object[][] {
{1, true},
{1, false},
{2, true},
{2, false}
{1, true, 1},
{1, false, 1},
{2, true, 2},
{2, false, 2},
{1, true, 2},
{1, false, 2},
};
}

Expand All @@ -110,7 +115,7 @@ public void before() throws IOException {
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
.setMaxParallelism(Math.max(parallelism, writeParallelism));

this.tableLoader = CATALOG_EXTENSION.tableLoader();
}
Expand Down Expand Up @@ -180,7 +185,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
.writeParallelism(writeParallelism)
.setAll(newProps);

assertThatThrownBy(builder::append)
Expand All @@ -206,7 +211,7 @@ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exceptio
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism);
.writeParallelism(writeParallelism);

// Range distribution requires either sort order or partition spec defined
assertThatThrownBy(builder::append)
Expand All @@ -233,7 +238,7 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism);
.writeParallelism(writeParallelism);

// sort based on partition columns
builder.append();
Expand Down Expand Up @@ -307,7 +312,7 @@ public void testRangeDistributionWithSortOrder() throws Exception {
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
.writeParallelism(writeParallelism)
.rangeDistributionStatisticsType(StatisticsType.Map)
.append();
env.execute(getClass().getSimpleName());
Expand Down Expand Up @@ -343,9 +348,9 @@ public void testRangeDistributionWithSortOrder() throws Exception {
List<DataFile> addedDataFiles =
Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
// each writer task should only write one file for non-partition sort column
assertThat(addedDataFiles).hasSize(parallelism);
assertThat(addedDataFiles).hasSize(writeParallelism);
// verify there is no overlap in min-max stats range
if (parallelism == 2) {
if (parallelism > 1) {
assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
}
}
Expand All @@ -368,7 +373,7 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
.writeParallelism(writeParallelism)
.rangeDistributionStatisticsType(StatisticsType.Sketch)
.append();
env.execute(getClass().getSimpleName());
Expand Down Expand Up @@ -399,9 +404,9 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
List<DataFile> addedDataFiles =
Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
// each writer task should only write one file for non-partition sort column
assertThat(addedDataFiles).hasSize(parallelism);
assertThat(addedDataFiles).hasSize(writeParallelism);
// verify there is no overlap in min-max stats range
if (parallelism == 2) {
if (writeParallelism > 2) {
assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
}
}
Expand Down Expand Up @@ -437,7 +442,7 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
.writeParallelism(writeParallelism)
.rangeDistributionStatisticsType(StatisticsType.Auto)
.append();
env.execute(getClass().getSimpleName());
Expand Down Expand Up @@ -469,9 +474,9 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
// each writer task should only write one file for non-partition sort column
// sometimes
assertThat(addedDataFiles).hasSize(parallelism);
assertThat(addedDataFiles).hasSize(writeParallelism);
// verify there is no overlap in min-max stats range
if (parallelism == 2) {
if (writeParallelism > 1) {
assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
}
}
Expand Down

0 comments on commit de54a08

Please sign in to comment.