Skip to content

Commit

Permalink
Avro: Add writers for the internal object model (#11919)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored Jan 10, 2025
1 parent 97b5b39 commit a100e6a
Show file tree
Hide file tree
Showing 10 changed files with 570 additions and 129 deletions.
57 changes: 57 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

Expand Down Expand Up @@ -228,4 +235,54 @@ private static BigInteger randomUnscaled(int precision, Random random) {

return new BigInteger(sb.toString());
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elementResult) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
for (int i = 0; i < numElements; i += 1) {
// return null 5% of the time when the value is optional
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
result.add(elementResult.get());
}
}

return result;
}

public static Map<Object, Object> generateMap(
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
if (map.keyType() == Types.StringType.get()) {
keyFunc = () -> keyResult.get().toString();
} else {
keyFunc = keyResult;
}

Set<Object> keySet = Sets.newHashSet();
for (int i = 0; i < numEntries; i += 1) {
Object key = keyFunc.get();
// ensure no collisions
while (keySet.contains(key)) {
key = keyFunc.get();
}

keySet.add(key);

// return null 5% of the time when the value is optional
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
result.put(key, valueResult.get());
}
}

return result;
}
}
116 changes: 116 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.util.List;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

abstract class BaseWriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {

protected abstract ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields);

protected abstract ValueWriter<?> fixedWriter(int length);

@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
return createRecordWriter(fields);
}

@Override
public ValueWriter<?> union(Schema union, List<ValueWriter<?>> options) {
Preconditions.checkArgument(
options.size() == 2, "Cannot create writer for non-option union: %s", union);
if (union.getTypes().get(0).getType() == Schema.Type.NULL) {
return ValueWriters.option(0, options.get(1));
} else if (union.getTypes().get(1).getType() == Schema.Type.NULL) {
return ValueWriters.option(1, options.get(0));
} else {
throw new IllegalArgumentException(
String.format("Cannot create writer for non-option union: %s", union));
}
}

@Override
public ValueWriter<?> array(Schema array, ValueWriter<?> elementWriter) {
if (array.getLogicalType() instanceof LogicalMap) {
ValueWriters.StructWriter<?> keyValueWriter = (ValueWriters.StructWriter<?>) elementWriter;
return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1));
}

return ValueWriters.array(elementWriter);
}

@Override
public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
return ValueWriters.map(ValueWriters.strings(), valueWriter);
}

@Override
public ValueWriter<?> primitive(Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
case "date":
return ValueWriters.ints();

case "time-micros":
return ValueWriters.longs();

case "timestamp-micros":
return ValueWriters.longs();

case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());

case "uuid":
return ValueWriters.uuids();

default:
throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
}
}

switch (primitive.getType()) {
case NULL:
return ValueWriters.nulls();
case BOOLEAN:
return ValueWriters.booleans();
case INT:
return ValueWriters.ints();
case LONG:
return ValueWriters.longs();
case FLOAT:
return ValueWriters.floats();
case DOUBLE:
return ValueWriters.doubles();
case STRING:
return ValueWriters.strings();
case FIXED:
return fixedWriter(primitive.getFixedSize());
case BYTES:
return ValueWriters.byteBuffers();
default:
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}
}
87 changes: 4 additions & 83 deletions core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;
Expand Down Expand Up @@ -55,92 +52,16 @@ public Stream<FieldMetrics> metrics() {
return writer.metrics();
}

private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
private WriteBuilder() {}
private static class WriteBuilder extends BaseWriteBuilder {

@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
protected ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields) {
return ValueWriters.record(fields);
}

@Override
public ValueWriter<?> union(Schema union, List<ValueWriter<?>> options) {
Preconditions.checkArgument(
options.contains(ValueWriters.nulls()),
"Cannot create writer for non-option union: %s",
union);
Preconditions.checkArgument(
options.size() == 2, "Cannot create writer for non-option union: %s", union);
if (union.getTypes().get(0).getType() == Schema.Type.NULL) {
return ValueWriters.option(0, options.get(1));
} else {
return ValueWriters.option(1, options.get(0));
}
}

@Override
public ValueWriter<?> array(Schema array, ValueWriter<?> elementWriter) {
if (array.getLogicalType() instanceof LogicalMap) {
ValueWriters.StructWriter<?> keyValueWriter = (ValueWriters.StructWriter<?>) elementWriter;
return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1));
}

return ValueWriters.array(elementWriter);
}

@Override
public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
return ValueWriters.map(ValueWriters.strings(), valueWriter);
}

@Override
public ValueWriter<?> primitive(Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
case "date":
return ValueWriters.ints();

case "time-micros":
return ValueWriters.longs();

case "timestamp-micros":
return ValueWriters.longs();

case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());

case "uuid":
return ValueWriters.uuids();

default:
throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
}
}

switch (primitive.getType()) {
case NULL:
return ValueWriters.nulls();
case BOOLEAN:
return ValueWriters.booleans();
case INT:
return ValueWriters.ints();
case LONG:
return ValueWriters.longs();
case FLOAT:
return ValueWriters.floats();
case DOUBLE:
return ValueWriters.doubles();
case STRING:
return ValueWriters.strings();
case FIXED:
return ValueWriters.genericFixed(primitive.getFixedSize());
case BYTES:
return ValueWriters.byteBuffers();
default:
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
protected ValueWriter<?> fixedWriter(int length) {
return ValueWriters.genericFixed(length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
case STRING:
return ValueReaders.strings();
case FIXED:
return ValueReaders.fixed(primitive);
case BYTES:
return ValueReaders.byteBuffers();
case ENUM:
Expand Down
74 changes: 74 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/InternalWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.types.Type;

/**
* A Writer that consumes Iceberg's internal in-memory object model.
*
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
* Type.TypeID#javaClass()}.
*/
public class InternalWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

public static <D> InternalWriter<D> create(Schema schema) {
return new InternalWriter<>(schema);
}

InternalWriter(Schema schema) {
setSchema(schema);
}

@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
this.writer = (ValueWriter<T>) AvroSchemaVisitor.visit(schema, new WriteBuilder());
}

@Override
public void write(T datum, Encoder out) throws IOException {
writer.write(datum, out);
}

@Override
public Stream<FieldMetrics> metrics() {
return writer.metrics();
}

private static class WriteBuilder extends BaseWriteBuilder {

@Override
protected ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields) {
return ValueWriters.struct(fields);
}

@Override
protected ValueWriter<?> fixedWriter(int length) {
return ValueWriters.fixedBuffers(length);
}
}
}
Loading

0 comments on commit a100e6a

Please sign in to comment.