Testing NiFi Flow – The good, the bad and the ugly

Introduction

Some time has passed since we wrote our last blog post about Apache NiFi, in which we pointed out what could be improved. It's a very nice tool, so we are still using it, but we've found some more things that could be improved to make it even better. Of course, we could write another post where all we do is complain, but does that make the world any better? Unfortunately not. So we decided to do better than that: we took the most painful issue and implemented a solution – that's how NiFi Flow Tester was created.

Use case

Let's assume you have to create a simple flow in NiFi according to a specification your client gave you:

  1. read some XML files from a directory
  2. validate them (using an XSD file)
  3. convert them from XML to JSON
  4. log if something failed
  5. pass them on for further processing if everything went well

After studying the documentation and some googling, you end up with this flow: first you list all the files, then read their content, validate it, convert it to JSON and pass it further. Looks great. Now it's time for some tests – you have two options.

Test in NiFi directly – the bad

To test it manually, you need to copy a file into the input directory, wait to see what happens and, if everything goes well, verify the result (again, manually) using 'View data provenance'. Let's try this approach. Something went wrong. Can you tell where? Somewhere inside ValidateXml, but what exactly? So now go to LogAttribute -> View data provenance and check the record attributes to find this one:

validatexml.invalid.error
The markup in the document following the root element must be well-formed.

Maybe there's something wrong with the XML file that your client gave you?

> cat data.txt
<person>
    <name>Foo</name>
    <type>student</type>
    <age>31</age>
</person>
<person>
    <name>Invalid age</name>
    <type>student</type>
    <age>-11</age>
</person>
<person>
    <name>Bar</name>
    <type>student</type>
    <age>12</age>
</person>

It looks like your client uses a strange file format where each line is a separate XML document. OK, that's fine: you just need to add one more processor that splits the text line by line, and then do the manual tests AGAIN… but at this point you should be asking yourself – is there a better way to do this? After all, we're programmers and we love to write code.

NiFi Mock – the ugly

The NiFi Mock library comes with a processor testing tool – TestRunner. Let's use it to test our flow! A TestRunner can only run a single processor at a time, but we can work around that. Let's start with the first one:

val listFileRunner = TestRunners.newTestRunner(new ListFile)
listFileRunner.setProperty(ListFile.DIRECTORY, s"$testDir/person/")
listFileRunner.run()
val listFileResults = listFileRunner.getFlowFilesForRelationship("success")

We create a runner with the processor, set the directory to read from, run it and fetch the results from the success relationship. That was easy. Now let's create another TestRunner, this time with the FetchFile processor, enqueue the results from the previous step, run it and collect the results.

val fetchFileRunner = TestRunners.newTestRunner(new FetchFile)
listFileResults.foreach(f => fetchFileRunner.enqueue(f))
fetchFileRunner.run()
val fetchFileResults = fetchFileRunner.getFlowFilesForRelationship("success")

Great! Next one:

val splitTextRunner = TestRunners.newTestRunner(new SplitText)
splitTextRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1")
fetchFileResults.foreach(f => splitTextRunner.enqueue(f)) 
splitTextRunner.run()
val splitTextResults = splitTextRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS)

Two more processors and we are done. And don't forget to fix all the runner and result names when you're done copy-pasting this code! That's not cool. I mean, we love to write code, but this is too much boilerplate, isn't it?
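
For the record, here is roughly what those two remaining steps would look like – a sketch following the same pattern, reusing the property descriptors and relationship names we rely on when building the full flow later in this post:

val validateXmlRunner = TestRunners.newTestRunner(new ValidateXml)
validateXmlRunner.setProperty(ValidateXml.SCHEMA_FILE, s"$testDir/person-schema.xsd")
splitTextResults.foreach(f => validateXmlRunner.enqueue(f))
validateXmlRunner.run()
val validateXmlResults = validateXmlRunner.getFlowFilesForRelationship(ValidateXml.REL_VALID)

val transformXmlRunner = TestRunners.newTestRunner(new TransformXml)
transformXmlRunner.setProperty(TransformXml.XSLT_FILE_NAME, s"$testDir/xml-to-json.xsl")
validateXmlResults.foreach(f => transformXmlRunner.enqueue(f))
transformXmlRunner.run()
val transformXmlResults = transformXmlRunner.getFlowFilesForRelationship("success")

I'm sure you've spotted the pattern by now. We saw it too, and that's why we came up with a solution.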

NiFi Flow Tester – the good

Our new library offers two ways to create a flow. Today we will focus on the simple one, which is better suited for prototyping. First, we need to create a new NifiFlowBuilder() and add some nodes to it:

new NifiFlowBuilder()
  .addNode(
    "ListFile",
    new ListFile,
    Map(ListFile.DIRECTORY.getName -> s"$testDir/person/")
  )
  .addNode("FetchFile", new FetchFile, Map())
  .addNode(
    "SplitText",
    new SplitText,
    Map(SplitText.LINE_SPLIT_COUNT.getName -> "1")
  )
  .addNode(
    "ValidateXml",
    new ValidateXml,
    Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd")
  )
  .addNode(
    "TransformXml",
    new TransformXml,
    Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl")
  )

First, you need to specify the node name, which lets you identify the node; it can be the same as the class name, as long as you don't use the same processor twice in the flow. Then, of course, you need to specify the processor along with all its parameters as a simple Map[String, String]. The safest way to do this is to use the processor's class fields of type PropertyDescriptor. Unfortunately, they're not always public, so sometimes you have to write the property name out as a string – instead of ListFile.DIRECTORY.getName we would write "Input Directory".
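
To illustrate, these two builder calls are equivalent (the raw string must match the property's display name exactly, so the descriptor is the safer choice when it's public):

// using the PropertyDescriptor field
new NifiFlowBuilder()
  .addNode("ListFile", new ListFile, Map(ListFile.DIRECTORY.getName -> s"$testDir/person/"))

// using the raw property name
new NifiFlowBuilder()
  .addNode("ListFile", new ListFile, Map("Input Directory" -> s"$testDir/person/"))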

When all the nodes are specified, we can add connections between them using the node names. The first parameter is the source node, the second is the destination and the third is the selected relationship. Again, using the processor class fields is less error-prone, but sometimes these fields are not public (for instance in the ListFile or FetchFile processors).

.addConnection("ListFile", "FetchFile", "success")
.addConnection("FetchFile", "SplitText", "success")
.addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
.addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
.addOutputConnection("TransformXml", "success")

In the end, we add a connection to the output port so we can get at the results. Everything looks good. Now just call the build method to create the flow. The entire code looks like this:

val flow = new NifiFlowBuilder()
      .addNode("ListFile", new ListFile, Map(ListFile.DIRECTORY.getName -> s"$testDir/person/"))
      .addNode("FetchFile", new FetchFile, Map())
      .addNode("SplitText", new SplitText, Map(SplitText.LINE_SPLIT_COUNT.getName -> "1"))
      .addNode("ValidateXml", new ValidateXml, Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd"))
      .addNode("TransformXml", new TransformXml, Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl"))
      .addConnection("ListFile", "FetchFile", "success")
      .addConnection("FetchFile", "SplitText", "success")
      .addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
      .addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
      .addOutputConnection("TransformXml", "success")
      .build()

Now we just need to run the flow, collect the results and verify them:

flow.run() 
val files = flow.executionResult.outputFlowFiles 
files.head.assertContentEquals("""{"person":{"name":"Foo","type":"student","age":31}}""")

Disclaimer: comparing raw strings like this is not the best way to test that the JSON is correct, but we did it this way for simplicity.
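
If you need a check that is less sensitive to formatting, you could compare parsed JSON values instead of raw strings. A minimal sketch, assuming a JSON library such as spray-json is on the test classpath (the library choice here is ours, not part of NiFi Flow Tester):

import spray.json._

// parsing both sides makes the comparison independent of whitespace and key order
val expected = """{"person":{"name":"Foo","type":"student","age":31}}""".parseJson
val actual = new String(files.head.toByteArray).parseJson
assert(actual == expected)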

Conclusion

Apache NiFi seems perfect until you start doing serious data integration. Without the ability to test your changes quickly, you will become extremely frustrated with all the clicking every time your client notices a bug. Sometimes it may be a bug in NiFi itself, but you will never know until you debug the code. That's something our library can help you with. You can find it here: https://github.com/TouK/plumber

You May Also Like

Visualizing GIS data in JavaFX 2.0 beta using GeoTools

Geographic data mostly comprises of polygon coordinates sets along with attributes, like country or city name, etc. This is quite easy to visualize in JavaFX, which supports rendering for SVG paths. In the article, I show how to read such GIS data from...

How to automate tests with Groovy 2.0, Spock and Gradle

This is the first blog post in my life, so cheers and have a nice read!

y u no test?

A couple of years ago I wasn't a big fan of unit testing. It was obvious to me that well-prepared unit tests are crucial, though I didn't yet know exactly why. I just felt they were important. My dislike of writing automated tests was mostly down to the effort necessary to prepare them. Spaghetti code was also easily spotted in test sources.

Some goodies at hand

Now I know! Tests are crucial for getting a better design and for confidence – the confidence to improve things without hesitation. Moreover, now I have a tool that makes test automation as easy as a Sunday morning... I'm talking about the Spock Framework. If you got here, you probably already know what Spock is, so I won't introduce it. Enough to say that Spock is an awesome unit testing tool which, thanks to Groovy AST transformations, greatly simplifies the creation of tests.

An obstacle

The point is that since the new major version of Groovy (2.0) was released, there has been no matching version of Spock available.

What now?

Well, as a matter of fact, there is such a version. It's still under development, though. It can be obtained from this Maven repository. We could of course use Maven to build the project and run the tests. But why not go an even more "groovy" way? XML is not for humans, is it? Let's use Gradle.

The build file

Update: an updated version of the build file can be found at the end of this post.

apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
    mavenLocal()
    mavenCentral()
    maven { url 'http://oss.sonatype.org/content/repositories/snapshots/' }
}

dependencies {
    groovy 'org.codehaus.groovy:groovy-all:2.0.1'
    testCompile 'org.spockframework:spock-core:0.7-groovy-2.0-SNAPSHOT'
}

idea {
    project {
        jdkName = langLevel
        languageLevel = langLevel
    }
}

As you can see, the build.gradle file is almost self-explanatory. The Groovy plugin is applied to compile Groovy code. It needs groovy-all.jar – declared in version 2.0 in the dependencies block, right next to Spock in version 0.7. Most importantly, the aforementioned Maven repository URL is added in the repositories block.

Project structure and execution

Gradle's default project directory structure is similar to Maven's. Unfortunately, there is no 'create project' task, so you have to create it by hand. It's not a big obstacle, though. The structure you create will look more or less as follows:

<project root>
├── build.gradle
└── src
    ├── main
    │   └── groovy
    └── test
        └── groovy

To build the project, you can now type the command gradle build, or gradle test to run only the tests.

How about Java?

You can test plain Java code with Spock. Just add a src/main/java directory and the following line to build.gradle:
apply plugin: 'java'
This way, if you don't want to (or just can't) deploy Groovy-compiled classes to your production JVM for whatever reason, the whole goodness of testing with Spock and Groovy is still at your disposal.

A silly-simple example

Just to show that it works, here is a basic example.

A simple Java example class:

public class SimpleJavaClass {

    public int sumAll(int... args) {
        int sum = 0;
        for (int arg : args) {
            sum += arg;
        }
        return sum;
    }
}

A simple Groovy example class:

class SimpleGroovyClass {

    String concatenateAll(char separator, String... args) {
        args.join(separator as String)
    }
}

The test, uhm... I mean the Specification:

import spock.lang.Specification
import spock.lang.Unroll

class JustASpecification extends Specification {

    @Unroll('Sums integers #integers into: #expectedResult')
    def "Can sum a varying amount of integers"() {

        given:
        def instance = new SimpleJavaClass()

        when:
        def result = instance.sumAll(*integers)

        then:
        result == expectedResult

        where:
        expectedResult | integers
        11             | [3, 3, 5]
        8              | [3, 5]
        254            | [2, 4, 8, 16, 32, 64, 128]
        22             | [7, 5, 6, 2, 2]
    }

    @Unroll('Concatenates strings #strings with separator "#separator" into: #expectedResult')
    def "Can concatenate a varying amount of strings with a specified separator"() {

        given:
        def instance = new SimpleGroovyClass()

        when:
        def result = instance.concatenateAll(separator, *strings)

        then:
        result == expectedResult

        where:
        expectedResult     | separator   | strings
        'Whasup dude?'     | ' ' as char | ['Whasup', 'dude?']
        '2012/09/15'       | '/' as char | ['2012', '09', '15']
        'nice-to-meet-you' | '-' as char | ['nice', 'to', 'meet', 'you']
    }
}

To run the tests with Gradle, simply execute the command gradle test. Test reports can be found at <project root>/build/reports/tests/index.html.


Please note that, thanks to the @Unroll annotation, the test is executed once per parameter row in the 'table' in the specification's where: block. @Unroll isn't a Java label, but AST transformation magic.
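
For example, with the naming pattern above, the first feature method should show up in the report as four separate tests, named more or less like this (the exact rendering may vary by Spock version):

Sums integers [3, 3, 5] into: 11
Sums integers [3, 5] into: 8
Sums integers [2, 4, 8, 16, 32, 64, 128] into: 254
Sums integers [7, 5, 6, 2, 2] into: 22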

IDE integration

Gradle's plugin for IntelliJ IDEA

I've also added the IntelliJ IDEA plugin for IDE project generation, along with some configuration for it (the IDE's JDK name). To generate IDEA project files, just run the command gradle idea. Eclipse and NetBeans plugins are available too; however, I haven't tested them. The IDEA one works well.

IntelliJ IDEA's plugins for Gradle

IDEA itself has some light built-in Gradle support of its own. Not to get confused: Gradle has a plugin for IDEA, and IDEA has a plugin for Gradle. To get even more 'pluginated', there is also the JetGradle plugin within IDEA. However, I haven't found a good reason for its existence – well, maybe excluding one: it shows the dependency tree. There is a bug, though – JetGradle works fine only for language level 1.6. Strangely, all these plugins do not conflict with each other. Together they even give a complementary, quite useful tool set.

Running tests under IDE

Just to add something sweet, this is how a Specification looks when run with the JUnit runner under IntelliJ IDEA: right-click the JustASpecification class (or a whole folder of specification classes) and select "Run ...".

Building a web application

If you need to build a Java web application and bundle it as a war archive, just add the plugin by putting the line
apply plugin: 'war'
in the build.gradle file and create a directory src/main/webapp.

Want to know more?

If you haven't heard about Spock or Gradle before, or you're just curious, check the following links:

What next?

The last thing left is to write the real production code you are about to test. Whether it will be Groovy or Java, I leave to your needs and invention. Of course, you are welcome to post comments here; I'll answer, or even write some more posts about the subject.

Important update

Spock version 0.7 has been released, so the build file above doesn't work anymore. It's easy to fix, though: just remove the last dash and the word SNAPSHOT from the Spock dependency declaration. Another important thing is that spock-core now depends on groovy-all-2.0.5, so to avoid a dependency conflict the Groovy dependency should be changed from version 2.0.1 to 2.0.5. Besides that, the oss.sonatype.org snapshots Maven repository can be removed. No obstacles any more, and the build file now looks as follows:

apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    groovy 'org.codehaus.groovy:groovy-all:2.0.5'
    testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
}

idea {
    project {
        jdkName = langLevel
        languageLevel = langLevel
    }
}