Building a custom routing NiFi processor with Scala

In this post we will build a toy NiFi processor that is nonetheless quite efficient and surprisingly capable.

The processor logic is straightforward: it reads incoming files line by line, applies a given function to transform each line into a key-value pair, groups the pairs by key, writes the values to output files, and transfers those files to the relationships selected by the group key.
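The flow described above can be sketched in plain Scala, without any NiFi classes. This is only an illustration of the idea: the object name, the comma-separated line format and the transform function are assumptions made for the example, not part of the processor.

```scala
object RoutingSketch {
  // Hypothetical transform: "key,value" -> (key, value); throws on lines without a key.
  def transform(line: String): (String, String) = {
    val idx = line.indexOf(',')
    require(idx > 0, s"no key in line: $line")
    (line.substring(0, idx), line.substring(idx + 1))
  }

  // Transform every line, then collect the values per key,
  // keeping the input order of the values inside each group.
  def route(lines: Seq[String]): Map[String, Seq[String]] =
    lines.map(transform).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

Each entry of the resulting map corresponds to one output file; the key decides which relationship that file goes to.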

As a starting point I used nifi-processor-bundle-scala. You can deploy it right away with sbt.


We have three permanent relationships, specified in the trait RouterRelationships:

    RelUnmatched: all files for keys that have no related dynamic relationship configured are transferred here;

    a failure relationship: the incoming flow file is transferred here if any failure happens during processing;

    RelIncompatible: all lines that could not be processed are collected into one file, which is moved here.

In addition to these, we can configure dynamic relationships via properties. The property name becomes the relationship name, and the value is a boolean that says whether the relationship should be created.
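As a rough model of this rule, the sketch below turns a map of property names and values into a map of relationships. Rel is a hypothetical stand-in for NiFi's Relationship class, and fromProperties is an illustrative helper, not a method of the processor.

```scala
object DynamicRelationshipsSketch {
  // Hypothetical stand-in for a NiFi Relationship: just a name.
  final case class Rel(name: String)

  // Keep only the properties whose value is "true" (case-insensitive);
  // the property name becomes the relationship name.
  def fromProperties(props: Map[String, String]): Map[String, Rel] =
    props.collect {
      case (name, value) if value.trim.equalsIgnoreCase("true") => name -> Rel(name)
    }
}
```

With the properties black=true and white=false, only a relationship named black would be created.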


There is one required property: the mapping function that converts a string into a key-value pair. Here is a simple validator for this property:

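  private val CODE_VALIDATOR = new Validator() {
    override def validate(subject: String, input: String, context: ValidationContext): ValidationResult = {
      // Only compilation is checked; the function is never executed here.
      val compiled = LineProcessor.compile(input)
      val text = compiled match {
        case Failure(ex) => ex.toString
        case _ => ""
      }
      // The snippet was cut off after explanation(...); the rest of the chain is an assumed completion.
      new ValidationResult.Builder()
        .subject(subject)
        .input(input)
        .valid(compiled.isSuccess)
        .explanation(s"Compilation failed with exception: $text")
        .build()
    }
  }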

Please note that this validation covers compilation only: we can’t ensure that the function will work correctly at runtime. It just checks that the code compiles and has the type String => (String, String).

All other properties are dynamic and define relationships as described above. To support them, we have to implement the getSupportedDynamicPropertyDescriptor and onPropertyModified methods in our processor class.


This is where all the processing logic lives. Once a non-null flow file arrives, we try to get the compiled processing function from the cache, or compile it if it is not there yet:

val processorCode = context.getProperty(PROCESSOR).getValue
val transform = compile(processorCode).get
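The caching step is not visible in the two lines above, so here is a minimal sketch of how it could look. Everything in it is an assumption made for illustration: the object name, the TrieMap-based cache, and especially the fake compile function, which treats the "code" as a separator string instead of compiling real Scala.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.util.Try

object CompileCacheSketch {
  type LineFn = String => (String, String)

  // Cache keyed by the source text of the function.
  private val cache = TrieMap.empty[String, Try[LineFn]]

  // Counts how often the (expensive) compile step actually ran.
  val compilations = new AtomicInteger(0)

  // Hypothetical stand-in for LineProcessor.compile: splits each line at the given separator.
  private def compile(code: String): Try[LineFn] = Try {
    compilations.incrementAndGet()
    require(code.nonEmpty, "separator must not be empty")
    (line: String) => {
      val i = line.indexOf(code)
      require(i >= 0, s"separator not found in: $line")
      (line.take(i), line.drop(i + code.length))
    }
  }

  // Compile on first use, reuse afterwards. Under contention the compile step
  // may run more than once, but only one result ends up in the cache.
  def cached(code: String): Try[LineFn] = cache.getOrElseUpdate(code, compile(code))
}
```

Caching the Try rather than the function also remembers failed compilations, so broken code is not recompiled for every flow file.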

Then we prepare the RoutingSink (we will come back to it later): we create a Sink that takes care of creating output flow files, prepare the key-value and key classes for routing, and finally create an instance of RoutingSink:

val groupBy = RoutingSink.create[KeyValue, KeyType, FlowFile](FlowFileSink, _.keyType)

After this we are ready to read the input file line by line, transform the lines into key-value pairs, and send them to the Sink.

          val br = new BufferedReader(new InputStreamReader(in))
          var line: String = null
          while ( {
            line = br.readLine()
            line != null
          }) {
            val kv = Try {
              val (k, v) = transform(line)
              GoodKeyValue(k, v)
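The snippet above stops inside the Try block, so the following self-contained sketch shows one way such a loop could be finished. GoodKeyValue comes from the post; BadLine is a hypothetical name for the failure case, and readAll reads from a string instead of a flow file stream to keep the example runnable on its own.

```scala
import java.io.{BufferedReader, StringReader}
import scala.util.Try

object LineLoopSketch {
  sealed trait KeyValue
  final case class GoodKeyValue(key: String, value: String) extends KeyValue
  // Hypothetical name for lines that the transform could not handle.
  final case class BadLine(line: String) extends KeyValue

  // Read the input line by line; a line whose transform throws becomes a BadLine.
  def readAll(input: String, transform: String => (String, String)): List[KeyValue] = {
    val br = new BufferedReader(new StringReader(input))
    try {
      Iterator
        .continually(br.readLine())
        .takeWhile(_ != null)
        .map { line =>
          Try {
            val (k, v) = transform(line)
            GoodKeyValue(k, v): KeyValue
          }.getOrElse(BadLine(line))
        }
        .toList // force the iterator before the reader is closed
    } finally br.close()
  }
}
```

Wrapping each line separately means one malformed row does not fail the whole file; it is simply routed with the other bad rows.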

Once all lines are processed, we close the sink, which gives us the list of output flow files, and transfer each file to the corresponding relationship:

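          val files = groupBy.close()
          files.foreach {
            // Lines that failed to transform go to the incompatible relationship.
            case (ErrorKeyType, file) =>
              session.transfer(file, RelIncompatible)
            // Keys without a configured dynamic relationship fall back to unmatched.
            case (DataKeyType(name), file) =>
              val rel = dynamicRelationships.get.getOrElse(name, RelUnmatched)
              session.transfer(file, rel)
          }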
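The routing decision itself can be modelled without a NiFi session. In this sketch ErrorKeyType and DataKeyType mirror the names used in the post, while relationshipFor and the relationship names given as plain strings are assumptions for illustration only.

```scala
object TransferSketch {
  sealed trait KeyType
  case object ErrorKeyType extends KeyType
  final case class DataKeyType(name: String) extends KeyType

  // Pick a relationship name for a key: bad rows go to "incompatible",
  // known keys to their dynamic relationship, everything else to "unmatched".
  def relationshipFor(key: KeyType, dynamic: Map[String, String]): String = key match {
    case ErrorKeyType      => "incompatible"
    case DataKeyType(name) => dynamic.getOrElse(name, "unmatched")
  }
}
```

With a single dynamic relationship configured for black, a red key would fall through to unmatched while a black key would go to black.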


RoutingSink is a hand-made, Akka actor based implementation of a routing worker pool in which each worker processes the data for exactly one key. A worker actor creates its own Sink instance and adds all incoming data to it. The router actor does the housekeeping and handles communication.
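To make the idea concrete without pulling in Akka, here is a deliberately simplified, single-threaded stand-in: one sink per key, created on first use. It is not the actor-based implementation described above; the Sink trait, the constructor signature and StringSink are all assumptions made for this sketch.

```scala
import scala.collection.mutable

object PerKeySinkSketch {
  // Hypothetical minimal sink interface: accumulate items, produce a result on close.
  trait Sink[A, Out] {
    def add(a: A): Unit
    def close(): Out
  }

  // Routes each item to the sink that belongs to its key, creating sinks lazily.
  final class RoutingSink[A, K, Out](newSink: () => Sink[A, Out], keyOf: A => K) {
    private val sinks = mutable.LinkedHashMap.empty[K, Sink[A, Out]]

    def add(a: A): Unit = sinks.getOrElseUpdate(keyOf(a), newSink()).add(a)

    // Close every sink and return the results in order of first appearance of each key.
    def close(): List[(K, Out)] = sinks.toList.map { case (k, s) => k -> s.close() }
  }

  // Example sink: joins the values of all pairs into one newline-separated string.
  final class StringSink extends Sink[(String, String), String] {
    private val sb = new StringBuilder
    def add(kv: (String, String)): Unit = {
      if (sb.nonEmpty) sb.append('\n')
      sb.append(kv._2)
    }
    def close(): String = sb.toString
  }
}
```

In the actor-based version each per-key sink would live inside its own worker, so different keys can be written concurrently while the router only dispatches messages.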

Build and deploy

To build the processor, run mvn compile test package. This also executes the tests before building the nar file. After packaging you should find the nar file at nifi-akka-bundle/nifi-akka-nar/target/nifi-akka-nar-1.0.nar.

To use the processor in NiFi, copy this file into the lib directory. Make sure you run the same NiFi version that the processor was built against (1.4.0 at the time of writing).

Start NiFi and create a simple flow. Add one property for the dynamic relationship “black”, and add the following code for decomposing JSON:

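import com.google.gson.JsonObject
import pythian.nifi.processors.LineProcessor.GSON
(line: String) => {
  // Parse the line as a JSON object; malformed lines throw and count as incompatible.
  val jo = GSON.fromJson(line, classOf[JsonObject])
  // The "type" field is the routing key, the "payload" field is the value.
  val tpe = jo.get("type").getAsString
  val payload = jo.get("payload")
  val data = GSON.toJson(payload)
  (tpe, data)
}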

Add a few GetFile and PutFile processors and start the flow.

Then use a file with the following content to test it:

{BAD ROW !!! "type":"red","payload":{"id":36,"data":713}}

Shortly after you put this file into the input directory it should be processed. You should find two files, for the red and green keys, under the unmatched location, one file for black, and one incompatible file containing only the bad row:

{BAD ROW !!! "type":"red","payload":{"id":36,"data":713}}

As a further development, processing could be moved from the main thread into an Akka router with a pool of workers. More care should also be taken with failures (what if sink creation fails?) and actor restarts.



About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in the Cloudera Hadoop Distribution and Google Cloud Platform, and is skilled in building scalable, performance-critical distributed systems and data visualization systems.
