Apache Beam pipelines with Scala: part 3 – dynamic processing

Posted in: Big Data, Cloud, Google Cloud Platform

In the third part of this series we will develop a pipeline that transforms messages from the “data” Pub/Sub topic, using messages from the “control” topic as the source code for our data processor.

The idea is to use the Scala ToolBox, which makes this much easier than doing the same in Java. Basically, it takes just three lines of code:

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val toolbox = currentMirror.mkToolBox()
val tree = toolbox.parse(code)
toolbox.eval(tree)
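To make the three lines above concrete, here is a minimal sketch that compiles a function from a string and calls it. The `ToolBoxDemo` object and its `compile` method are hypothetical names used only for illustration, and the sketch assumes that `scala-reflect` and `scala-compiler` are on the classpath, since the ToolBox lives in the compiler artifact.

```scala
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

// Hypothetical demo object, not part of the pipeline code.
object ToolBoxDemo {
  private val toolbox = currentMirror.mkToolBox()

  // Parses and evaluates the source text, then casts the result to the
  // function type we expect. The cast is unchecked: passing code of a
  // different shape fails only when the function is actually applied.
  def compile(code: String): Int => Int =
    toolbox.eval(toolbox.parse(code)).asInstanceOf[Int => Int]
}
```

Calling `ToolBoxDemo.compile("(x: Int) => x * 2")(21)` should return 42. Note that `eval` returns `Any`, so the cast is where we state what kind of code we expect in the message.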

The problem we have to solve is performance. Parsing and evaluating the code for every processed element is very inefficient. On the other hand, the “control” code does not change frequently, so we can definitely benefit from memoization. We could cache results per DoFn instance, but it is much better to cache them at the JVM level; we just need to take care of thread safety. For the sake of simplicity I will use a synchronized approach, although it would be more idiomatic and efficient to use Futures.

There is one more thing to take care of with memoization: how many results do we keep cached? Here is my simple attempt:

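import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Caches up to `size` results of `fun`; the oldest entry is evicted first (FIFO).
class Memoize[K, V](size: Int)(fun: K => V) {
  private val cache = TrieMap.empty[K, V]
  // Insertion order of the cached keys; only touched inside `synchronized`.
  private val queue = mutable.Queue.empty[K]
  def apply(k: K): V =
    cache.getOrElse(k,
      this.synchronized {
        // Re-check under the lock: another thread may have stored the value meanwhile.
        cache.getOrElse(k, {
          // Compute first, so a failing `fun` leaves cache and queue untouched.
          val v = fun(k)
          if (queue.size >= size) {
            cache.remove(queue.dequeue())
          }
          queue.enqueue(k)
          cache.put(k, v)
          v
        })
      })
}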
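The queue above evicts entries in insertion order. If you would rather keep the most recently used functions, a least-recently-used variant can be sketched on top of `java.util.LinkedHashMap`. The class name `LruMemoize` and the parameter `maxSize` are hypothetical, and this is only an illustration of the idea, not the code used in the pipeline.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical LRU variant of the cache (an assumption, not from the post).
class LruMemoize[K, V](maxSize: Int)(fun: K => V) {
  // accessOrder = true moves an entry to the end on every get/put,
  // so the eldest entry is always the least recently used one.
  private val cache = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > maxSize
  }

  // Every call takes the lock, because even a read reorders the map.
  def apply(k: K): V = this.synchronized {
    if (cache.containsKey(k)) cache.get(k)
    else {
      val v = fun(k)
      cache.put(k, v)
      v
    }
  }
}
```

The trade-off is that cache hits are now synchronized as well, whereas the TrieMap version serves hits without taking the lock.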

Please note that any call to Memoize for a not-yet-cached argument blocks while another value is being computed, even if it asks for a different key.
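For comparison, the Future-based alternative mentioned earlier could look roughly like the sketch below. The name `FutureMemoize` is hypothetical, and the sketch leaves out eviction entirely to keep the idea visible: the cache stores a Future per key, so only callers interested in the same key wait for its computation.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Hypothetical Future-based cache (an assumption, not from the post).
// No size limit: eviction is omitted for brevity.
class FutureMemoize[K, V](fun: K => V)(implicit ec: ExecutionContext) {
  private val cache = TrieMap.empty[K, Future[V]]

  def apply(k: K): Future[V] =
    cache.getOrElse(k, {
      val promise = Promise[V]()
      // putIfAbsent is atomic, so exactly one caller wins the race per key.
      cache.putIfAbsent(k, promise.future) match {
        case Some(existing) => existing // someone else already started it
        case None =>
          // We won: run `fun` once on the execution context.
          Future(promise.complete(Try(fun(k))))
          promise.future
      }
    })
}
```

A DoFn would still have to wait for the returned Future before emitting a row, and a failed Future stays cached in this sketch, so both points would need handling in real code.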

The next thing to think about is what K and V should be for our function. K is obviously String. V has to be the function that performs the transformation itself, INPUT => OUTPUT. To keep the code in control messages minimal, we convert the input data into a JsonObject before passing it to the function, and we expect a TableRow as the result. Thus V is JsonObject => TableRow.

We expect the following to be a valid “control” message:

  (json: com.google.gson.JsonObject) => {
    new com.google.api.services.bigquery.model.TableRow()
      .set("id", json.get("id").getAsLong)
      .set("data", json.get("text").getAsString)
  }

Now it’s time to define an object that provides us with the ability to do something like:

val tableRow = code.evalFor(json)

Here is that object:

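import com.google.api.services.bigquery.model.TableRow
import com.google.gson.JsonObject
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

object Dynamic {
  private val toolbox = currentMirror.mkToolBox()
  // Compiles each distinct code string once and caches up to 10 resulting functions.
  private val dynamic = new Memoize(10)((code: String) => {
    val tree = toolbox.parse(code)
    toolbox.eval(tree).asInstanceOf[JsonObject => TableRow]
  })
  def apply(code: String): JsonObject => TableRow = dynamic(code)
}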

We keep a single instance of the toolbox and of the memoized compiler per JVM. So now we can simply use:

val tableRow = Dynamic(code)(json)

To make it work as the String method shown above, we use an implicit value class:

  implicit class RichString(val code: String) extends AnyVal {
    def evalFor(arg: JsonObject): TableRow = Dynamic(code)(arg)
  }

Now the only thing we change from our previous side input example is a slightly altered MyDoFn:

  class MyDoFn(sideView: PCollectionView[java.util.List[String]]) extends DoFn[String, TableRow] with LazyLogging {
    @ProcessElement
    def processElement(c: ProcessContext): Unit = {
      val t0 = System.currentTimeMillis()
      val sideInput = c.sideInput(sideView).get(0)
      val inputString = c.element()
      logger.info(s"Getting new data=$inputString")
      Try {
        val json = new JsonParser().parse(inputString).getAsJsonObject
        sideInput.evalFor(json)
      } match {
        case Success(row) =>
          logger.info(s"Inserting to BiqQuery: $row")
          c.output(row)
        case Failure(ex) =>
          logger.info(s"Unable to parse message: $inputString", ex)
      }
      val t1 = System.currentTimeMillis()
      logger.info(s"Processed data in ${t1 - t0} ms")
    }
  }

I added logging of the processing time to show how much time the cached functions save.
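One consequence of this design is worth spelling out. If the control code does not compile, the function passed to Memoize throws, nothing is cached, and every following data element repeats the failed compilation. A possible remedy is to cache the failure as well. The sketch below uses a hypothetical `TryMemoize` class and omits eviction for brevity; it is an illustration of the idea, not part of the original code.

```scala
import scala.collection.concurrent.TrieMap
import scala.util.Try

// Hypothetical cache that remembers failures as well as successes,
// so a broken input is only evaluated once (an assumption, not from the post).
class TryMemoize[K, V](fun: K => V) {
  private val cache = TrieMap.empty[K, Try[V]]

  def apply(k: K): Try[V] =
    cache.getOrElse(k, this.synchronized {
      // Re-check under the lock, then store whatever Try captured.
      cache.getOrElseUpdate(k, Try(fun(k)))
    })
}
```

With a cache like this, the DoFn could pattern match on the cached `Try` and log the compilation error without paying the compilation cost again for each element.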

We are ready to test it now. Start it locally with sbt:

$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-dynamic\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-dynamic (in build file:/C:/Users/Valentin/workspace/beam-dynamic/)
[info] Running com.pythian.Beam

Now publish our first version of the code to the “control” topic and then publish data to the “data” topic:

// for the "control" topic
  (json: com.google.gson.JsonObject) => {
    new com.google.api.services.bigquery.model.TableRow()
      .set("id", json.get("id").getAsLong)
      .set("data", "placeholder")
  }
// for the "data" topic
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}

We can see how data is processed in the logs:

2017/12/11 17:47:24.049 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":1,"text":"row1"}
2017/12/11 17:47:24.544 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":2,"text":"row2"}
2017/12/11 17:47:25.816 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=1, data=placeholder}
2017/12/11 17:47:25.816 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=2, data=placeholder}
2017/12/11 17:47:25.880 INFO  com.pythian.Beam$MyDoFn - Processed data in 923 ms
2017/12/11 17:47:25.880 INFO  com.pythian.Beam$MyDoFn - Processed data in 336 ms
2017/12/11 17:47:30.076 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":3,"text":"row3"}
2017/12/11 17:47:30.076 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=3, data=placeholder}
2017/12/11 17:47:30.077 INFO  com.pythian.Beam$MyDoFn - Processed data in 1 ms

It took about 1 second to compile the code for id=1. The element with id=2 spent some time waiting on the synchronized block while the “control” function was being compiled, and id=3 took just 1 ms.

Now let's change our parsing code and publish another three data rows:

// for the "control" topic
  (json: com.google.gson.JsonObject) => {
    new com.google.api.services.bigquery.model.TableRow()
      .set("id", json.get("id").getAsLong)
      .set("data", json.get("text").getAsString)
  }
// for the "data" topic
{"id":4,"text":"row4"}
{"id":5,"text":"row5"}
{"id":6,"text":"row6"}

And here is a log:

2017/12/11 17:54:09.859 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":4,"text":"row4"}
2017/12/11 17:54:10.237 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=4, data=row4}
2017/12/11 17:54:10.238 INFO  com.pythian.Beam$MyDoFn - Processed data in 379 ms
2017/12/11 17:54:26.885 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":5,"text":"row5"}
2017/12/11 17:54:26.885 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=5, data=row5}
2017/12/11 17:54:26.886 INFO  com.pythian.Beam$MyDoFn - Processed data in 0 ms
2017/12/11 17:54:42.868 INFO  com.pythian.Beam$MyDoFn - Getting new data={"id":6,"text":"row6"}
2017/12/11 17:54:42.869 INFO  com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=6, data=row6}
2017/12/11 17:54:42.871 INFO  com.pythian.Beam$MyDoFn - Processed data in 1 ms

As expected, the new code was applied, and later messages were processed much faster than the first one.

Please note that, as with any other dynamic approach, you have to take care of permissions for the “control” topic, since anyone who can publish to it can execute arbitrary code inside your pipeline.

You can find the code here.

About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in the Cloudera Hadoop Distribution and Google Cloud Platform, and is skilled in building scalable, performance-critical distributed systems and data visualization systems.