Apache Beam Pipelines With Scala: Part 2 – Side Input


In the second part of this series we will develop a pipeline that transforms messages from a “data” Pub/Sub topic, with the ability to control the processing via a “control” topic.

How to pass mutable input into a DoFn effectively is not obvious, but there is a clue in the documentation:

If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

With this in hand, we can use a construction like this:

  val sideView = p.apply(PubsubIO.readStrings()
      .fromSubscription(fullSideInputSubscriptionName))
    .apply(Window.into[String](new GlobalWindows())
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
      .discardingFiredPanes())
    .apply(View.asList())

We have a single global window, fire the trigger for every incoming element, keep only the new values in each pane, and convert the result to a view so that it can be used as a side input.
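To make the pane behaviour concrete: with discardingFiredPanes each trigger firing carries only the elements that arrived since the previous firing, while the alternative, accumulatingFiredPanes, carries everything seen so far. The toy model below is plain Scala and does not use Beam at all. `PaneModes` and its methods are hypothetical names for illustration only, and each inner sequence stands for the elements that arrived between two trigger firings.

```scala
// Toy model of pane contents, NOT Beam code. All names here are hypothetical.
object PaneModes {
  // Discarding mode: every pane contains only the newly arrived elements.
  def discarding[A](batches: Seq[Seq[A]]): Seq[Seq[A]] = batches

  // Accumulating mode: every pane contains all elements seen so far.
  // scanLeft builds the running concatenation; tail drops the initial empty pane.
  def accumulating[A](batches: Seq[Seq[A]]): Seq[Seq[A]] =
    batches.scanLeft(Seq.empty[A])(_ ++ _).tail
}
```

For two firings carrying "ENABLED" and then "DISABLED", the discarding model yields one element per pane, while the accumulating model yields a second pane that holds both values.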

A couple of things to note here. First, keep in mind that the system is eventually consistent and does not preserve message order, so in this case you may want to use accumulatingFiredPanes instead. Second, a trigger firing is supposed to provide at least one value, but in theory it may provide several, and if you use View.asSingleton your pipeline will fail. Finally, the pipeline waits for the first message from the side input before it starts processing the steps that use it.
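If you do switch to accumulatingFiredPanes, the side input list will hold several control messages in no guaranteed order, so something has to decide which one is current. One possible approach, sketched below in plain Scala, is to put a timestamp into each control message and pick the newest. The message format "<epochMillis>:<STATE>" and the names `Control` and `ControlState` are assumptions made for this sketch; they are not part of the original pipeline.

```scala
import scala.util.Try

// Hypothetical control message, assumed to be published as "<epochMillis>:<STATE>",
// for example "1512999789000:ENABLED".
final case class Control(timestamp: Long, state: String)

object ControlState {
  // Parses one raw message; returns None if it does not match the assumed format.
  def parse(raw: String): Option[Control] =
    raw.split(":", 2) match {
      case Array(ts, state) => Try(ts.trim.toLong).toOption.map(Control(_, state.trim))
      case _                => None
    }

  // Returns the state of the message with the highest timestamp, if any parsed.
  def latest(messages: Seq[String]): Option[String] =
    messages.flatMap(m => parse(m)).sortBy(_.timestamp).lastOption.map(_.state)
}
```

Inside a DoFn, the java.util.List returned by the side input would first have to be converted to a Scala Seq, for example with asScala from the standard collection converters, before being passed to `ControlState.latest`.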

Now we change our template slightly to use sideView in it:

  p.apply(PubsubIO.readStrings().fromSubscription(fullSubscriptionName))
    .apply(ParDo.of(new MyDoFn(sideView)).withSideInputs(sideView))
    .apply(BigQueryIO
      .writeTableRows()
      .to(targetTable)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))

The DoFn uses this side input as a switch that decides whether we process the data or ignore incoming messages:

  class MyDoFn(sideView: PCollectionView[java.util.List[String]]) extends DoFn[String, TableRow] with LazyLogging {
    @ProcessElement
    def processElement(c: ProcessContext): Unit = {
      val sideInput = c.sideInput(sideView).get(0)
      val inputString = c.element()
      if (sideInput == "ENABLED") {
        ...
      } else {
        logger.info(s"Ignoring input messages, sideInput=$sideInput")
      }
    }
  }
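Calling get(0) on the side input list throws if the list is ever empty. Whether that can actually happen in this pipeline is not something the post establishes, so the following is only a defensive sketch: a small helper, with the hypothetical names `SideInputGate`, `firstValue` and `shouldProcess`, that treats a missing value as "do not process". It uses only java.util.List, so it can be tried without Beam.

```scala
import java.util.{List => JList}

// Hypothetical helper, not part of the original code.
object SideInputGate {
  // Safe replacement for values.get(0): None for a null or empty list.
  def firstValue(values: JList[String]): Option[String] =
    if (values == null || values.isEmpty) None else Some(values.get(0))

  // Process only when the first control value is exactly "ENABLED".
  def shouldProcess(values: JList[String]): Boolean =
    firstValue(values).contains("ENABLED")
}
```

In the DoFn above, the condition could then read `SideInputGate.shouldProcess(c.sideInput(sideView))` instead of comparing the result of get(0) directly.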

To test our pipeline we will send a sequence of messages into both Pub/Sub topics. Start the pipeline locally:

$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-sideinput\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-sideinput (in build file:/C:/Users/Valentin/workspace/beam-sideinput/)
[info] Running com.pythian.Beam

Send a few messages to the “data” topic:

{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}

Nothing is processed or ignored in the logs yet, because we haven’t published anything to the “control” topic.
Now let’s publish "ENABLED" to the “control” topic. After a while we will see the three rows above processed:

2017/12/11 14:43:09.773 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=2, data=row2}
2017/12/11 14:43:09.773 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=3, data=row3}
2017/12/11 14:43:09.773 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=1, data=row1}

Send one more message to the “data” topic. It is processed as expected:

{"id":4,"text":"row4"}
...
2017/12/11 14:44:42.805 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=4, data=row4}

Now send "DISABLED" to the “control” topic and try yet another message to the “data” topic:

{"id":5,"text":"row5"}
...
2017/12/11 14:46:32.097 INFO  com.pythian.Beam$MyDoFn - Ignoring input messages, sideInput=DISABLED

Exactly as expected. Note that when this pipeline runs on Dataflow, there may be a delay in propagation, so some new “data” messages may still be processed after you have published the disabling message.

You can find the code here.

In the third part of this series we will build a powerful pipeline that uses side input messages as source code for data processors. This may be helpful for building flexible pipelines that can be tuned on the fly without restarting them.


About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in Cloudera Hadoop Distribution and Google Cloud Platform, and is skilled in building scalable, performance-critical distributed systems and data visualization systems.
