Skip to content

Commit

Permalink
replace legacy converter with new (#11838)
Browse files Browse the repository at this point in the history
Co-authored-by: Bharath Kumar Avusherla <[email protected]>
  • Loading branch information
abharath9 and Bharath Kumar Avusherla authored Jan 6, 2025
1 parent 8e2ffb3 commit fc923b3
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.stream.Stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
Expand All @@ -42,8 +44,11 @@ private RowConverter(RowType rowType, TypeInformation<Row> rowTypeInfo) {
public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) {
RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
RowTypeInfo rowTypeInfo =
new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
TypeInformation[] typeInformations =
Stream.of(tableSchema.getFieldDataTypes())
.map(ExternalTypeInfo::of)
.toArray(TypeInformation[]::new);
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, tableSchema.getFieldNames());
return new RowConverter(rowType, rowTypeInfo);
}

Expand Down

0 comments on commit fc923b3

Please sign in to comment.