In the previous big data post we saw that the Apache Beam Row structure lets you write generic transforms, but that relying on its serialization can be a bad bet. To illustrate how to switch from one format to another, this post shows how to convert a Row to an Avro IndexedRecord structure.

From Apache Beam Row to Avro IndexedRecord

As a quick reminder, Beam Row and Avro IndexedRecord share the same philosophy: the data structure is described through a schema that holds the order and type of the fields, and the data are serialized directly without their schema, which is managed in user code.

However, contrary to the Beam Row Schema, the Avro Schema is not serializable. Since the converters we will write are likely to be serialized during the Beam runtime deployment, this is the first issue we must solve.

Luckily for us, the schema can be serialized as a string through its toString() method and loaded back as a Schema through the Schema.Parser#parse method.

We can therefore create a serializable schema wrapper like this:

class SerializableAvroSchema implements Serializable  {
    // the Avro Schema itself is not Serializable so it is kept transient
    // and lazily rebuilt from its string form after deserialization
    private volatile transient org.apache.avro.Schema schema;
    private String schemaString;

    SerializableAvroSchema(final org.apache.avro.Schema schema) {
        this.schema = schema;
        this.schemaString = schema.toString();
    }

    org.apache.avro.Schema getSchema() {
        if (schema == null) {
            synchronized (this) {
                if (schema == null) {
                    schema = new org.apache.avro.Schema.Parser().parse(schemaString);
                }
            }
        }
        return schema;
    }
}

Then we need three conversions:

  1. Convert the Row Schema to an Avro Schema
  2. Convert a Row to an IndexedRecord
  3. Convert an IndexedRecord to a Row

Convert the Beam Schema to an Avro one

Converting a Beam schema to an Avro schema means iterating over each field and mapping its type to the native Avro ones.

Here is how the mapping can be implemented:

public class RowAvroSchemaConverter {

  public org.apache.avro.Schema toAvroSchema(final String name, final Schema beamSchema) {
    return org.apache.avro.Schema.createRecord(
            name,
            null, null, false,
            beamSchema.getFields().stream()
                    .map(beamField -> new org.apache.avro.Schema.Field(
                            beamField.getName(), getSchema(name, beamField),
                            beamField.getDescription(), (Object) null))
                    .collect(toList()));
  }

  private org.apache.avro.Schema getDirectSchema(final String name, final Schema.Field field) {
    switch (field.getType().getTypeName()) {
        case BYTES:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES);
        case BOOLEAN:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN);
        case BYTE:
        case DECIMAL:
        case INT16:
        case INT32:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT);
        case DATETIME:
        case INT64:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
        case FLOAT:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT);
        case DOUBLE:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE);
        case STRING:
            return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
        case ROW:
            return toAvroSchema(name + "." + field.getName(), field.getType().getRowSchema());
        case MAP:
            if (field.getType().getMapKeyType().getTypeName() != Schema.TypeName.STRING) {
                throw new IllegalArgumentException("Only maps with String keys are supported");
            }
            return org.apache.avro.Schema.createMap(toAvroSchema(name + "." + field.getName(), field.getType().getMapValueType().getRowSchema()));
        case ARRAY:
            return org.apache.avro.Schema.createArray(toAvroSchema(name + "." + field.getName(), field.getType().getCollectionElementType().getRowSchema()));
        default:
            throw new IllegalArgumentException(field.toString());
    }
  }

  protected org.apache.avro.Schema getSchema(final String nameBase, final Schema.Field field) {
    final org.apache.avro.Schema directSchema = getDirectSchema(nameBase, field);
    if (field.getNullable() != null && field.getNullable()) {
        return org.apache.avro.Schema.createUnion(
            directSchema,
            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL));
    }
    return directSchema;

  }

}

This code has the limitation of only supporting String keys for the map container, but this can be worked around by using a list of key/value records. To avoid making this blog post's code samples more complex, it hasn't been done here, but it is not very hard.
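As an illustration only, here is a minimal sketch of that workaround: a hypothetical helper, not part of the converter above, which represents such a map as an Avro array of key/value records (the method and field names are made up for the example):

// hypothetical sketch: represent a MAP whose keys are not Strings as an Avro
// array of key/value records instead of an Avro map (which only supports String keys)
private org.apache.avro.Schema toEntryArraySchema(final String name,
                                                  final org.apache.avro.Schema keySchema,
                                                  final org.apache.avro.Schema valueSchema) {
    final org.apache.avro.Schema entry = org.apache.avro.Schema.createRecord(
            name + ".Entry", null, null, false,
            java.util.Arrays.asList(
                    new org.apache.avro.Schema.Field("key", keySchema, null, (Object) null),
                    new org.apache.avro.Schema.Field("value", valueSchema, null, (Object) null)));
    return org.apache.avro.Schema.createArray(entry);
}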

The interesting pieces of code are:

  1. getDirectSchema, which maps the types between the Beam and Avro worlds,

  2. getSchema, which handles the nullability of each field,

  3. toAvroSchema, which converts the Beam Schema to the Avro one.

With this code you can now derive the Avro schema from any Beam schema:

public static void main(String[] args) {
    final Schema schema = createBeamPersonSchema();
    final RowAvroSchemaConverter provider = new RowAvroSchemaConverter();
    final org.apache.avro.Schema avroSchema = provider.toAvroSchema(
        "com.github.rmannibucau.generated.Person", schema);
    System.out.println(avroSchema);
}

Once the Avro schema is reformatted to be more readable, this outputs something like:

{
   "type":"record",
   "name":"Person",
   "namespace":"com.github.rmannibucau.generated",
   "fields":[
      {
         "name":"id",
         "type":"string",
         "doc":""
      },
      {
         "name":"name",
         "type":"string",
         "doc":""
      },
      {
         "name":"age",
         "type":"int",
         "doc":""
      },
      {
         "name":"created",
         "type":"long",
         "doc":""
      },
      {
         "name":"updated",
         "type":"long",
         "doc":""
      },
      {
         "name":"address",
         "type":{
            "type":"record",
            "name":"address",
            "namespace":"com.github.rmannibucau.generated.Person",
            "fields":[
               {
                  "name":"number",
                  "type":"int",
                  "doc":""
               },
               {
                  "name":"street",
                  "type":"string",
                  "doc":""
               },
               {
                  "name":"country",
                  "type":"string",
                  "doc":""
               }
            ]
         },
         "doc":""
      }
   ]
}
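The createBeamPersonSchema() helper is not shown in this post; as a pure assumption, here is a minimal sketch of what it could look like, built with Beam's Schema.builder() and matching the output above:

// hypothetical sketch of the createBeamPersonSchema() helper assumed in the example
static Schema createBeamPersonSchema() {
    final Schema address = Schema.builder()
            .addInt32Field("number")
            .addStringField("street")
            .addStringField("country")
            .build();
    return Schema.builder()
            .addStringField("id")
            .addStringField("name")
            .addInt32Field("age")
            .addDateTimeField("created")
            .addDateTimeField("updated")
            .addRowField("address", address)
            .build();
}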

Apache Beam's Row to Avro's IndexedRecord

Converting a row to an indexed record is as simple as iterating over each field and putting each value into the record.

To avoid creating a custom implementation of IndexedRecord, you can reuse the org.apache.avro.generic.GenericData.Record structure.

The only care to take is to properly handle the non-primitive types, which are:

  • DATETIME Beam data, which must be converted to a long in the Avro record,
  • ROW, which must trigger a nested conversion,
  • ARRAY and MAP, which trigger a nested conversion for their values.

Hereafter is what the implementation can look like:

protected IndexedRecord toAvro(final Schema beamSchema, final org.apache.avro.Schema avroSchema, final Row row) {
    final GenericData.Record record = new GenericData.Record(avroSchema);
    IntStream.range(0, beamSchema.getFieldCount())
        .forEach(i -> {
            final Object value = row.getValue(i);
            if (value == null) {
                record.put(i, null);
                return;
            }
            final Schema.FieldType fieldType = beamSchema.getField(i).getType();
            final Schema.TypeName typeName = fieldType.getTypeName();
            if (typeName == Schema.TypeName.ROW) {
                record.put(i, toAvro(fieldType.getRowSchema(), avroSchema.getFields().get(i).schema(), Row.class.cast(value)));
            } else if (typeName == Schema.TypeName.DATETIME) {
                record.put(i, AbstractInstant.class.cast(value).toInstant().getMillis());
            } else if (typeName == Schema.TypeName.ARRAY) {
                final Schema.FieldType elementType = fieldType.getCollectionElementType();
                if (elementType.getTypeName() == Schema.TypeName.ROW) {
                    final org.apache.avro.Schema avroEltSchema = avroSchema.getFields().get(i).schema().getElementType();
                    record.put(i, ((Collection<Row>) value).stream()
                        .map(it -> toAvro(elementType.getRowSchema(), avroEltSchema, it))
                        .collect(toList()));
                } else {
                    record.put(i, value);
                }
            } else if (typeName == Schema.TypeName.MAP) {
                if (fieldType.getMapKeyType().getTypeName() != Schema.TypeName.STRING) {
                    throw new IllegalArgumentException("Only Map with String keys are supported for now");
                }
                final Schema.FieldType mapValueType = fieldType.getMapValueType();
                if (mapValueType.getTypeName() == Schema.TypeName.ROW) {
                    final org.apache.avro.Schema avroValueSchema = avroSchema.getFields().get(i).schema().getValueType();
                    record.put(i, ((Map<String, Row>) value).entrySet().stream()
                        .collect(toMap(Map.Entry::getKey, it -> toAvro(mapValueType.getRowSchema(), avroValueSchema, it.getValue()))));
                } else {
                    record.put(i, value);
                }
            } else {
                record.put(i, value);
            }
        });
    return record;
}

To prepare it for Beam, we make the conversion serialization-friendly through a SerializableFunction:

public SerializableFunction<Row, IndexedRecord> createFromRow(
        final Schema schema, final org.apache.avro.Schema avroSchema) {
    final SerializableAvroSchema avroSchemaRef = new SerializableAvroSchema(avroSchema);
    return row -> toAvro(schema, avroSchemaRef.getSchema(), row);
}

Avro's IndexedRecord to Apache Beam's Row

The reverse conversion is quite similar, but the data conversion goes the other way, as expected:

protected static Row toRow(final Schema beamSchema,
                           final org.apache.avro.Schema avroSchema,
                           final IndexedRecord root) {
    return Row.withSchema(beamSchema)
        .addValues(avroSchema.getFields().stream()
            .map(field -> {
                final Object value = root.get(field.pos());
                if (value == null) {
                    return null;
                }

                final Schema.Field beamField = beamSchema.getField(field.pos());
                if (beamField.getType().getTypeName() == Schema.TypeName.DATETIME) {
                    // the Avro record stores the timestamp as a long, rebuild the Joda datetime Beam expects
                    return new org.joda.time.DateTime(Number.class.cast(value).longValue());
                }
                if (beamField.getType().getTypeName() == Schema.TypeName.ROW) {
                    return toRow(beamField.getType().getRowSchema(), field.schema(), IndexedRecord.class.cast(value));
                }
                if (beamField.getType().getTypeName() == Schema.TypeName.ARRAY && !beamField.getType().getCollectionElementType().getTypeName().isPrimitiveType()) {
                    final Schema nestedBeamSchema = beamField.getType().getCollectionElementType().getRowSchema();
                    final org.apache.avro.Schema nestedAvroSchema = field.schema().getElementType();
                    return Collection.class.cast(value).stream()
                            .map(it -> toRow(nestedBeamSchema, nestedAvroSchema, IndexedRecord.class.cast(it)))
                            .collect(toList());
                }
                if (beamField.getType().getTypeName() == Schema.TypeName.MAP && !beamField.getType().getMapValueType().getTypeName().isPrimitiveType()) {
                    if (beamField.getType().getMapKeyType().getTypeName() != Schema.TypeName.STRING) {
                        throw new IllegalArgumentException("Only Map with String keys are supported for now");
                    }
                    final Schema nestedBeamSchema = beamField.getType().getMapValueType().getRowSchema();
                    final org.apache.avro.Schema nestedAvroSchema = field.schema().getValueType();
                    return ((Map<String, IndexedRecord>) value).entrySet().stream()
                            .collect(Collectors.toMap(Map.Entry::getKey, it -> toRow(nestedBeamSchema, nestedAvroSchema, it.getValue())));
                }
                return value;
            })
            .collect(toList())
        ).build();
}

And the companion serializable function is:

public SerializableFunction<IndexedRecord, Row> createToRow(final Schema beamSchema,
                                                            final org.apache.avro.Schema avroSchema) {
    final SerializableAvroSchema avroSchemaRef = new SerializableAvroSchema(avroSchema);
    return avro -> toRow(beamSchema, avroSchemaRef.getSchema(), avro);
}

Usage

Assuming all these methods are put in the same class, you can now convert a Row with this code:

final RowAvroSchemaConverter provider = new RowAvroSchemaConverter();
final org.apache.avro.Schema avroSchema = provider.toAvroSchema("com.github.rmannibucau.generated.Person", schema);
final IndexedRecord record = provider.createFromRow(schema, avroSchema).apply(row);
System.out.println(record);
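
The reverse conversion can be exercised the same way with createToRow (backToRow is just a local name for this snippet):

final Row backToRow = provider.createToRow(schema, avroSchema).apply(record);
System.out.println(backToRow);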

Pitfalls of that kind of conversion

These conversions still have one major pitfall: they need the schema at pipeline definition time, which means they will not support data evolution, quite an important limitation for big data ingestion. For such cases, other types of generic records can be more relevant.

To illustrate that, let's recall the way Beam handles Row conversions:

  • You define an implicit (from functions) or explicit org.apache.beam.sdk.schemas.SchemaProvider which is registered in the SchemaRegistry of the pipeline,
  • The Beam runtime is able to convert a Row from/to your structure using this SchemaRegistry.

Concretely, a Row can be converted to a Person and a Person to a Row thanks to this registry. However, the key of this registry is a TypeDescriptor or, to make it simpler, a Class<?>. This means that if you use a generic record other than Beam's Row, you can only register a single conversion for it.

Doesn't sound very bothersome, right? Actually it is, a lot. If you check how we converted our Row to an Avro IndexedRecord, the key of the conversion is really the Avro schema rather than the IndexedRecord type itself. Moreover, you can have an IndexedRecord representing a Person but also a Contract or a LogLine: each time the type is IndexedRecord, yet the schemas have nothing in common. This means that the implicit Beam conversion rarely works for such pipelines and you will need to make it explicit.
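
To make that concrete, here is a rough, hypothetical sketch of such an explicit registration; it assumes a pipeline variable and Beam's SchemaRegistry#registerSchemaForClass method, and reuses the converters from this post. It also makes the limitation visible: IndexedRecord.class can only be bound to one schema.

// hypothetical sketch: explicitly bind IndexedRecord to the Person conversion
final RowAvroSchemaConverter converter = new RowAvroSchemaConverter();
final Schema personBeamSchema = createBeamPersonSchema();
final org.apache.avro.Schema personAvroSchema =
        converter.toAvroSchema("com.github.rmannibucau.generated.Person", personBeamSchema);

// the registry key is the class: all IndexedRecords of the pipeline now map to the
// Person schema, so a Contract or LogLine record can't get its own registration
pipeline.getSchemaRegistry().registerSchemaForClass(
        IndexedRecord.class,
        personBeamSchema,
        converter.createToRow(personBeamSchema, personAvroSchema),
        converter.createFromRow(personBeamSchema, personAvroSchema));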

Also, as you may have noticed, this SchemaRegistry is on the Pipeline API, as is the org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider which has an asRow() method. This means that, even if Beam tries to communicate that the Schema and data definition are extensible, it is not that true: the Row structure is the central pivot format in the core of the product.

Potential solution

In the previous data blog post we saw that using Beam Row for something else than the in-memory representation is not very relevant (reusability issues, size issues, ...). However, Row and Schema, like JsonObject can be for JSON-P, are just APIs and user-facing models. This means, as this post showed, that it is trivial to back the Row and Schema implementations by another concrete format, like Avro - or JSON if you think about it.

This means that if Row and Schema are redesigned to load a RowProvider and a SchemaProvider, then the serialization will be handled through another mechanism, selectable on the pipeline, and it would transparently solve the mainstream issue when it hits persistence. It would also avoid introducing a new specific way to serialize data between workers and would increase the interoperability with the ecosystem, since it would provide a default conversion to one (or multiple, if there are multiple providers) native types.

The SchemaRegistry concept can still be used, but it must come with a redesign of its key, likely adding the (Beam) Schema to it.

In the end, the current solution doesn't go far enough, but there is no blocker to make it a portable solution which can fulfill its promise: run on any engine. As shown in this post, the Avro/Beam conversion is not that difficult, and the challenge here will be to guarantee some stability in the model design - otherwise converters would break each time there is a small type addition.
