Apache beam pipelines with Scala: part 1 – template

Posted in: Big Data, Cloud, Google Cloud Platform, Technical Track

In this 3-part series I’ll show you how to build and run Apache Beam pipelines using Java API in Scala.

In the first part we will develop the simplest streaming pipeline that reads jsons from Google Cloud Pub/Sub, convert them into TableRow objects and insert them into Google Cloud BigQuery table. Then we will run our pipeline with sbt on local runner and then deploy it on Google Cloud Platform (GCP).

Prerequisites
You need to have GCP project created with enabled API for DataFlow, Pub/Sub and BigQuery. Perform gcloud auth login or create service account with proper permissions and download key.json locally, install JDK and sbt. Also you have to create Pub/Sub topic and subscription, target BQ table and temp location in GS.

Building pipeline
First of all you need to create an instance of options. You may either use fromArgs method or set parameters manually.

  trait TestOptions extends PipelineOptions with DataflowPipelineOptions
  val options = PipelineOptionsFactory.create().as(classOf[TestOptions])

The next thing to do is to define input subscription name and output table reference object for pipeline I/O.

  val fullSubscriptionName =
    s"projects/$projectId/subscriptions/$subscription"
  val targetTable =
    new TableReference()
      .setProjectId(projectId)
      .setDatasetId(dataset)
      .setTableId(tableName)

Now we can describe our DoFn function. It processes json string messages trying to convert them into TableRow:

  class MyDoFn extends DoFn[String, TableRow] with LazyLogging {
    @ProcessElement
    def processElement(c: ProcessContext) {
      val inputString = c.element()
      logger.info(s"Received message: $inputString")
      Try {
        Transport.getJsonFactory.fromString(inputString, classOf[TableRow])
      } match {
        case Success(row) ?
          logger.info(s"Converted to TableRow: $row")
          c.output(row)
        case Failure(ex) ?
          logger.info(s"Unable to parse message: $inputString", ex)
      }
    }
  }

The last thing to do is to combine all parts together using pipeline object:

  val p = Pipeline.create(options)
  p.apply("read-pubsub", PubsubIO
      .readStrings()
      .fromSubscription(fullSubscriptionName))
    .apply("process", ParDo.of(new MyDoFn))
    .apply("write-bq", BigQueryIO
      .writeTableRows()
      .to(targetTable)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))

And we are ready to run it:

  p.run()

Run pipeline locally

To start your pipeline locally you need to specify DirectRunner in pipeline options. Then you can simply start your pipeline with sbt run command:

$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-template\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-template (in build file:/C:/Users/Valentin/workspace/beam-template/)
[info] Running com.pythian.Beam

Then you can publish the message from cloud console into you topic to test it:

{ "id": 1, "data": "test data" }

In a while you should see something like:

2017/12/11 01:54:16.581 INFO  com.pythian.Beam$MyDoFn - Received message: { "id": 1, "data": "test data" }
2017/12/11 01:54:16.588 INFO  com.pythian.Beam$MyDoFn - Converted to TableRow: {"id":1,"data":"test data"}

You can now select your row from BigQuery (please note that table preview won’t show the rows which are in streaming buffer yet):

select * from test_nikotin.test where id = 1

Run pipeline in DataFlow

Once you are done with your tests you are ready to start it on GCP. Configure runner to DataflowRunner and run sbt:

$ sbt run
[info] Loading settings from plugins.sbt ...
...
[info] Running com.pythian.Beam
...
2017/12/11 01:50:04.937 INFO  o.a.b.r.dataflow.util.PackageUtil - Uploading 112 files from PipelineOptions.filesToStage to staging location to prepare for execution.
2017/12/11 01:50:09.093 INFO  o.a.b.r.dataflow.util.PackageUtil - Staging files complete: 111 files cached, 1 files newly uploaded
2017/12/11 01:50:09.196 INFO  o.a.b.r.d.DataflowPipelineTranslator - Adding read-pubsub/PubsubUnboundedSource as step s1
...
Dataflow SDK version: 2.1.0
2017/12/11 01:50:11.064 INFO  o.a.b.r.dataflow.DataflowRunner - To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/myproject/dataflow/job/2017-12-10_14_50_12-10326138943752681303
Submitted job: 2017-12-10_14_50_12-10326138943752681303
2017/12/11 01:50:11.064 INFO  o.a.b.r.dataflow.DataflowRunner - To cancel the job using the 'gcloud' tool, run:
> gcloud beta dataflow jobs --project=myproject cancel 2017-12-10_14_50_12-10326138943752681303
[success] Total time: 17 s, completed Dec 11, 2017 1:50:11 AM

You can navigate to DataFlow service in cloud console to verify it’s running as expected and check the logs in Cloud Logging.

You can find the code here

In the second part I’ll build a pipeline with control flow from another Pub/Sub topic via side input.

email
Want to talk with an expert? Schedule a call with our team to get the conversation started.

About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in Cloudera Hadoop Distribution, Google Cloud Platform and skilled in building scalable performance critical distributed systems and data visualization systems.

1 Comment. Leave new

Hi Valentin,

I have all the required dependencies of Beam SDK. But I am not able to extend DataflowPipelineOptions. Is there any specific jar/dependency that I should be importing ?

Reply

Leave a Reply

Your email address will not be published. Required fields are marked *