Apache Beam: how to execute a task before/after the record flow starts/stops
Big data pipelines are designed to process huge volumes of data. However, it is still common to need to execute some task once, either before the flow is processed or (for batches) after it. This is not something you will find directly in the Apache Beam API: there is no concept of a "singleton" or the like, because preference is given to the resilience and distribution of the processing.
However, and I thank my colleague Etienne for pointing this out, there are transforms in Beam core that you can rely on to build such behavior.
Before seeing what it means in terms of code, let's look at the high-level design. The idea is to send the flow (the data) down two different routes. This can be seen as a multicast of the data.
Of course, this is not sufficient to implement a singleton, but it is the key to all the logic behind our implementation. From here, we just have to ensure that one branch, say branch2, can notify the other, say branch1, that it has executed its code. In Apache Beam this communication can be done thanks to a "side input". It means that branch1 will wait for branch2 to emit a record. The transform implementing that is (aptly) called Wait#on.
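To make the ordering concrete, here is a minimal plain-Java sketch of that synchronization. It is only an analogy and does not use Beam: the class WaitOnSketch and its run method are hypothetical names, and a CompletableFuture stands in for the signal that branch2 emits once its task has run.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java analogy (not Beam): branch1 is gated on a signal from branch2.
public class WaitOnSketch {

    /** Runs the one-off task on "branch2", then lets "branch1" process the records. */
    static List<String> run(List<String> records) {
        List<String> events = new CopyOnWriteArrayList<>();

        // branch2: executes the one-off task; completing the future is its "signal".
        CompletableFuture<Void> branch2 =
            CompletableFuture.runAsync(() -> events.add("task"));

        // branch1: holds the records back until the signal from branch2 is available.
        CompletableFuture<Void> branch1 =
            branch2.thenRun(() -> records.forEach(r -> events.add("process:" + r)));

        branch1.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"))); // prints [task, process:a, process:b]
    }
}
```

Whatever the thread scheduling, "task" is always logged before any record is processed, which is the guarantee we want from the waiting branch.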
Now that we know how to synchronize two branches, let's rewrite our pipeline/graph to rely on that:
The data flows (1); then, for the first record, the conditional branch (2) executes some code as soon as that first record is emitted. Meanwhile, the processing branch (3) waits until records flow AND the conditional branch has emitted a record. In other words, the processing branch starts processing data only once data flows and branch 2 has executed its task, so we have implemented a "before" or initialization task.
What does it look like in terms of code? Exactly the same:
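import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

// (1)
final PCollection<String> source = pipeline.apply("source", SomeIO.read());
final PCollection<Void> afterSingleton = source
    .apply("singleton#first", Sample.any(1)) // (2)
    .apply("singleton#task", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement // (3)
        public void onElement(@Element final String input, final OutputReceiver<Void> output) {
            MyTask.execute();
            output.output(null); // the emitted record is the signal the other branch waits on
        }
    }));
source
    .apply("output#synchro", Wait.on(afterSingleton)) // (4)
    .apply("output", SomeIO.write());
// (5)
pipeline.run().waitUntilFinish();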
- (1) we start our pipeline by reading some data, whatever the IO is,
- (2) we add branch2 to execute our initialization task and ensure it only takes a single element into account (the first), so the task is not executed N (>1) times,
- (3) on the same branch, we add a DoFn to execute the task we want, not forgetting to output something (null is enough),
- (4) on the main processing branch, we add a step that waits for the record of the singleton/task branch to be emitted (the red arrow in the previous graph) before the actual processing (SomeIO.write here), which happens on the main data flow (the green arrow in the previous graph),
- (5) finally, we run our pipeline.
This simple model works for initialization tasks but also for destruction or "post" tasks. You just need to reverse the wait direction between the main processing and the one-time task:
final PCollection<String> source = pipeline.apply("source", SomeIO.read());
// SomeIO.write() must return a PCollection here: Wait.on needs one as its signal
final PCollection<Void> written = source.apply("output", SomeIO.write());
source.apply("singleton#wait", Wait.on(written))
    .apply("singleton#onelement", Sample.any(1))
    .apply("singleton#task", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void onElement(@Element final String input, final OutputReceiver<Void> output) {
            MyTask.execute(); // nothing waits on this branch, so no output is needed
        }
    }));
pipeline.run().waitUntilFinish();
The "destruction" flow uses the same hacks:
- sample a single element so the task is executed only once,
- wait for the precondition to be executed.
This works because Wait#on waits for the window to close before emitting its output, which is what activates the other branch. This is also why it is important to add the sampling (with size 1) in the initialization case.
Of course, you can combine these two examples to run an initialization and a destruction before/after any task. One use case is starting/stopping external services (such as a costly AWS service).
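The ordering that such a combined pipeline is meant to guarantee can be sketched in plain Java. Again this is only an analogy, not Beam code: LifecycleSketch, run and the "start-service"/"stop-service" labels are hypothetical, and futures stand in for the signals that the two Wait#on steps would wait for.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java analogy (not Beam): init task, then the main flow, then destroy task.
public class LifecycleSketch {

    /** Returns the log of what ran: the start task, every record, then the stop task. */
    static List<String> run(List<String> records) {
        List<String> events = new CopyOnWriteArrayList<>();

        // Initialization branch: hypothetical "start the external service" task.
        CompletableFuture<Void> init =
            CompletableFuture.runAsync(() -> events.add("start-service"));

        // Main flow: records may be processed in parallel, but only after init.
        CompletableFuture<?>[] processed = records.stream()
            .map(element -> init.thenRunAsync(() -> events.add("process:" + element)))
            .toArray(CompletableFuture[]::new);

        // Destruction branch: waits for init and for the whole main flow to finish.
        CompletableFuture<Void> destroy = init
            .thenCompose(ignored -> CompletableFuture.allOf(processed))
            .thenRun(() -> events.add("stop-service"));

        destroy.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The records themselves may be logged in any order, but "start-service" always comes first and "stop-service" always comes last, which is the before/after contract described above.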