The Apache Beam project provides a Big Data abstraction over different "runners" - which are nothing more than real Big Data engines like Apache Spark, Apache Flink or even Google Dataflow.

For such an abstraction layer you quickly hit one concern: how to represent "data". Apache Beam is code driven, meaning it expects you to write your "pipeline" (the data processing flow) and your "transforms" (the processing steps) in code. This has the direct implication that you generally type all your steps.

Strongly typed pipeline: industrialization made hard

Here is a sample pipeline:

public static void main(final String[] args) {
    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply("ReadCSV", TextIO.read().from("./people.txt"))
            .apply("MapToPerson", ParDo.of(new DoFn<String, Person>() {
                @ProcessElement
                public void onElement(
                      @Element final String file,
                      final OutputReceiver<Person> emitter) {
                    final String[] parts = file.split(";");
                    emitter.output(new Person(parts[0], parts[1], Integer.parseInt(parts[2])));
                }
            }))
            .apply("MapToJson", ParDo.of(new DoFn<Person, String>() {
                private transient Jsonb jsonb;

                @ProcessElement
                public void onElement(
                      @Element final Person person,
                      final OutputReceiver<String> emitter) {
                    if (jsonb == null) {
                        jsonb = JsonbBuilder.create();
                    }
                    emitter.output(jsonb.toJson(person));
                }

                @Teardown
                public void onTearDown() {
                    if (jsonb != null) {
                        try {
                            jsonb.close();
                        } catch (final Exception e) {
                            throw new IllegalStateException(e);
                        }
                    }
                }
            }))
            .apply("WriteToJson", TextIO.write().to("./target/output.json"));

    pipeline.run().waitUntilFinish();
}

This simple pipeline reads the lines of a file, a first mapper converts each line to a Person, a second mapper converts the Person to JSON, and finally the output is written back to another file. Note that the output will not be target/output.json but a set of files matching this pattern (like output.json-[0-9]+-of-[0-9]+) to support concurrent bundles, which is one of the foundations of the Big Data processing model.

This pipeline is not very realistic by itself since it could be simplified, but it shows quite simply the issue Beam has: to chain transformations (each apply()), you need to ensure the steps match in terms of types. The TextIO source issues a String, so we need a mapper to create a Person, which is the type the JSON step takes as input; the output TextIO takes a String too, so we need to ensure our JSON output is a String (whereas for a JSON transform a JsonObject would be way more natural).

Said otherwise: it implies a lot of verbosity to do pretty much nothing. It may not look like a big concern, and that is true if you just have one Big Data pipeline in your company. However, when you start using Big Data for a lot of jobs and therefore create a lot of pipelines, you will very quickly want to industrialize your code and create a library of common steps you can share. It is exactly the same kind of solution as the old one we know very well in the "standalone" batch area, where you have libraries of steps you can compose to create a new batch by just assembling bricks.

This philosophy requires each step to be interoperable with any potential neighbour. In other words, you need a single type for records and you must drop the strong typing the previous pipeline had; otherwise you will keep writing these conversion steps, which are more code to write, test and maintain but bring no added value in terms of business or features at the end of the day.
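
To make this concrete, here is a hypothetical sketch (the class name is mine, not a Beam API) of what a shared library step could look like once every transform agrees on a single generic record type - the Row structure introduced below - so that it can be chained anywhere without a conversion step:

// Hypothetical reusable step from a company-wide step library: input and output
// use the same generic record type, so it composes with any other library step.
public class AuditStep extends PTransform<PCollection<Row>, PCollection<Row>> {
    @Override
    public PCollection<Row> expand(final PCollection<Row> input) {
        return input.apply("Audit", ParDo.of(new DoFn<Row, Row>() {
            @ProcessElement
            public void onElement(@Element final Row record, final OutputReceiver<Row> emitter) {
                // enrich/validate/log the record generically here, then forward it
                // (the output coder still has to be set, e.g. RowCoder.of(schema), see below)
                emitter.output(record);
            }
        }));
    }
}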

How Apache Beam hit the generic record need

One of the tracks of Apache Beam is Big Data SQL. At a high level, it allows you to use a SQL-like language, adapted to the Beam data model (PCollections typically), to select data and process them without having to code the transforms.

Concretely, assuming the input data is correctly formatted, you can add a step like this into your pipeline:

.apply(SqlTransform.query("select age,name,id from PCOLLECTION where age > 18"))

PCOLLECTION being a virtual table composed of the elements of the input. Note that there are ways to select data from multiple inputs but this is out of scope for this post.
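
As a hedged sketch of the wiring (the SQL module lives in beam-sdks-java-extensions-sql, and the Row/Schema types are introduced just after), the previous CSV pipeline could feed such a query by mapping each line to a Row first:

// Sketch: SqlTransform works on PCollection<Row> elements carrying a schema,
// so the mapper emits Row instances and the RowCoder is declared explicitly.
final Schema schema = Schema.builder()
    .addStringField("id")
    .addStringField("name")
    .addInt32Field("age")
    .build();

pipeline.apply("ReadCSV", TextIO.read().from("./people.txt"))
        .apply("MapToRow", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void onElement(@Element final String line, final OutputReceiver<Row> emitter) {
                final String[] parts = line.split(";");
                emitter.output(Row.withSchema(schema)
                        .addValues(parts[0], parts[1], Integer.parseInt(parts[2]))
                        .build());
            }
        }))
        .setCoder(RowCoder.of(schema))
        .apply("FilterAdults", SqlTransform.query("select age,name,id from PCOLLECTION where age > 18"));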

The issue for such a transform is that it relies on two assumptions:

  1. It can access the input data,
  2. It outputs data you can process.

These two concerns are actually the same as the one explained in the previous part: how do you interact with data structures you don't own?

Beam Row record, schema and friends

In terms of generic structures you can introspect, there are already some mainstream solutions:

  • XML has been used for a very long time and is probably the most advanced one, with its very complete schema support and the tooling built around it,
  • JSON is a bit less tooled and its schema support is less advanced, but it is very interesting because, in most cases, it embeds its schema in a lightweight way, which avoids having to manage a schema plus the data,
  • Apache Avro IndexedRecord, which is well known to Big Data developers: a schema which defines the structure of the data, and data which are generally serialized without the schema (except when using a file container, but that is rarely done in Big Data so that the processing can be distributed easily and the cost associated with schema maintenance avoided...which leads to the well known versioning hell),
  • Protobuf, which is close to Avro,
  • ....

So Apache Beam had the choice to use one of these mainstream formats....but wait, they are all about serialization, not modelling or API. Also, which one to pick? Each ecosystem has its preference, so why would Beam, which is an abstraction, prefer one over the others?

The serialization point is interesting. If you check Avro, it doesn't have a date/time type, so you generally have to use a STRING or LONG type. That is OK since you can normalize how you serialize dates, but it means that in the SqlTransform the comparison of dates (which is a very common use case) would be quite hard to implement simply - it would require passing a format in the query, which complicates the SQL syntax for this case.
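
To illustrate with Avro (a sketch using Avro's SchemaBuilder; the field names are arbitrary), the creation date typically ends up as a plain long holding epoch milliseconds, a pure convention between producers and consumers:

// No dedicated date/time primitive: the timestamp is just a long and every
// reader must know the convention (epoch milliseconds here).
final org.apache.avro.Schema avroSchema = SchemaBuilder.record("Person")
        .fields()
        .requiredString("name")
        .requiredLong("created") // actually a date, by convention only
        .endRecord();

final GenericRecord record = new GenericData.Record(avroSchema);
record.put("name", "Beam");
record.put("created", System.currentTimeMillis());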

So at the end, Apache Beam created the Beam Row structure.

The Row structure is highly inspired by Avro in terms of design:

  • A schema which supports primitives (byte, two/four/eight-byte signed integers, decimal, float, double, string, datetime, boolean), byte arrays (mainly for lobs), and containers (arrays, nested rows, and maps),
  • A data holder (the Row itself) which is just a list of values organized in the same order as the schema definition.

Want to visualize it with some code? Here is how our Person can be defined:

final Schema schema = Schema.builder()
  .addInt16Field("age")
  .addStringField("name")
  .addStringField("id")
  .build();
final Row person = Row.withSchema(schema)
  .addValues((short) 2, "beam", "1").build();

As you can see, a Row is always associated with a schema (like an Avro IndexedRecord has a getSchema()).
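
The values can then be read back generically, by field name, without knowing the Person type at compile time (a quick sketch):

// The schema travels with the row, so a generic transform can introspect it
// and access each value by name and declared type.
final short age = person.getInt16("age");
final String name = person.getString("name");
System.out.println(name + " is " + age + ", fields: " + person.getSchema().getFieldNames());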

With this structure, the generic transforms (including the SQL one) can access the schema and therefore process the data without knowing them beforehand. This led to the creation of new transforms like the Filter one:

.apply(Filter.whereFieldName("age", age -> age > 18))

Other transforms to group records by a criterion, to project data, etc. are doable as well, from the moment you know how to process data you don't know at build time.
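
For instance, the schema transforms package provides projection and grouping transforms built on the same idea (a hedged sketch; the exact factory methods can differ between Beam versions):

// rows is assumed to be a PCollection<Row> with a known schema.
// Keep only two columns, then group the records sharing the same age:
// both steps work on any Row, whatever the original business type was.
rows.apply("ProjectNameAndAge", Select.fieldNames("name", "age"))
    .apply("GroupByAge", Group.byFieldNames("age"));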

Apache Beam even provides a way to convert this Row to a POJO implicitly.
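
A hedged sketch of that conversion, using the schema inference annotations and the Convert transform from the schema transforms package (it assumes the POJO exposes public fields matching the schema):

// The POJO declares how Beam should infer its schema...
@DefaultSchema(JavaFieldSchema.class)
public class Person {
    public String id;
    public String name;
    public int age;
}

// ...so a PCollection<Row> can be converted back to the typed form when needed.
final PCollection<Person> people = rows.apply("BackToPojo", Convert.to(Person.class));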

But wait, we are in the Big Data world, so data are potentially serialized, right? What about this Row thing? The Schema itself is Serializable, so no big deal, and it needs to be serialized only "once" (once in the Big Data sense, so likely once per worker; the important part is that it is not once per record). The Row itself gets a new Beam Coder, a.k.a. the RowCoder, which is created from the schema:

RowCoder.of(schema)

This coder holds the schema, so it is able to process the incoming Row and serialize it element by element (exactly like any (de)serializer). At a high level, it delegates the serialization of each element to the coder matching its type: STRING will use the StringUtf8Coder, INT32 will use BigEndianIntegerCoder, etc. See RowCoderGenerator for more details.
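
Here is a small sketch of a round trip through that coder (CoderUtils is a Beam helper used here just to get a byte[] out of the coder; in a real pipeline the runner does this for you):

// Encode the Row to bytes with the schema-derived coder, then read it back:
// each field is delegated to the coder matching its schema type.
final RowCoder coder = RowCoder.of(schema);
final byte[] bytes = CoderUtils.encodeToByteArray(coder, person);
final Row copy = CoderUtils.decodeFromByteArray(coder, bytes);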

How does it compare to Avro, for instance? Assuming you use the same unitary serialization, which mainly means you serialize dates as longs and not strings (which take way more space), Avro will generally be more compact because it uses direct primitive serialization, whereas the nested coders of the Row use Java serialization which is more verbose than needed.

To illustrate that, let's use this schema:

Schema.builder()
    // flat part
    .addStringField("id")
    .addStringField("name")
    .addInt32Field("age")
    .addDateTimeField("created")
    .addDateTimeField("updated")
    // nested
    .addField("address", Schema.FieldType.row(Schema.builder()
            .addInt16Field("number")
            .addStringField("street")
            .addStringField("country")
            .build()))
    .build();

Which can be illustrated with this sample data:

{
  id: "0",
  name: "Beam",
  age: 2,
  created: 1535885173202,
  updated: 1535885173202,
  address: {
    number: 14,
    street: "Big Data",
    country: "Java"
  }
}

If you serialize it with the Beam RowCoder it takes 45 bytes, and using an Avro IndexedRecord it takes 35 bytes....almost 30% more, which is a lot in the Big Data world. For comparison, JSON (not compressed) takes 142 bytes and gzipped it takes 128 bytes - which is expected since the data size is comparable to the structure size here. An interesting comparison is the raw JSON (you drop the keys and consider the data being in arrays): then the size is 66 bytes.

Now if you have a big STRING value (10240 'a' characters for instance), JSON takes 10378 bytes, Avro 10273 and Row 10282. Compressed with GZIP, the JSON takes 152 bytes and the Avro 90 bytes. If you make each character different, the gzipped JSON becomes 20714 bytes and the gzipped Avro becomes 20570 bytes. The gzipped Row is between both each time.

What matters is not the figures themselves but the fact that the serialization disk space highly depends on the data you transmit. If you have small data - said otherwise, if your data are comparable in size to the schema - you should avoid serializing the schema in general...if you serialize anything at all; keep in mind Beam and Big Data engines will try to avoid that as much as possible. It also depends on what you do with these data: using JSON makes them more quickly available to the end usage, whereas Avro or, worse, Row serialization will require processing to serve it in your applications since they are highly dependent on the Big Data stack.
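
The exact figures will vary with the Beam version and the payload, but they are easy to reproduce with a sketch along these lines (row is assumed to be a Row built with the schema above and the sample values; exceptions are left unhandled for brevity):

// Compare the serialized sizes of the same record: Beam Row bytes,
// hand-written JSON bytes, and the gzipped JSON bytes.
final byte[] rowBytes = CoderUtils.encodeToByteArray(RowCoder.of(schema), row);

final byte[] jsonBytes = ("{\"id\":\"0\",\"name\":\"Beam\",\"age\":2,"
        + "\"created\":1535885173202,\"updated\":1535885173202,"
        + "\"address\":{\"number\":14,\"street\":\"Big Data\",\"country\":\"Java\"}}")
        .getBytes(StandardCharsets.UTF_8);

final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (final GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
    gzip.write(jsonBytes);
}

System.out.println("row=" + rowBytes.length
        + ", json=" + jsonBytes.length
        + ", json.gz=" + buffer.toByteArray().length);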

Why this long discussion of serialization? Big Data execution comes with serialization concerns, but their impact is highly dependent on multiple points:

  • Your data structure,
  • The size of your data vs the size of your structure definition,
  • How the data will be stored and in which backend,
  • Which part of your system will access and read the data,
  • ....

All that means that a format which looks wrong in theory is not necessarily wrong in practice. Look at the JSON example with big data: the difference is no longer significant compared to Avro, but the cost of using Avro in a system which is not only about Big Data will be way higher than the cost of using JSON in terms of performance.

The conclusion about the Beam Row structure is that the main issue with its serialization form is not that it is fat compared to other Big Data serializations, but rather that it is not interoperable at all. JSON is interoperable with any application today, Avro is quite interoperable and has libraries in most languages, Protobuf as well...the Beam Row structure is only relevant in the Beam ecosystem. Keep in mind that if you picked Beam it is to be vendor independent and be able to switch engines....it is likely not to be locked to Beam itself. This has the direct implication that you must ensure to never ever serialize Row structures directly into a backend, and always convert them to a more mainstream format first.
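
As a hedged illustration of such a bridge (field handling is deliberately limited to strings and int32; a real one would cover all schema types), a generic step converting any Row to JSON before it leaves the Beam world could look like this:

// Hypothetical bridge: map the Beam Row to a mainstream format (JSON here,
// via the javax.json API) so downstream applications don't need the Beam stack.
public class RowToJson extends DoFn<Row, String> {
    @ProcessElement
    public void onElement(@Element final Row row, final OutputReceiver<String> emitter) {
        final JsonObjectBuilder json = Json.createObjectBuilder();
        row.getSchema().getFields().forEach(field -> {
            switch (field.getType().getTypeName()) {
                case STRING:
                    json.add(field.getName(), row.getString(field.getName()));
                    break;
                case INT32:
                    json.add(field.getName(), row.getInt32(field.getName()));
                    break;
                default: // nested rows, arrays, datetime... left out for brevity
                    final Object value = row.getValue(field.getName());
                    json.add(field.getName(), String.valueOf(value));
            }
        });
        emitter.output(json.build().toString());
    }
}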

Finally, it is important to keep in mind some issues with the Beam Row structure before using it a lot in your code and potentially creating a library around it:

  • Its DATETIME type relies on the Joda-Time AbstractInstant and is not compatible with the Java 8 date/time types. Today you likely don't want to let Joda surface in any API, so maybe avoid this type of column.
  • The Beam Row format lives in the Beam SDK core, so if you just want to use it to prepare some functions you will pull jackson, tukaani, joda-time, guava! etc. into your classpath, which is quite undesirable (IMHO the Row structure should be extracted into an API module without any dependency).
  • Ensure you write a bridge to another serialization format to be able to read/write the data without having to propagate Beam (and its huge core stack) into all applications. Hopefully, on that point, the Beam Row structure will just become a pluggable API (like any EE API, with a provider etc.) which would natively use another format and not a fully stacked structure with a custom serialization.

However, even if the Row structure is not perfect and its API can look weird since it just hides a list and not a real structure (you can't do an addField(name, value)), it is a good step forward towards a more generic coding of pipelines. A few enhancements (a better API, extracting it into its own lightweight module, ...) and we could get a meta pipeline environment pretty easily.

In the next post I will show you how to convert a Row to an Avro IndexedRecord almost transparently :).

 
