TouK Nussknacker – using Apache Flink made easier for analysts and business

A few weeks ago we (TouK) released our latest open source project, Nussknacker, on GitHub. What is it, and why should you care?

Why?

First, some history: more than a year ago one of our clients decided it was high time for Real Time Marketing. They had pretty large data streams and lots of ideas for marketing campaigns, so one of the key success factors was the ability to process the data fast. We prepared a POC based on Apache Flink, and it turned out to be a really great piece of technology: fast and accurate.

There was just one problem: how to write and customize processes? Flink, like many modern stream processing engines, provides a rich and friendly DSL in both Java and Scala. But for our client that was not really enough. See, they are an enterprise that does not employ developers. They have many decent, competent analysts who know the business and SQL, but not Java or (Heaven forbid…) Scala.

Nussknacker to the rescue!

So we decided to create a simple graphical process designer for them.

You draw a diagram, fill in the details (like filter expressions, e.g. “#input.usageData > 0”, or SMS/mail content), then you press the Deploy button and voilà: your brand new process is running on the Flink cluster!

Of course, somebody (that is, a developer) first has to prepare the data sources (most probably Kafka topics), design the data model (POJOs or case classes) and implement actions, like sending emails or sending events to another Kafka topic. But once the data model and external services are defined, an analyst can define and deploy processes all by him/herself.
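To make this split of responsibilities more concrete, here is a minimal Groovy sketch under loudly stated assumptions: `UsageEvent`, `EventAction` and `SendSmsAction` are hypothetical names (not Nussknacker's extension API), a plain list stands in for a Kafka topic, and a closure stands in for the filter expression. It only shows what each party contributes; in Nussknacker the diagram is deployed as a process running on Flink.

```groovy
// Hypothetical data model a developer might provide (a plain Groovy bean)
class UsageEvent {
    String customerId
    long usageData
}

// Hypothetical action contract; NOT Nussknacker's actual extension API
interface EventAction {
    void execute(UsageEvent event)
}

// Hypothetical "send an SMS" action that records messages in memory
// instead of calling a real SMS gateway
class SendSmsAction implements EventAction {
    List<String> sent = []

    void execute(UsageEvent event) {
        sent << "SMS to ${event.customerId}: check out our new data plan!".toString()
    }
}

// The analyst's part: a filter node followed by an action node.
// This closure plays the role of the filter expression "#input.usageData > 0".
Closure<Boolean> filter = { UsageEvent input -> input.usageData > 0 }
SendSmsAction sendSms = new SendSmsAction()

// A plain list stands in for a Kafka topic
List<UsageEvent> events = [
    new UsageEvent(customerId: 'alice', usageData: 0),
    new UsageEvent(customerId: 'bob', usageData: 120)
]

// Only events passing the filter reach the action
events.findAll(filter).each { sendSms.execute(it) }
println sendSms.sent   // [SMS to bob: check out our new data plan!]
```

Once the developer has written the model and the action, changing the campaign logic only means changing the filter, which is exactly the part the analyst edits in the designer.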

More features

Sounds a bit scary to let your users run stream processes from a GUI? Bad filter conditions can have serious performance implications when you deal with streams of tens of thousands of events per second… That’s why we let users test their diagrams first: each test case can be generated from a sample of real data coming from, e.g., Kafka and then run in a Flink sandbox mini-cluster.

We also have many more features that make working with Nussknacker and Flink easier: subprocesses, versioning, PDF documentation generation, migration between environments and, last but certainly not least, integration with InfluxDB/Grafana, which provides detailed insight into how a process is doing.

Where can I use it?

What are Nussknacker's use cases? Our main deployments deal with RTM (Real Time Marketing). One of our clients started with RTM and then found out that Nussknacker is also a great choice for real-time fraud detection. The industries we work with include telcos, banks and media companies. We are also thinking about other possibilities, for example IoT.

Sounds cool?

If you want to give semi-technical analysts easy access to streaming data, give Nussknacker a try!

You can find the code on GitHub: https://github.com/touk/nussknacker. We also have a nice, Docker-based quickstart: https://touk.github.io/nussknacker/Quickstart.html.

And if you are coming to Flink Forward in Berlin next week, join me on Wednesday afternoon to hear more.

In the coming days and weeks we’ll post more on the TouK blog, both about Nussknacker's architecture and internals and about interesting use cases. Stay tuned :)


Thought static methods can’t be easily mocked, stubbed or tracked? Wrong!

No matter why, no matter whether it's a good idea: sometimes one just wants to check, or it simply has to be done. Mock a static method, woot? Impossibru!

In the pure Java world it is still a struggle. But Groovy lets you do it really simply. Well, not Groovy alone, but with great support from Spock.

Let's move straight on to the example. For some context, here is the domain the example needs: a marketing project with a set of offers (one to many).

import spock.lang.Specification

class OfferFacadeSpec extends Specification {

    OfferFacade facade = new OfferFacade()

    def setup() {
        GroovyMock(Project, global: true)
    }

    def 'delegates an add offer call to the domain with proper params'() {
        given:
            Map params = [projId: projectId, name: offerName]

        when:
            Offer returnedOffer = facade.add(params)

        then:
            1 * Project.addOffer(projectId, _) >> { projId, offer -> offer }
            returnedOffer.name == params.name

        where:
            projectId | offerName
            1         | 'an Offer'
            15        | 'whasup!?'
            123       | 'doskonała oferta - kup teraz!'
    }
}
So we test a facade responsible for handling an "add offer to the project" call triggered somewhere in a GUI.
We want to ensure that the static method Project.addOffer(long, Offer) receives the correct params when a java.util.Map with user form input is passed to facade.add(params).
This is a unit test, so how Project.addOffer() works is out of scope. Thus we want to stub it.
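The post does not show the production code under test. A minimal sketch of what it might look like, assuming hypothetical `Offer`, `Project` and `OfferFacade` classes shaped to match the calls used in the spec:

```groovy
// Hypothetical domain class: an offer belonging to a marketing project
class Offer {
    String name
}

// Hypothetical domain entry point with the static method the spec stubs
class Project {
    static Offer addOffer(long projectId, Offer offer) {
        // The real implementation (persistence, validation, ...) is out of
        // scope for the unit test, so this stand-in just refuses to run
        throw new UnsupportedOperationException('real implementation not shown')
    }
}

// The facade under test: turns raw GUI form input into a domain call
class OfferFacade {
    Offer add(Map params) {
        Project.addOffer(params.projId as long, new Offer(name: params.name))
    }
}
```

Note that OfferFacade is plain (dynamically compiled) Groovy, so its call to Project.addOffer() goes through Groovy's meta-object protocol; that is what allows a global Groovy mock to intercept it.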

The most important part is the GroovyMock(Project, global: true) statement.
It modifies the Project class to behave like a Spock mock.
GroovyMock() itself is a method inherited from Specification. The global flag is necessary to enable mocking of static methods.
However, when it comes to mocking static methods, the author of the Spock Framework advises considering a redesign of the implementation. It's not bad advice, I must say.
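The Spock documentation explains that global Groovy mocks get their powers from Groovy meta-programming: every globally mocked type is assigned a custom meta class for the duration of the feature method. A rough, hand-rolled version of that idea in plain Groovy might look like this (a sketch only, with a hypothetical Project whose real method we don't want to run):

```groovy
class Offer {
    String name
}

class Project {
    // Hypothetical "real" implementation that must not run in a unit test
    static Offer addOffer(long projectId, Offer offer) {
        throw new IllegalStateException('would hit the database')
    }
}

// Swap in a new static method via the meta class; every dynamic Groovy call
// to Project.addOffer(long, Offer) now reaches this closure instead
Project.metaClass.static.addOffer = { long projectId, Offer offer -> offer }

def offer = new Offer(name: 'an Offer')
def stubbed = Project.addOffer(15L, offer)
assert stubbed.is(offer)

// Drop the modified meta class so the original behaviour comes back,
// similar to what Spock does once the feature method ends
GroovySystem.metaClassRegistry.removeMetaClass(Project)
```

Spock adds what this sketch lacks: interaction counting, argument constraints and automatic restoration of the meta class.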

Another important part is the then: block. Its first line checks an interaction: whether the Project.addOffer() method was called exactly once, with a first argument equal to projectId and any second argument (we don't have the Offer instance yet, so there is nothing to assert about it).
The right shift operator (>>) leads us to the stub, which replaces the original method implementation with the given closure.
As a good stub, it does nothing. The original method has return type Offer, so the stub needs to return one too: it simply returns the offer passed as the second argument.
Thanks to this, we can assert that the name property equals the value from params. If the stub returned nothing, the name could instead be checked inside the stub closure, prefixed with the assert keyword.
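For the variant where the stub returns nothing and the check happens inside the stub closure, a self-contained sketch could look like the spec below. The Offer, Project and OfferFacade classes are hypothetical stand-ins, since the post does not show the production code.

```groovy
import spock.lang.Specification

// Hypothetical stand-ins for the production code
class Offer {
    String name
}

class Project {
    static Offer addOffer(long projectId, Offer offer) {
        throw new UnsupportedOperationException('not needed in a unit test')
    }
}

class OfferFacade {
    Offer add(Map params) {
        Project.addOffer(params.projId as long, new Offer(name: params.name))
    }
}

class OfferFacadeAssertInStubSpec extends Specification {

    OfferFacade facade = new OfferFacade()

    def setup() {
        GroovyMock(Project, global: true)
    }

    def 'passes the offer name from the form to the domain'() {
        given:
            Map params = [projId: projectId, name: offerName]

        when:
            facade.add(params)

        then:
            // The stub returns nothing (null), so the argument is checked
            // inside the stub closure with an explicit assert
            1 * Project.addOffer(projectId, _) >> { projId, offer ->
                assert offer.name == offerName
            }

        where:
            projectId | offerName
            1         | 'an Offer'
            15        | 'whasup!?'
    }
}
```

If the assert inside the closure fails, the AssertionError is thrown from the stubbed call into facade.add(), so the feature fails just as it would on a regular condition.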

It's worth mentioning that if you want to track interactions with the original static method implementation without replacing it, you should try GroovySpy instead of GroovyMock.
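A sketch of the GroovySpy variant, again with hypothetical stand-in classes. Here the "real" Project.addOffer() simply returns the offer it receives, so the spec can tell that the real method ran: an unstubbed GroovyMock would have returned null instead.

```groovy
import spock.lang.Specification

// Hypothetical stand-ins for the production code
class Offer {
    String name
}

class Project {
    // The "real" implementation; a spy lets it run
    static Offer addOffer(long projectId, Offer offer) {
        offer
    }
}

class OfferFacade {
    Offer add(Map params) {
        Project.addOffer(params.projId as long, new Offer(name: params.name))
    }
}

class OfferFacadeSpySpec extends Specification {

    OfferFacade facade = new OfferFacade()

    def setup() {
        // Global spy: static calls are tracked but still reach the real method
        GroovySpy(Project, global: true)
    }

    def 'tracks the static call without replacing the real implementation'() {
        when:
            Offer returnedOffer = facade.add([projId: 15, name: 'an Offer'])

        then:
            // No ">>" here, so the call is only counted, then delegated
            1 * Project.addOffer(15, _)
            // This value comes from the real method
            returnedOffer.name == 'an Offer'
    }
}
```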

Unfortunately, this only works for calls made from Groovy code: global Groovy mocks rely on Groovy meta-programming, so a static method called from pure Java code won't be intercepted this way. Still, regular mocks and the whole goodness of Spock can be used to test pure Java code, which is awesome anyway :)