Apache Beam and JSR233 support or how to inject java transforms!
Apache Beam is a big data project intending to abstract big data environment. You can see it as a JavaEE API for Big Data. Concretely the overall idea is to write your pipeline (flow) and be able to execute it on Spark, Flink, Dataflow, .... without changing the code.
High level it is based on I/O and transformations. A transformation is an element taking 1-N inputs and creating 0-M outputs in the flow.
There are several default transformations but in most cases you will need some dynamism at some point and this is where the JSR 223 - a.k.a. scripting specification - enters into the game. This JSR which is part of the JVM provides a way to run a scripting language on the JVM. This is typically what Jython uses for Python, JRuby for ruby, Nashorn for javascript, etc...
Integrating this JSR with beam will allow to make the pipeline configurable and load the script which will convert some inputs to some outputs dynamically.
The easiest to implement a JSR 223 transform is to do a DoFn which will provide you the needed hooks:
- pre-compilation of the script for good performances
- execution of the script per element of the PCollection (the input dataset representation in beam)
Here is what a pipeline can look like:
PipelineOptions options = PipelineOptionsFactory.create(); <1>
Pipeline p = Pipeline.create(options); <2>
p.apply(TextIO.read().from("fo://somewhere")) <3>
.apply(new Scripting<String, Integer>() {} <4>
.withLanguage("js")
.withScript("context.output(context.element().length());"))
.apply(TextIO.write().to("example")); <5>
p.run(); <6>
After having created a pipeline configuration (1) and a pipeline (2) we can create our flow. This example read data from files (3) and then execute this Scripting transformation (4) and finally writes the output to other files (5). The last step of the code (this can be a main(String[])) is to run (6) the pipeline which will execute it on the environment you selected (by your dependencies). Will typically be Spark or Flink for instance.
In previous example the Scripting transform has three interesting points:
- its configuration is based on the script language, this is because the JSR223 supports any language and also the script to pass to the transform the code to inject
- it uses subclassing ({}) because beam uses Coder to be able to pass the records between the transformation and potentially determine their type. If you don't subclass the Scripting transform you it can't determine the type of the record and implicitly lookup the coder to use. The alternative is to set the coder to use on the pipeline at that point.
- the script uses a context which is an implicit binding/variable which exactly matches the ProcessContext of beam which allows to get access to inputs (element()) and to produce outputs.
In term of implementation if 100% relies on the java ScriptEngine and the only trick is to check if the engine is Compilable to precompile the script and get better performances. You can find a complete implementation on my github.
This is a very good step but most of the languages will be interpreted and not as fast as desired. To make it faster we can write a custom engine which supports....java :).
This is what does the JavaEngine of the same project which will create an CompiledScript implementation injecting the context we talked about earlier before the script passed to the transform and then will compile it using java Compiler API (this assumes you run with a JDK). Once the class loaded in a temporary classloader - to ensure we can drop it once used by the component - we just delegate the logic to the built CompiledScript.
The trick here is to ensure the code compiles since it requires the context to be typed for the main input/output. To do that we reuse the subclassing trick which allows to determine the transform input/output type then we set the types on the engine which can then type the context just before the injected code and let the script rely on a typed ProcessContext.
With that little hack you can now write your pipeline and inject java code you load when building the pipeline which allows to create big data jobs very dynamically!
From the same author:
In the same category: