Testing NiFi Flow – The good, the bad and the ugly

Introduction

Some time has passed since we wrote our last blogpost about Apache NiFi where we pointed out what could be improved. It’s a very nice tool, so we are still using it, but we’ve found some other things that could be improved to make it even better. Of course, we could write another post where all we do is complain, but does that make the world better? Unfortunately not. So we decided that we could do better than that. We took the most painful issue and implemented a solution – that’s how NiFi Flow Tester was created.

Use case

Let’s assume you have to create a simple flow in NiFi according to some specification your client gave you:

  1. read some XML files from directory
  2. validate them (using XSD file)
  3. convert them from XML to JSON
  4. log if something failed
  5. pass it for further processing if everything went well

After studying the documentation and some googling, you end up with this flow: first you list all the files, then read their content, validate it, convert it to JSON and pass it on. Looks great. Now it’s time for some tests, and you have two options.

Test in NiFi directly – the bad

To test it manually, you need to copy a file to the input directory, wait to see what happens and, if everything went well, verify the result (again, manually) using ‘View data provenance’. Let’s try this approach. Something went wrong. Can you tell where? Something inside ValidateXml, but what exactly? So now you go to LogAttribute -> View data provenance and check the record attributes to find this one:

validatexml.invalid.error
The markup in the document following the root element must be well-formed.

Maybe there’s something wrong with the XML file that your client gave you?

> cat data.txt

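<person><name>Foo</name><type>student</type><age>31</age></person>
<person><name>Invalid age</name><type>student</type><age>-11</age></person>
<person><name>Bar</name><type>student</type><age>12</age></person>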

Looks like your client has a strange file format, where each line is a separate XML file. OK, that’s fine: you just need to add one more processor which splits the text line by line and do the manual tests AGAIN… But at this point you should be asking yourself a question: is there a better way to do this? After all, we’re programmers and we love to write code.
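The provenance error quoted earlier can be reproduced outside NiFi with a few lines of code. The following is a minimal sketch, assuming the file really holds one XML document per line. It uses only the JDK's XML parser, and the names LineXmlCheck, isWellFormed and checkLines are made up for this illustration.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.xml.sax.{ErrorHandler, InputSource, SAXParseException}
import scala.util.Try

object LineXmlCheck {
  // Rethrows parse errors instead of letting the parser print them to stderr.
  private val strict = new ErrorHandler {
    def warning(e: SAXParseException): Unit = ()
    def error(e: SAXParseException): Unit = throw e
    def fatalError(e: SAXParseException): Unit = throw e
  }

  // True when `text` parses as exactly one well-formed XML document.
  def isWellFormed(text: String): Boolean = Try {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    builder.setErrorHandler(strict)
    builder.parse(new InputSource(new StringReader(text)))
  }.isSuccess

  // Checks every non-empty line on its own and pairs it with the result.
  def checkLines(content: String): List[(String, Boolean)] =
    content
      .split("\n")
      .toList
      .map(line => line.trim)
      .filter(line => line.nonEmpty)
      .map(line => line -> isWellFormed(line))
}
```

For a file with two person elements on two lines, isWellFormed on the whole content returns false (a document may have only one root element), while checkLines reports every single line as well-formed. That is the situation the flow runs into.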

NiFi Mock – the ugly

The NiFi Mock library comes with a processor testing tool: TestRunner. Let’s use it to test our flow! A TestRunner can only run one processor, but we can work around that. Let’s start with the first one:

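import org.apache.nifi.processors.standard.ListFile
import org.apache.nifi.util.TestRunners

// testDir is assumed to point at the directory holding the test resources
val listFileRunner = TestRunners.newTestRunner(new ListFile)
listFileRunner.setProperty(ListFile.DIRECTORY, s"$testDir/person/")
listFileRunner.run()
val listFileResults = listFileRunner.getFlowFilesForRelationship("success")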

We create a runner with the processor, set the directory to read from, run it and get the results from the relationship. That was easy. Let’s create another TestRunner with the FetchFile processor, enqueue the results from the previous step, run it and collect the results.

val fetchFileRunner = TestRunners.newTestRunner(new FetchFile)
// getFlowFilesForRelationship returns a java.util.List, hence Java's forEach
listFileResults.forEach(f => fetchFileRunner.enqueue(f))
fetchFileRunner.run()
val fetchFileResults = fetchFileRunner.getFlowFilesForRelationship("success")

Great! Next one:

val splitTextRunner = TestRunners.newTestRunner(new SplitText)
splitTextRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1")
// the results are a java.util.List, hence Java's forEach
fetchFileResults.forEach(f => splitTextRunner.enqueue(f))
splitTextRunner.run()
val splitTextResults = splitTextRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS)

Two more processors and we are done. And don’t forget to double-check all the runner and result names when you are done copy-pasting this code! That’s not cool. I mean, we love to write code, but this is too much boilerplate, isn’t it? I’m sure you see the pattern here. We saw it too, and that’s why we built a solution.
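The pattern is the same at every step: take the previous results, enqueue them, run, and collect from one relationship. As a rough, library-free model of that idea, each step can be seen as a function from queued items to output items, and the whole chain as a fold. This is only a sketch: Stage, runChain and the two toy stages are hypothetical names, and it does not use the NiFi Mock API or show how NiFi Flow Tester is implemented.

```scala
object RunnerChain {
  // Hypothetical stand-in for one TestRunner step:
  // queued inputs go in, the outputs of one relationship come out.
  type Stage = List[String] => List[String]

  // Feeds the output of each stage into the next one, in order.
  def runChain(stages: List[Stage], input: List[String]): List[String] =
    stages.foldLeft(input)((queued, stage) => stage(queued))

  // Two toy stages, loosely standing in for SplitText and a filtering step.
  val splitLines: Stage = items => items.flatMap(item => item.split("\n").toList)
  val dropBlank: Stage = items => items.filter(item => item.trim.nonEmpty)
}
```

With this shape, adding a processor to the test means adding one element to a list instead of copy-pasting another runner block.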

NiFi Flow Tester – the good

In our new library there are two ways to create a flow. Today we will focus on the simple one, which is better for prototyping. First, we need to create a new NifiFlowBuilder() and add some nodes to it:

new NifiFlowBuilder()
  .addNode(
    "ListFile",
    new ListFile,
    Map(ListFile.DIRECTORY.getName -> s"$testDir/person/")
  )
  .addNode("FetchFile", new FetchFile, Map())
  .addNode(
    "SplitText",
    new SplitText,
    Map(SplitText.LINE_SPLIT_COUNT.getName -> "1")
  )
  .addNode(
    "ValidateXml",
    new ValidateXml,
    Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd")
  )
  .addNode(
    "TransformXml",
    new TransformXml,
    Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl")
  )

First, you need to specify the node name, which allows you to identify your node. It can be the same as the class name as long as you don’t use the same processor twice in a flow. Then, of course, you need to specify the processor and all of its parameters as a simple Map[String, String]. The safest way to do this is to use the processor’s class fields of type PropertyDescriptor. Unfortunately, they’re not always public, so sometimes you have to write their names as plain strings: instead of ListFile.DIRECTORY.getName we could write "Input Directory".

When all the nodes are specified, we can add connections between them using node names. The first parameter is the source node, the second is the destination and the third is the selected relationship. Again, using processor class fields is less error-prone, but sometimes these fields are not public (for instance in the ListFile or FetchFile processors):

.addConnection("ListFile", "FetchFile", "success")
.addConnection("FetchFile", "SplitText", "success")
.addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
.addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
.addOutputConnection("TransformXml", "success")

In the end, we add a connection to the output port to get the results. Everything looks good. Now just call the build method to create the flow. The entire code looks like this:

val flow = new NifiFlowBuilder()
      .addNode("ListFile", new ListFile, Map(ListFile.DIRECTORY.getName -> s"$testDir/person/"))
      .addNode("FetchFile", new FetchFile, Map())
      .addNode("SplitText", new SplitText, Map(SplitText.LINE_SPLIT_COUNT.getName -> "1"))
      .addNode("ValidateXml", new ValidateXml, Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd"))
      .addNode("TransformXml", new TransformXml, Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl"))
      .addConnection("ListFile", "FetchFile", "success")
      .addConnection("FetchFile", "SplitText", "success")
      .addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
      .addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
      .addOutputConnection("TransformXml", "success")
      .build()
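Because nodes are wired together by plain string names, a typo in a connection is easy to make. As an illustration only, a check for connections that mention undeclared nodes could look like the sketch below. FlowSpec, Connection and unknownNodes are hypothetical names; they are not part of NiFi Flow Tester, and this says nothing about what the builder itself validates.

```scala
object FlowSpecCheck {
  // A simplified, hypothetical description of a flow: node names plus connections.
  final case class Connection(from: String, to: String, relationship: String)
  final case class FlowSpec(nodes: Set[String], connections: List[Connection])

  // Returns every node name used in a connection that was never declared as a node.
  def unknownNodes(spec: FlowSpec): Set[String] =
    spec.connections.flatMap(c => List(c.from, c.to)).toSet -- spec.nodes
}
```

For a spec that declares "ListFile" and "FetchFile" but connects "FetchFile" to "SplitTxt", unknownNodes returns a set containing only the misspelled name.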

Now we just need to run the flow, collect the results and verify them:

flow.run() 
val files = flow.executionResult.outputFlowFiles 
files.head.assertContentEquals("""{"person":{"name":"Foo","type":"student","age":31}}""")

Disclaimer: this is not the best way to test if JSON is correct, but it was done for simplicity.
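One small step up from comparing raw strings is to ignore insignificant whitespace before comparing. Below is a minimal sketch with hypothetical names (JsonLayout, compact, sameIgnoringLayout) that uses no JSON library. It still treats key order as significant and does not validate the input, so a real test would rather parse both sides with a proper JSON library.

```scala
object JsonLayout {
  // Removes whitespace that sits outside string literals;
  // everything inside double quotes is kept verbatim.
  def compact(json: String): String = {
    val out = new StringBuilder
    var inString = false
    var escaped = false
    for (c <- json) {
      if (inString) {
        out.append(c)
        if (escaped) escaped = false          // character after a backslash is taken as-is
        else if (c == '\\') escaped = true    // start of an escape sequence
        else if (c == '"') inString = false   // closing quote
      } else if (c == '"') {
        inString = true
        out.append(c)
      } else if (!c.isWhitespace) {
        out.append(c)
      }
    }
    out.toString
  }

  // Compares two JSON texts while ignoring differences in spacing and line breaks.
  def sameIgnoringLayout(a: String, b: String): Boolean =
    compact(a) == compact(b)
}
```

This way a pretty-printed expected value and a single-line actual value compare as equal, while a space inside a string value such as "Invalid age" is still significant.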

Conclusion

Apache NiFi seems perfect until you start doing serious data integration. Without the ability to test your changes quickly, you will become extremely frustrated with clicking through the UI every time your client notices a bug. Sometimes it may be a bug in NiFi itself, but you will never know until you debug the code. That’s something our library can help you with. You can find it here: https://github.com/TouK/plumber

You May Also Like

Clojure web development – state of the art

I’ve been getting familiar with Clojure for more than a year now, and the more I dive into it, the more it becomes the language. Once you defeat the “parentheses fear”, everything else just makes the difference: tooling, community, good engineering practices. So it’s now time for me to convince others. In this post I’ll try to walk through building a simple web application from scratch to show the key tools and libraries used to develop with Clojure in late 2015.

Note for Clojurians: This material is rather elementary, but it may be useful if you already know Clojure a bit and have never built anything bigger than a hello world application.

Note for Java developers: This material shows how to replace Spring, Angular, grunt, live-reload with a bunch of Clojure tools and libraries and a bit of code.

The repo with final code and individual steps is here.

Bootstrap

I think everyone agrees that component is the industry standard for managing the lifecycle of Clojure applications. If you are a Java developer, you may think of it as a Spring (DI) replacement: you declare dependencies between “components”, which are resolved on “system” startup. So you just say “my component needs a repository/database pool” and the component library “injects” it for you.

To keep things simple I like to start with duct web app template. It’s a nice starter component application following the 12-factor philosophy. So let’s start with it:

lein new duct clojure-web-app +example

The +example parameter tells duct to create an example endpoint with HTTP routes, which will be helpful. To finish bootstrapping, run lein setup inside the clojure-web-app directory.

Ok, let’s dive into the code. Component and injection related code should be in system.clj file:

(defn new-system [config]
  (let [config (meta-merge base-config config)]
    (-> (component/system-map
         :app  (handler-component (:app config))
         :http (jetty-server (:http config))
         :example (endpoint-component example-endpoint))
        (component/system-using
         {:http [:app]
          :app  [:example]
          :example []}))))

In the first section you instantiate components without their dependencies, which are resolved in the second section. So in this example, the “http” component (server) requires “app” (application abstraction), which in turn is injected with “example” (actual routes). If your component needs others, you can just get them by name (precisely: by Clojure keywords).

To start the system you must fire up a REPL, an interactive environment running within the context of your application:

lein repl

After seeing the prompt, type (go). The application should start, and you can visit http://localhost:3000 to see an example page.

A huge benefit of using the component approach is that you get a fully reloadable application. When you change literally anything (configuration, endpoints, implementation), you can just type (reset) in the REPL and your application is up to date with the code. It’s a feature of the language; no JRebel or Spring-reloaded is needed.

Adding REST endpoint

Ok, in the next step let’s add some basic REST endpoint returning JSON. We need to add 2 dependencies in project.clj file:

:dependencies
 ...
  [ring/ring-json "0.3.1"]
  [cheshire "5.1.1"]

Ring-json adds JSON support for your routes (in ring it’s called middleware) and cheshire is a Clojure JSON parser (like Jackson in Java). Modifying project dependencies is one of the few tasks that require restarting the REPL, so hit CTRL-C and type lein repl again.

To configure JSON middleware we have to add wrap-json-body and wrap-json-response just before wrap-defaults in system.clj:

(:require 
 ...
 [ring.middleware.json :refer [wrap-json-body wrap-json-response]])

(def base-config
   {:app {:middleware [[wrap-not-found :not-found]
                      [wrap-json-body {:keywords? true}]
                      [wrap-json-response]
                      [wrap-defaults :defaults]]

And finally, in endpoint/example.clj we must add some route with JSON response:

(:require 
 ...
 [ring.util.response :refer [response]]))

(defn example-endpoint [config]
  (routes
    (GET "/hello" [] (response {:hello "world"}))
    ...

Reload app with (reset) in REPL and test new route with curl:

curl -v http://localhost:3000/hello

< HTTP/1.1 200 OK
< Date: Tue, 15 Sep 2015 21:17:37 GMT
< Content-Type: application/json; charset=utf-8
< Set-Cookie: ring-session=37c337fb-6bbc-4e65-a060-1997718d03e0;Path=/;HttpOnly
< X-XSS-Protection: 1; mode=block
< X-Frame-Options: SAMEORIGIN
< X-Content-Type-Options: nosniff
< Content-Length: 151
* Server Jetty(9.2.10.v20150310) is not blacklisted
< Server: Jetty(9.2.10.v20150310)
<
* Connection #0 to host localhost left intact
{"hello": "world"}

It works! In case of any problems you can find working version in this commit.

Adding frontend with figwheel

Coding the backend in Clojure is great, but what about the frontend? As you may already know, Clojure can be compiled not only to JVM bytecode, but also to JavaScript. This may sound familiar if you have used e.g. CoffeeScript. But the ClojureScript philosophy is not only to provide some syntax sugar, but to improve your development cycle with great tooling and fully interactive development. Let’s see how to achieve it.

The best way to introduce ClojureScript to a project is figwheel. First let’s add the figwheel plugin and configuration to project.clj:

:plugins
   ...
   [lein-figwheel "0.3.9"]

And cljsbuild configuration:

:cljsbuild
    {:builds [{:id "dev"
               :source-paths ["src-cljs"]
               :figwheel true
               :compiler {:main       "clojure-web-app.core"
                          :asset-path "js/out"
                          :output-to  "resources/public/js/clojure-web-app.js"
                          :output-dir "resources/public/js/out"}}]}

In short, this tells the ClojureScript compiler to take sources from src-cljs with figwheel support and put the resulting JavaScript into the resources/public/js/clojure-web-app.js file. So we need to include this file in a simple HTML page:

<!DOCTYPE html>
<html>
<head>
</head>
<body>
  <div id="main">
  </div>
  <script src="js/clojure-web-app.js" type="text/javascript"></script>
</body>
</html>

To serve this static file we need to change some defaults and add a corresponding route. In system.clj change api-defaults to site-defaults both in the require section and in base-config. In example.clj add the following route:

(GET "/" [] (io/resource "public/index.html"))

Again (reset) in REPL window should reload everything.

But where is our ClojureScript source file? Let’s create file core.cljs in src-cljs/clojure-web-app directory:

(ns ^:figwheel-always clojure-web-app.core)

(enable-console-print!)

(println "hello from clojurescript")

Open another terminal and run lein figwheel. It should compile ClojureScript and print ‘Prompt will show when figwheel connects to your application’. Open http://localhost:3000. The figwheel window should show a prompt:

To quit, type: :cljs/quit
cljs.user=>

Type (js/alert "hello"). Boom! If everything worked, you should see an alert in your browser. Open the developer console in your browser. You should see hello from clojurescript printed on the console. Change it in core.cljs to (println "figwheel rocks") and save the file. Without reloading the page you should see the updated message. Figwheel rocks! Again, in case of any problems, refer to this commit.

In the next post I’ll show how to fetch data from MongoDB, serve it with REST to the browser and write ReactJs/Om components to render it. Stay tuned!