Testing NiFi Flow – The good, the bad and the ugly

Introduction

Some time has passed since our last blog post about Apache NiFi, in which we pointed out what could be improved. It’s a very nice tool, so we are still using it, but we’ve found some other things that could be improved to make it even better. Of course, we could write another post where all we do is complain, but does that make the world better? Unfortunately not. So we decided to do better than that: we took the most painful issue and implemented a solution. That’s how NiFi Flow Tester was created.

Use case

Let’s assume you have to create a simple flow in NiFi according to some specification your client gave you:

  1. read some XML files from directory
  2. validate them (using XSD file)
  3. convert them from XML to JSON
  4. log if something failed
  5. pass it for further processing if everything went well

After studying the documentation and some googling, you end up with a flow like this: first you list all the files, then read their content, validate it, convert it to JSON and pass it on. Looks great. Now it’s time for some tests, and you have two options.

Test in NiFi directly – the bad

To test it manually, you need to copy a file to the input directory and wait to see what happens. If everything went well, you can verify it (again, manually) using ‘View data provenance’. Let’s try this approach. Something went wrong. Can you tell where? Somewhere inside ValidateXml, but what exactly? So now you go to LogAttribute -> ‘View data provenance’ and check the record attributes to find this one:

validatexml.invalid.error
The markup in the document following the root element must be well-formed.

Maybe there’s something wrong with XML file that your client gave you?

> cat data.txt
<person>
    <name>Foo</name>
    <type>student</type>
    <age>31</age>
</person>
<person>
    <name>Invalid age</name>
    <type>student</type>
    <age>-11</age>
</person>
<person>
    <name>Bar</name>
    <type>student</type>
    <age>12</age>
</person>

Looks like your client has a strange file format, where each line is a separate XML file. OK, that’s fine: you just need to add one more processor that splits the text line by line and do the manual tests AGAIN… But at this point you should be asking: is there a better way to do this? After all, we’re programmers and we love to write code.
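The validation error above is what a standard XML parser reports when a file holds several root elements. Here is a minimal sketch that uses only the JDK's built-in parser (not NiFi) to show the whole file being rejected while each line parses on its own. The class and method names are made up for illustration, and the sample data assumes one record per line, as described above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical helper, for illustration only; not part of NiFi or NiFi Flow Tester.
final class MultiRootXmlCheck {

    // Returns true when the text parses as exactly one well-formed XML document.
    static boolean isWellFormed(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            builder.setErrorHandler(new DefaultHandler()); // keep parser messages off stderr
            builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return true;
        } catch (SAXException e) {
            return false; // not well-formed, e.g. content after the root element
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Mimics a line-based split: one candidate document per non-blank line.
    static List<String> splitLines(String text) {
        List<String> records = new ArrayList<>();
        for (String line : text.split("\\R")) {
            if (!line.trim().isEmpty()) {
                records.add(line.trim());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        String data = String.join("\n",
            "<person><name>Foo</name><type>student</type><age>31</age></person>",
            "<person><name>Bar</name><type>student</type><age>12</age></person>");
        System.out.println("whole file well-formed: " + isWellFormed(data));
        for (String record : splitLines(data)) {
            System.out.println("single line well-formed: " + isWellFormed(record));
        }
    }
}
```

A check like this only covers well-formedness; validating against the XSD is still the job of ValidateXml in the flow.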

NiFi Mock – the ugly

The NiFi Mock library comes with a processor testing tool, TestRunner. Let’s use it to test our flow! A TestRunner can only run one processor, but we can work around that. Let’s start with the first one:

val listFileRunner = TestRunners.newTestRunner(new ListFile)
listFileRunner.setProperty(ListFile.DIRECTORY, s"$testDir/person/")
listFileRunner.run()
val listFileResults = listFileRunner.getFlowFilesForRelationship("success")

We create a runner with the processor, set a directory to read from, run it and get the results from the relationship. That was easy. Let’s create another TestRunner with the FetchFile processor, enqueue the results from the previous step, run it and collect the results.

val fetchFileRunner = TestRunners.newTestRunner(new FetchFile)
listFileResults.foreach(f => fetchFileRunner.enqueue(f))
fetchFileRunner.run()
val fetchFileResults = fetchFileRunner.getFlowFilesForRelationship("success")

Great! Next one:

val splitTextRunner = TestRunners.newTestRunner(new SplitText)
splitTextRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1")
fetchFileResults.foreach(f => splitTextRunner.enqueue(f)) 
splitTextRunner.run()
val splitTextResults = splitTextRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS)

Two more processors and we are done. And don’t forget to check all the runner and result names once you are done copy-pasting this code! That’s not cool. I mean, we love to write code, but this is too much boilerplate, isn’t it? I’m sure you saw the pattern here. We saw it too, and that’s why we introduced a solution.
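The repeated pattern in the snippets above is: build a runner, enqueue the previous results, run, collect. Stripped of NiFi, it is a loop over a list of stages. The following is a sketch of that idea only: plain strings stand in for flow files and functions stand in for processors, and none of the names below come from NiFi Mock or NiFi Flow Tester.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Illustration of the "feed each stage the previous output" pattern; not NiFi API.
final class StageChain {

    // Runs the stages in order, passing each stage the previous stage's output.
    static List<String> runAll(List<Function<List<String>, List<String>>> stages,
                               List<String> input) {
        List<String> current = input;
        for (Function<List<String>, List<String>> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    // Stand-in for a splitting processor: one output item per input line.
    static List<String> splitLines(List<String> items) {
        List<String> out = new ArrayList<>();
        for (String item : items) {
            out.addAll(Arrays.asList(item.split("\\R")));
        }
        return out;
    }

    // Stand-in for a transforming processor.
    static List<String> upperCase(List<String> items) {
        List<String> out = new ArrayList<>();
        for (String item : items) {
            out.add(item.toUpperCase(Locale.ROOT));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Function<List<String>, List<String>>> stages =
            Arrays.asList(StageChain::splitLines, StageChain::upperCase);
        System.out.println(runAll(stages, Arrays.asList("foo\nbar"))); // prints [FOO, BAR]
    }
}
```

Once the wiring lives in one loop, adding a stage means adding one list entry instead of another copy of the runner boilerplate.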

NiFi Flow Tester – the good

In our new library there are two ways to create a flow. Today we will focus on the simple one, which is better for prototyping. First, we need to create a new NifiFlowBuilder() and add some nodes to it:

new NifiFlowBuilder()
  .addNode(
    "ListFile",
    new ListFile,
    Map(ListFile.DIRECTORY.getName -> s"$testDir/person/")
  )
  .addNode("FetchFile", new FetchFile, Map())
  .addNode(
    "SplitText",
    new SplitText,
    Map(SplitText.LINE_SPLIT_COUNT.getName -> "1")
  )
  .addNode(
    "ValidateXml",
    new ValidateXml,
    Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd")
  )
  .addNode(
    "TransformXml",
    new TransformXml,
    Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl")
  )

First, you need to specify the node name, which identifies your node. It can be the same as the class name as long as you don’t use the same processor twice in the flow. Then, of course, you need to specify the processor along with all its parameters as a simple Map[String, String]. The safest way to do this is to use the processor’s class fields of type PropertyDescriptor. Unfortunately, they’re not always public, so sometimes you have to write their names by hand: instead of ListFile.DIRECTORY.getName we could write “Input Directory”.

When all the nodes are specified, we can add connections between them using node names. The first parameter is the source node, the second is the destination and the third is the selected relationship. Again, using processor class fields is less error-prone, but sometimes these fields are not public (for instance in the ListFile or FetchFile processors).

.addConnection("ListFile", "FetchFile", "success")
.addConnection("FetchFile", "SplitText", "success")
.addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
.addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
.addOutputConnection("TransformXml", "success")

In the end, we add a connection to the output port to get the results. Everything looks good. Now just call the build method to create the flow. The entire code looks like this:

val flow = new NifiFlowBuilder()
      .addNode("ListFile", new ListFile, Map(ListFile.DIRECTORY.getName -> s"$testDir/person/"))
      .addNode("FetchFile", new FetchFile, Map())
      .addNode("SplitText", new SplitText, Map(SplitText.LINE_SPLIT_COUNT.getName -> "1"))
      .addNode("ValidateXml", new ValidateXml, Map(ValidateXml.SCHEMA_FILE.getName -> s"$testDir/person-schema.xsd"))
      .addNode("TransformXml", new TransformXml, Map(TransformXml.XSLT_FILE_NAME.getName -> s"$testDir/xml-to-json.xsl"))
      .addConnection("ListFile", "FetchFile", "success")
      .addConnection("FetchFile", "SplitText", "success")
      .addConnection("SplitText", "ValidateXml", SplitText.REL_SPLITS)
      .addConnection("ValidateXml", "TransformXml", ValidateXml.REL_VALID)
      .addOutputConnection("TransformXml", "success")
      .build()

Now we just need to run the flow, collect the results and verify them:

flow.run() 
val files = flow.executionResult.outputFlowFiles 
files.head.assertContentEquals("""{"person":{"name":"Foo","type":"student","age":31}}""")

Disclaimer: this is not the best way to test if JSON is correct, but it was done for simplicity.

Conclusion

Apache NiFi seems perfect until you start doing serious data integration. Without the ability to test your changes quickly, you will become extremely frustrated with clicking through the UI every time your client notices a bug. Sometimes it may be a bug in NiFi itself, but you will never know until you debug the code. That’s something our library can help you with. You can find it here: https://github.com/TouK/plumber

You May Also Like

Atom Feeds with Spring MVC

How to add feeds (Atom) to your web application with just two classes?
How about Spring MVC?

Here are my assumptions:
  • you are using Spring framework
  • you have some entity, say “News”, that you want to publish in your feeds
  • your "News" entity has creationDate, title, and shortDescription
  • you have some repository/dao, say "NewsRepository", that will return the news from your database
  • you want to write as little as possible
  • you don't want to format Atom (xml) by hand
You actually do NOT need to use Spring MVC in your application already. If you do, skip to step 3.


Step 1: add Spring MVC dependency to your application
With maven that will be:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>3.1.0.RELEASE</version>
</dependency>

Step 2: add Spring MVC DispatcherServlet
With web.xml that would be:
<servlet>
    <servlet-name>dispatcher</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
    <servlet-name>dispatcher</servlet-name>
    <url-pattern>/feed</url-pattern>
</servlet-mapping>
Notice, I set the url-pattern to “/feed” which means I don't want Spring MVC to handle any other urls in my app (I'm using a different web framework for the rest of the app). I also give it a brand new contextConfigLocation, where only the mvc configuration is kept.

Remember that when you add a DispatcherServlet to an app that already has Spring (from ContextLoaderListener, for example), its context inherits from the global one, so you should not create beans that already exist there again, or include XML that defines them. Watch out for the Spring context starting up twice, and refer to the Spring or servlet documentation to understand what's happening.

Step 3: add ROME – a library to handle Atom format
With maven that is:
<dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome</artifactId>
    <version>1.0.0</version>
</dependency>

Step 4: write your very simple controller
@Controller
public class FeedController {
    static final String LAST_UPDATE_VIEW_KEY = "lastUpdate";
    static final String NEWS_VIEW_KEY = "news";
    private NewsRepository newsRepository;
    private String viewName;

    protected FeedController() {} //required by cglib

    public FeedController(NewsRepository newsRepository, String viewName) {
        notNull(newsRepository); hasText(viewName);
        this.newsRepository = newsRepository;
        this.viewName = viewName;
    }

    @RequestMapping(value = "/feed", method = RequestMethod.GET)        
    @Transactional
    public ModelAndView feed() {
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.setViewName(viewName);
        List<News> news = newsRepository.fetchPublished();
        modelAndView.addObject(NEWS_VIEW_KEY, news);
        modelAndView.addObject(LAST_UPDATE_VIEW_KEY, getCreationDateOfTheLast(news));
        return modelAndView;
    }

    private Date getCreationDateOfTheLast(List<News> news) {
        if(news.size() > 0) {
            return news.get(0).getCreationDate();
        }
        return new Date(0);
    }
}
And here's a test for it, in case you want to copy&paste (who doesn't?):
@RunWith(MockitoJUnitRunner.class)
public class FeedControllerShould {
    @Mock private NewsRepository newsRepository;
    private Date FORMER_ENTRY_CREATION_DATE = new Date(1);
    private Date LATTER_ENTRY_CREATION_DATE = new Date(2);
    private ArrayList<News> newsList;
    private FeedController feedController;

    @Before
    public void prepareNewsList() {
        News news1 = new News().title("title1").creationDate(FORMER_ENTRY_CREATION_DATE);
        News news2 = new News().title("title2").creationDate(LATTER_ENTRY_CREATION_DATE);
        newsList = newArrayList(news2, news1);
    }

    @Before
    public void prepareFeedController() {
        feedController = new FeedController(newsRepository, "viewName");
    }

    @Test
    public void returnViewWithNews() {
        //given
        given(newsRepository.fetchPublished()).willReturn(newsList);
        
        //when
        ModelAndView modelAndView = feedController.feed();
        
        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.NEWS_VIEW_KEY, newsList));
    }

    @Test
    public void returnViewWithLastUpdateTime() {
        //given
        given(newsRepository.fetchPublished()).willReturn(newsList);

        //when
        ModelAndView modelAndView = feedController.feed();

        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.LAST_UPDATE_VIEW_KEY, LATTER_ENTRY_CREATION_DATE));
    }

    @Test
    public void returnTheBeginningOfTimeAsLastUpdateInViewWhenListIsEmpty() {
        //given
        given(newsRepository.fetchPublished()).willReturn(new ArrayList<News>());

        //when
        ModelAndView modelAndView = feedController.feed();

        //then
        assertThat(modelAndView.getModel())
                .includes(entry(FeedController.LAST_UPDATE_VIEW_KEY, new Date(0)));
    }
}
Notice: here, I'm using fest-assert and mockito. The dependencies are:
<dependency>
 <groupId>org.easytesting</groupId>
 <artifactId>fest-assert</artifactId>
 <version>1.4</version>
 <scope>test</scope>
</dependency>
<dependency>
 <groupId>org.mockito</groupId>
 <artifactId>mockito-all</artifactId>
 <version>1.8.5</version>
 <scope>test</scope>
</dependency>
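Note that getCreationDateOfTheLast in the controller takes the first element, so it relies on fetchPublished() returning the newest entry first (as the test data does). If that ordering is not guaranteed by your repository, a small helper like the one below could pick the latest date explicitly. This is a sketch with hypothetical names (LatestDate, latestOrEpoch), and it works on plain dates rather than on News objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

// Hypothetical helper, for illustration only.
final class LatestDate {

    // Returns the most recent date, or the epoch when the list is empty,
    // mirroring the controller's "beginning of time" fallback.
    static Date latestOrEpoch(List<Date> creationDates) {
        if (creationDates.isEmpty()) {
            return new Date(0);
        }
        return Collections.max(creationDates);
    }

    public static void main(String[] args) {
        List<Date> unordered = Arrays.asList(new Date(1), new Date(2), new Date(0));
        System.out.println(latestOrEpoch(unordered).getTime()); // prints 2
    }
}
```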

Step 5: write your very simple view
Here's where all the magic formatting happens. Be sure to take a look at all the methods of Entry class, as there is quite a lot you may want to use/fill.
import org.springframework.web.servlet.view.feed.AbstractAtomFeedView;
[...]

public class AtomFeedView extends AbstractAtomFeedView {
    private String feedId = "tag:yourFantastiSiteName";
    private String title = "yourFantastiSiteName: news";
    private String newsAbsoluteUrl = "http://yourfanstasticsiteUrl.com/news/"; 

    @Override
    protected void buildFeedMetadata(Map<String, Object> model, Feed feed, HttpServletRequest request) {
        feed.setId(feedId);
        feed.setTitle(title);
        setUpdatedIfNeeded(model, feed);
    }

    private void setUpdatedIfNeeded(Map<String, Object> model, Feed feed) {
        Date lastUpdate = (Date) model.get(FeedController.LAST_UPDATE_VIEW_KEY);
        // Only overwrite when we have a date and it is newer than the one already set (if any);
        // the null check comes first so compareTo() is never called on a missing date.
        if (lastUpdate != null && (feed.getUpdated() == null || lastUpdate.compareTo(feed.getUpdated()) > 0)) {
            feed.setUpdated(lastUpdate);
        }
    }

    @Override
    protected List<Entry> buildFeedEntries(Map<String, Object> model, HttpServletRequest request, HttpServletResponse response) throws Exception {
        @SuppressWarnings("unchecked")
        List<News> newsList = (List<News>)model.get(FeedController.NEWS_VIEW_KEY);
        List<Entry> entries = new ArrayList<Entry>();
        for (News news : newsList) {
            addEntry(entries, news);
        }
        return entries;
    }

    private void addEntry(List<Entry> entries, News news) {
        Entry entry = new Entry();
        entry.setId(feedId + ", " + news.getId());
        entry.setTitle(news.getTitle());
        entry.setUpdated(news.getCreationDate());
        entry = setSummary(news, entry);
        entry = setLink(news, entry);
        entries.add(entry);
    }

    private Entry setSummary(News news, Entry entry) {
        Content summary = new Content();
        summary.setValue(news.getShortDescription());
        entry.setSummary(summary);
        return entry;
    }

    private Entry setLink(News news, Entry entry) {
        Link link = new Link();
        link.setType("text/html");
        link.setHref(newsAbsoluteUrl + news.getId()); //because I have a different controller to show news at http://yourfanstasticsiteUrl.com/news/ID
        entry.setAlternateLinks(newArrayList(link));
        return entry;
    }

}
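The rule in setUpdatedIfNeeded boils down to: keep the later of two dates, either of which may be missing. Here is a null-safe sketch of that rule in isolation. UpdatedDates and laterOf are hypothetical names, not part of Spring or ROME.

```java
import java.util.Date;

// Hypothetical helper, for illustration only.
final class UpdatedDates {

    // Returns the candidate when it is present and newer than the current value
    // (or when there is no current value); otherwise returns the current value.
    static Date laterOf(Date current, Date candidate) {
        if (candidate == null) {
            return current;
        }
        if (current == null || candidate.after(current)) {
            return candidate;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(laterOf(new Date(1), new Date(2)).getTime()); // prints 2
        System.out.println(laterOf(new Date(2), null).getTime());        // prints 2
    }
}
```

In the view, this could be used as feed.setUpdated(UpdatedDates.laterOf(feed.getUpdated(), lastUpdate)), which keeps the null handling in one place.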

Step 6: add your classes to your Spring context
I'm using the XML approach because I'm old and I love XML. No, seriously, I use XML because I may want to declare FeedController a few times with different views (RSS 1.0, RSS 2.0, etc.).

So this is the aforementioned spring-mvc.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="mediaTypes">
            <map>
                <entry key="atom" value="application/atom+xml"/>
                <entry key="html" value="text/html"/>
            </map>
        </property>
        <property name="viewResolvers">
            <list>
                <bean class="org.springframework.web.servlet.view.BeanNameViewResolver"/>
            </list>
        </property>
    </bean>

    <bean class="eu.margiel.pages.confitura.feed.FeedController">
        <constructor-arg index="0" ref="newsRepository"/>
        <constructor-arg index="1" value="atomFeedView"/>
    </bean>

    <bean id="atomFeedView" class="eu.margiel.pages.confitura.feed.AtomFeedView"/>
</beans>

And you are done.

I've been asked a few times before to put all the working code in some public repo, so this time it's the other way around: I've described things that I had already published, and you can grab the commit from Bitbucket.

Hope that helps.