Apache Beam 2.9.0 was released last week, so if you were using a previous version you are likely tempted to upgrade. This is what I did, and what was expected to be a single-line change in a pom surprisingly turned out to be a bit more work.

Before explaining what happened, let me describe the setup I had:

  • I was running on a small Spark cluster (1 master and 1 slave, mainly because I had already spotted the issue in integration tests during my project build, using a JUnit rule forking a Spark "cluster"),
  • I was "submitting" a fatjar built with maven-shade-plugin,
  • The submitted "main" was building a simple Apache Beam pipeline and "running" it

All was working very well with Beam 2.8.0, but when I upgraded to Apache Beam 2.9.0 I got this error:

No TransformEvaluator registered for BOUNDED transform xxxx

This was very surprising because in 2.8.0 this transform was known, so could it be a regression? After checking how the exception was thrown, I realized the Spark runner had changed, particularly in its translator part. In 2.8.0 it looked like:

EVALUATORS.put(Read.Bounded.class, readBounded());

and in 2.9.0 it is like this:

EVALUATORS.put(PTransformTranslation.READ_TRANSFORM_URN, readBounded());

Not a crazy change, right? The translator has been migrated from a Class-keyed registry to a URN-keyed registry. The goal is quite obvious: prepare the Spark runner to support the polyglot feature Apache Beam promises (Beam calls it "portability", but since Beam is by definition a portable API between engines, it is too misleading to reuse this naming in a post).

Ok, so Beam enhanced its runner and changed the key used to look up the Spark implementation of each operation primitive. Does that mean the key is wrong? Still no, otherwise the Beam build would have failed; its coverage is not that low, it is actually quite good.

The impacting change is not in the registry itself but in the way it is queried. In 2.8.0 the key (the class) was passed and the registry was read directly; in 2.9.0 the PTransform is passed, converted to a URN, and only then is the registry read.
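
To make the indirection concrete, here is a toy illustration in plain Java; nothing in it is actual Beam code and the URN value is a placeholder:

import java.util.HashMap;
import java.util.Map;

public class LookupChange {
  static class BoundedRead {} // stand-in for Read.Bounded

  public static void main(String[] args) {
    // 2.8.0 style: the evaluator registry is keyed by the transform class and read directly
    Map<Class<?>, String> evaluatorsByClass = new HashMap<>();
    evaluatorsByClass.put(BoundedRead.class, "readBounded evaluator");
    System.out.println(evaluatorsByClass.get(BoundedRead.class)); // found

    // 2.9.0 style: the transform is first resolved to a URN through a second registry,
    // and only then is the evaluator registry read
    Map<Class<?>, String> urnsByClass = new HashMap<>();   // in Beam this one is populated through an SPI
    Map<String, String> evaluatorsByUrn = new HashMap<>();
    urnsByClass.put(BoundedRead.class, "some:read:urn");   // placeholder URN
    evaluatorsByUrn.put("some:read:urn", "readBounded evaluator");

    String urn = urnsByClass.get(BoundedRead.class);
    // if the URN registry is incomplete, urn is null and the evaluator lookup fails,
    // which surfaces as "No TransformEvaluator registered for BOUNDED transform"
    System.out.println(urn == null ? "not registered" : evaluatorsByUrn.get(urn));
  }
}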

The way the Spark runner translates the PTransform to a URN is to use another registry, which is populated from an SPI (service provider interface)... yes, a registry in front of a registry. The rationale is to share most of the URN-related logic in a module and let all runners reuse it. Indeed, Beam could have written a generic runner and let the specific parts (like Spark) be pluggable, but they did the opposite: a library is provided and each runner reuses what it needs/wants from the generic part; this is their choice.
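
As far as I understand, the population of that second registry relies on plain java.util.ServiceLoader. Here is a minimal sketch of the mechanism, not the literal Beam code, and the registrar method name is quoted from memory, so double check it against the sources:

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;

public class UrnRegistryLoading { // illustrative class name
  public static void main(String[] args) {
    // ServiceLoader reads every META-INF/services/org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar
    // resource visible on the classpath and instantiates each class listed inside
    Map<Object, Object> registry = new HashMap<>();
    for (TransformPayloadTranslatorRegistrar registrar
        : ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
      // each registrar contributes its transform -> translator mappings (and therefore the URNs)
      registry.putAll(registrar.getTransformPayloadTranslators());
    }
    System.out.println("Registered translators: " + registry.size());
  }
}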

Ok, but does it work in Spark standalone? Yes, because the classpath is correct, and this is the core of the issue. Keep in mind the deployment is the one recommended in the Apache Beam Spark documentation, i.e. build a shade and submit it through spark-submit. The SPI loads its implementations (= it populates its registry) from the resources named META-INF/services/org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar. The issue is contained in the plural of the previous sentence: multiple modules define this resource, so the shade plugin will just take one of them and the registry will not be fully populated, which means some transforms will not be available.

Here we are: we know what the issue is and how to fix it, merging the SPI files into a single one (since each non-commented/non-empty line defines a fully qualified class name, you can just append all the files to get the aggregated flavor).
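
For example (the registrar class names below are made up, only the resource path is real), say two modules each ship their own copy of the resource:

# module A's META-INF/services/org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar
com.example.corerunner.CoreTransformsRegistrar

# module B's copy of the same resource
com.example.sparkrunner.SparkTransformsRegistrar

The shaded jar must then contain a single copy listing both implementations:

com.example.corerunner.CoreTransformsRegistrar
com.example.sparkrunner.SparkTransformsRegistrar

Here is how to let the Maven shade plugin do that merge for you: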

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

Since this standard Java pluggability system is very old and common, the Maven shade plugin provides a built-in transformer to do exactly that, so fixing the upgrade is just a matter of configuring your build.

At that point you can wonder why the Maven shade plugin doesn't do it by itself. The plugin was designed to stay simple and just do what you ask. Automatically merging two files can seem a good idea but can also be dangerous in several cases: some SPIs do not support having multiple implementations, and by swallowing one of the registrations the shade can actually save you from an exception. So the plugin is designed to stay safe by default. It is so true that it logs all the potential conflicts between modules and the overlapping classes. However, since Beam packaging is not yet very clean, there are too many of them, so the plugin just says so and tells you to enable debug logs to see them all. Due to the number of conflicts you would likely miss this single blocking one (it is just a two-file conflict); at least this is what the default logging of the plugin implies. In practice this is not fully the case: the plugin only reports conflicts for classes, not resources, so you would have missed this issue anyway.

For your information, here is the log you get, in debug mode, once the transformer is in action:

[DEBUG] Transforming META-INF/services/org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar using org.apache.maven.plugins.shade.resource.ServicesResourceTransformer

A last note: this issue can be non-trivial to identify because, as a good Google-driven project, Beam uses Google's @AutoService, an annotation you can put on any class and which generates, thanks to an annotation processor (so at build time), the SPI file registering the implementation in the currently compiled module. This is great for developers, but it means you can't find the implementation as easily as expected on GitHub: concretely, if you look for the META-INF/services files in the sources you will not find them. So it is better to just copy the dependencies of your project into a folder (mvn dependency:copy-dependencies does it very well) and unzip each jar in a dedicated folder to check which files are present.
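
If unzipping every jar is too tedious, a small program run with the same classpath (or against the shaded jar) can do the check for you. This is just a sketch relying on the standard ClassLoader#getResources, and the class name is made up:

import java.net.URL;
import java.util.Enumeration;

public class SpiFileCheck {
  public static void main(String[] args) throws Exception {
    // lists every copy of the SPI resource visible on the classpath: before shading you should
    // see several URLs (one per module shipping registrars); inside a shaded jar built without
    // the ServicesResourceTransformer you will only see a single one
    Enumeration<URL> resources = Thread.currentThread().getContextClassLoader()
        .getResources("META-INF/services/org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar");
    while (resources.hasMoreElements()) {
      System.out.println(resources.nextElement());
    }
  }
}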
