From fc923b3af65b0e3cb28a9afb69f7fd05c88f62ca Mon Sep 17 00:00:00 2001 From: abharath9 Date: Mon, 6 Jan 2025 15:32:06 -0600 Subject: [PATCH] replace legacy converter with new (#11838) Co-authored-by: Bharath Kumar Avusherla --- .../apache/iceberg/flink/source/reader/RowConverter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java index a84384fe17bf..7604004ff0cc 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java @@ -18,12 +18,14 @@ */ package org.apache.iceberg.flink.source.reader; +import java.util.stream.Stream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.DataStructureConverter; import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; @@ -42,8 +44,11 @@ private RowConverter(RowType rowType, TypeInformation rowTypeInfo) { public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) { RowType rowType = FlinkSchemaUtil.convert(icebergSchema); TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema); - RowTypeInfo rowTypeInfo = - new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()); + TypeInformation[] typeInformations = + Stream.of(tableSchema.getFieldDataTypes()) + .map(ExternalTypeInfo::of) + .toArray(TypeInformation[]::new); + RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, tableSchema.getFieldNames()); return new RowConverter(rowType, rowTypeInfo); }