How to select the best coder for your data with Apache Beam
Apache Beam has a Coder abstraction to (de)serialize your data.
This lets you write your pipelines without caring about serialization, but at some point you will need to care if your pipeline relies on it heavily (several (re)shuffles, aggregations, etc.).
In general you can benchmark with pre-extracted or random data, but the best option is to test with your real pipeline data. This requires running the same pipeline multiple times, which also means you must make sure the environment stays identical and only the coder changes.
To mitigate this somewhat random comparison, I use a MultipleCodersCoder. Beyond the fun name, this coder runs multiple coders at the same time during encoding and decoding, and records each duration and payload size in an in-memory registry.
At a high level, this is just a coder delegating to another coder for the main flow, but using the encode/decode methods as hooks to run the other coders. The advantage is that all coders run at almost the same time, which makes them easier to compare than doing multiple runs. This coder is very slow compared to any single one of them, but the goal is only to reuse the real pipeline data to get comparison metrics, not to produce absolute figures you could hold against your SLA.
In terms of implementation, the coder takes one mainCoder, which is the one actually used in the pipeline, and a map of evaluatedCoders, which are the monitored ones. The key of the map gives a human-readable name to the coder (you can compare the same coder tuned with different settings, so the coder class is not always a relevant reporting key). The value is the coder instance.
The base of our coder will be:
package com.github.rmannibucau.describeit.coder;

import static java.util.stream.Collectors.toMap;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v20.com.google.common.io.ByteStreams;

public class MultipleCodersCoder<T> extends Coder<T> {
    private final Coder<T> mainCoder;
    private final Map<String, Coder<T>> coders;
    private final int defaultEncodeBufferSize;

    public MultipleCodersCoder(final Coder<T> mainCoder,
                               final Map<String, Coder<T>> evaluatedCoders) {
        this(mainCoder, evaluatedCoders, 8192);
    }

    public MultipleCodersCoder(final Coder<T> mainCoder,
                               final Map<String, Coder<T>> evaluatedCoders,
                               final int defaultEncodeBufferSize) {
        this.mainCoder = mainCoder;
        this.coders = evaluatedCoders;
        this.defaultEncodeBufferSize = defaultEncodeBufferSize;
    }

    @Override
    public void encode(final T value, final OutputStream outStream) throws IOException {
        mainCoder.encode(value, outStream);
    }

    @Override
    public T decode(final InputStream inStream) throws IOException {
        final byte[] bytes = ByteStreams.toByteArray(inStream);
        return mainCoder.decode(new ByteArrayInputStream(bytes));
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
        return mainCoder.getCoderArguments();
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
        mainCoder.verifyDeterministic();
    }
}
This is just a delegating coder, except that decode loads the whole stream into memory before decoding it. You will quickly see why.
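One reason to buffer is that a raw InputStream can generally be consumed only once, whereas an in-memory byte[] can back as many fresh streams as needed. A minimal JDK illustration of that point (the StreamReuse class and readAll helper are illustrative names, not part of the coder):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class StreamReuse {
    // reads the whole stream into a String (simplified, single-byte chars assumed)
    static String readAll(final InputStream in) throws IOException {
        final StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) >= 0) {
            sb.append((char) b);
        }
        return sb.toString();
    }

    public static void main(final String[] args) throws IOException {
        final byte[] bytes = "payload".getBytes();
        // each consumer gets its own in-memory stream over the same bytes,
        // so both reads see the full payload
        final String first = readAll(new ByteArrayInputStream(bytes));
        final String second = readAll(new ByteArrayInputStream(bytes));
        System.out.println(first.equals(second)); // true
    }
}
```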
Before measuring our coders' performance we need a registry. To keep it simple we will just use a singleton (static storage everywhere) which collects each invocation of the encode and decode methods:
package com.github.rmannibucau.describeit.coder;

import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Stream;

public class MetricsRegistry {
    private static final Collection<Map<String, Measure>> ENCODES = new ArrayList<>();
    private static final Collection<Map<String, Measure>> DECODES = new ArrayList<>();

    private MetricsRegistry() {
        // no-op
    }

    static void addDecode(final Map<String, Measure> measures) {
        synchronized (DECODES) {
            DECODES.add(measures);
        }
    }

    static void addEncode(final Map<String, Measure> measures) {
        synchronized (ENCODES) {
            ENCODES.add(measures);
        }
    }

    public static Collection<Map<String, Measure>> getEncode() {
        synchronized (ENCODES) {
            return new ArrayList<>(ENCODES);
        }
    }

    public static Collection<Map<String, Measure>> getDecode() {
        synchronized (DECODES) {
            return new ArrayList<>(DECODES);
        }
    }

    public static void writeAsCsv(final Collection<Map<String, Measure>> data, final File output) throws IOException {
        if (data.isEmpty()) {
            throw new IllegalArgumentException("No data to write");
        }
        final Collection<String> titles = data.iterator().next().keySet().stream().sorted().collect(toList());
        ofNullable(output.getParentFile()).ifPresent(File::mkdirs);
        try (final PrintWriter out = new PrintWriter(new FileWriter(output))) {
            // headers
            out.println(Stream.concat(
                    titles.stream().map(it -> it + " (ns)"),
                    titles.stream().map(it -> it + " (b)"))
                .collect(joining(";")));
            // data
            data.forEach(line -> out.println(Stream.concat(
                    titles.stream().map(it -> line.get(it).duration),
                    titles.stream().map(it -> line.get(it).size))
                .map(String::valueOf).collect(joining(";"))));
        }
    }

    public static void reset() {
        synchronized (DECODES) {
            DECODES.clear();
        }
        synchronized (ENCODES) {
            ENCODES.clear();
        }
    }

    static class Measure {
        private final long duration;
        private final long size;

        Measure(final long duration, final long size) {
            this.duration = duration;
            this.size = size;
        }

        public long getDuration() {
            return duration;
        }

        public long getSize() {
            return size;
        }

        @Override
        public String toString() {
            return "Measure{duration=" + duration + ", size=" + size + '}';
        }
    }
}
Now we can measure the performance of each evaluatedCoder:
@Override
public void encode(final T value, final OutputStream outStream) throws IOException {
    MetricsRegistry.addEncode(coders.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, entry -> createEncodeMeasure(value, entry.getValue()))));
    mainCoder.encode(value, outStream);
}

private MetricsRegistry.Measure createEncodeMeasure(final T data, final Coder<T> coder) {
    final ByteArrayOutputStream stream = new ByteArrayOutputStream(defaultEncodeBufferSize);
    final long start = System.nanoTime();
    try {
        coder.encode(data, stream);
    } catch (final IOException e) {
        throw new IllegalStateException("coder shouldn't fail", e);
    }
    final long end = System.nanoTime();
    return new MetricsRegistry.Measure(end - start, stream.toByteArray().length);
}
Before writing the main flow output, we capture the measure for each evaluated coder and add them all to the registry.
The decoding side is pretty much the same, with one little trick: the evaluated coders cannot read the bytes produced by the main coder, so you must first re-encode the decoded value with each evaluated coder before timing its decode:
@Override
public T decode(final InputStream inStream) throws IOException {
    final byte[] bytes = ByteStreams.toByteArray(inStream);
    final T decode = mainCoder.decode(new ByteArrayInputStream(bytes));
    MetricsRegistry.addDecode(coders.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, entry -> {
                final Coder<T> evaluatedCoder = entry.getValue();
                final ByteArrayOutputStream outStream = new ByteArrayOutputStream(defaultEncodeBufferSize);
                try {
                    evaluatedCoder.encode(decode, outStream);
                } catch (final IOException e) {
                    throw new IllegalStateException(e);
                }
                final byte[] coderBytes = outStream.toByteArray();
                return createDecodeMeasure(coderBytes.length, new ByteArrayInputStream(coderBytes), evaluatedCoder);
            })));
    return decode;
}

private MetricsRegistry.Measure createDecodeMeasure(final long length, final InputStream stream, final Coder<T> coder) {
    final long start = System.nanoTime();
    try {
        coder.decode(stream);
    } catch (final IOException e) {
        throw new IllegalStateException("coder shouldn't fail", e);
    }
    final long end = System.nanoTime();
    return new MetricsRegistry.Measure(end - start, length);
}
Now you can set this coder on a PCollection:
.apply("SomeStep", new MyRowStep())
.setCoder(createEvaluationCoder(schema));

// ...

private static Coder<Row> createEvaluationCoder(final Schema schema) {
    final Map<String, Coder<Row>> evaluatedCoders = new HashMap<>();
    evaluatedCoders.put("some-step_serializable", SerializableCoder.of(Row.class));
    evaluatedCoders.put("some-step_beam-row", RowCoder.of(schema));
    return new MultipleCodersCoder<>(RowCoder.of(schema), evaluatedCoders);
}
Now that your pipeline is ready and your measure points are set (remember you can change the map keys to mark each step, so you can collect measures for multiple steps in the same execution), you can run the pipeline with the direct runner and add this code at the end of the execution:
MetricsRegistry.writeAsCsv(
MetricsRegistry.getDecode(), new File("target/metrics_decode.csv"));
MetricsRegistry.writeAsCsv(
MetricsRegistry.getEncode(), new File("target/metrics_encode.csv"));
This will create two files with the captured data.
Tip: this solution works as-is only with a local execution (direct runner, Spark local, Flink local, ...), but it is easy to add a step at the end of the pipeline which aggregates the local data and dumps them to another storage if you want to run it on a cluster. That said, local execution should be enough to compare coders (not to get precise values). Also, it is recommended to run the pipeline once on a medium-sized dataset first and only then activate the metrics, to make sure the JVM is "hot". This is why the metrics registry has a reset method.
The two files are more or less the same in terms of structure (only values change):
some-step_beam-row (ns);some-step_serializable (ns);some-step_beam-row (b);some-step_serializable (b)
88366930;1258035;12;1447
102216064;172889;18;1453
27641;129995;12;1442
27778;111890;12;1447
25315;130656;18;1448
12979;76494;18;1453
19258;100247;18;1453
19471;95238;18;1453
18994;103645;12;1447
21862;101490;12;1447
19112;92646;12;1442
18631;4269115;18;1448
30745;153819;18;1448
24528;100428;12;1442
30788;107798;18;1448
18316;76217;18;1448
19067;92164;18;1448
29138;111405;18;1448
14728;77548;18;1448
22048;94300;18;1448
28918;86020;12;1442
13412;71022;12;1442
23397;101131;12;1442
20484;106320;12;1442
19952;76387;12;1442
38191;99783;12;1442
From there, you can use any data processor - including Beam itself if you are in a joking mood - to extract statistics from these raw data. You could compute the statistics on the fly, but keeping the raw data lets you remove outliers and compute the aggregates in different ways.
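As a sketch of such post-processing in plain Java (the CoderStats class and its helpers are illustrative names; the parsing assumes the semicolon-separated layout produced by writeAsCsv above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;

public class CoderStats {
    // min, max, average and median of one metric column
    static long[] summarize(final long[] values) {
        final long[] sorted = values.clone();
        Arrays.sort(sorted);
        final long avg = (long) LongStream.of(sorted).average().orElse(0);
        final long median = sorted.length % 2 == 1
                ? sorted[sorted.length / 2]
                : (sorted[sorted.length / 2 - 1] + sorted[sorted.length / 2]) / 2;
        return new long[]{sorted[0], sorted[sorted.length - 1], avg, median};
    }

    // extracts one column (by index) from semicolon-separated data lines
    static long[] column(final List<String> lines, final int index) {
        return lines.stream()
                .mapToLong(line -> Long.parseLong(line.split(";")[index]))
                .toArray();
    }

    public static void main(final String[] args) {
        // in a real run these lines would come from reading the generated CSV
        // (skipping the header line)
        final List<String> data = Arrays.asList("27641;129995;12;1442", "27778;111890;12;1447");
        System.out.println(Arrays.toString(summarize(column(data, 0))));
        // prints [27641, 27778, 27709, 27709]: min, max, average, median
    }
}
```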
Here is a sample report from these data:
| | Beam Row (duration, ns) | Serializable (duration, ns) | Beam Row (size, b) | Serializable (size, b) |
|---|---|---|---|---|
| Min | 12 979 | 71 022 | 12 | 1442 |
| Max | 102 216 064 | 4 269 115 | 18 | 1453 |
| Average | 7 351 067 | 307 564 | 15 | 1446 |
| Median | 21 955 | 100 779 | 15 | 1447 |
Even this simple use case with a very simple, flat structure is surprising: Java serialization is, as expected, more verbose than Beam row serialization, but it can be way faster by itself (i.e. in memory; on the wire you would need to compute the throughput based on the size). Don't take these conclusions too seriously though: the dataset was not very big and its diversity not representative. What matters, and why I'm sharing this table, is showing how to process the raw data so you can exploit it and decide which serialization to use.