TouK Nussknacker – using Apache Flink made easier for analysts and business

Few weeks ago we (TouK) revealed on Github our latest open source project – Nussknacker. What is it and why should you care?

Why?

First, some history: more than year ago one of our clients decided it’s high time for Real Time Marketing. They had pretty large data streams and lots of ideas for marketing campaigns, so one of the key success factors was the ability to process the data fast. We prepared a POC based on Apache Flink and it turned out that it’s really great piece of technology – fast and accurate.

There was just one problem – how to write and customize processes? Flink, as many modern stream processing engines, provides rich and friendly DSL, both in Java and Scala. But for our client that was not really enough. See, they are enterprise that do not employ developers – they have many decent, competent analysts who know the business and SQL – but not Java or (Heaven forbid…) Scala.

Nussknacker to the rescue!

So we decided to create a simple process designer for them. It looks more or less like this:

You draw a diagram, fill in the details (like filter expressions – “#input.usageData > 0” or SMS/mail content), then you press the Deploy button and voilà – your brand new process in running on Flink cluster!

Of course first somebody (that is, developer) has to prepare data sources (most probably Kafka topics), design data model (POJOs or case classes) and implement actions – like sending emails or sending some events to another Kafka topic. But once a model of data and external services are defined, an analyst can define and deploy processes all by him/herself.

More features

Sounds a bit scary to let your users run stream processes with GUI? Bad filter conditions can have serious performance implications if you deal with streams of tens of thousands of events per second… That’s why we let user test their diagrams first – each test case can be generated by sample of real data coming from e.g. Kafka and then run in Flink sandbox mini-cluster.

We have also many more features that make working with Nussknacker and Flink easier: subprocesses, versioning, generating PDF documentation, migration between environments and last but certainly not least – integration with InfluxDB/Grafana to provide detailed insight into how process is doing:

Where can I use it?

What are Nussknacker use cases? Our main deployments deal with RTM (Real Time Marketing). One of our clients started with RTM and then found out that Nussknacker is also great choice for fraud detection in real time. Industries we are working with include telcos, banks and media companies. We are also thinking about other possibilities – for example IoT.

Sounds cool?

If you are interested in easy access for semi-technical analysts to streaming data – give Nussknacker a try!

You can find the code at Github: https://github.com/touk/nussknacker, we also have a nice, Docker-based quickstart: https://touk.github.io/nussknacker/Quickstart.html.

And if you are coming to Flink Forward next week in Berlin – join me on Wednesday afternoon to hear more – .

In next days/weeks we’ll post more information on TouK blog both on Nussknacker architecture and internals and on interesting use cases – stay tuned :)

You May Also Like

Log4j and MDC in Grails

Log4j provides very useful feature: MDC - mapped diagnostic context. It can be used to store data in context of current thread. It may sound scary a bit but idea is simple.

My post is based on post http://burtbeckwith.com/blog/?p=521 from Burt Beckwith's excellent blog, it's definitely worth checking if you are interested in Grails.

Short background story...


Suppose we want to do logging our brand new shopping system and we want to have in each log customer's shopping basket number. And our system can be used at once by many users who can perform many transactions, actions like adding items and so on. How can we achieve that? Of course we can add basket number in every place where we do some logging but this task would be boring and error-prone. 

Instead of this we can use MDC to store variable with basket number in map. 

In fact MDC can be treated as map of custom values for current thread that can be used by logger. 


How to do that with Grails?


Using MDC with Grails is quite simple. All we need to do is to create our own custom filter which works for given urls and puts our data in MDC.

Filters in Grails are classes in directory grails-app/conf/* which names end with *Filters.groovy postfix. We can create this class manually or use Grails command: 
grails create-filters info.rnowak.App.Basket

In result class named BasketFilters will be created in grails-app/conf/info/rnowak/UberApp.

Initially filter class looks a little bit empty:
class BasketFilters {
def filters = {
all(controller:'*', action:'*') {
before = {

}
after = { Map model ->

}
afterView = { Exception e ->

}
}
}
}
All we need to do is fill empty closures, modify filter properties and put some data into MDC.

all is the general name of our filter, as class BasketFilters (plural!) can contain many various filters. You can name it whatever you want, for this post let assume it will be named basketFilter

Another thing is change of filter parameters. According to official documentation (link) we can customize our filter in many ways. You can specify controller to be filtered, its actions, filtered urls and so on. In our example you can stay with default option where filter is applied to every action of every controller. If you are interested in filtering only some urls, use uri parameter with expression describing desired urls to be filtered.

Three closures that are already defined in template have their function and they are started in these conditions:

  • before - as name says, it is executed before filtered action takes place
  • after - similarly, it is called after the action
  • afterView - called after rendering of the actions view
Ok, so now we know what are these mysterious methods and when they are called. But what can be done within them? In official Grails docs (link again) under section 7.6.3 there is a list of properties that are available to use in filter.

With that knowledge, we can proceed to implementing filter.

Putting something into MDC in filter


What we want to do is quite easy: we want to retrieve basket number from parameters and put it into MDC in our filter:
class BasketFilters {
def filters = {
basketFilter(controller:'*', action:'*') {
before = {
MDC.put("basketNumber", params.basketNumber ?: "")
}
after = { Map model ->
MDC.remove("basketNumber")
}
}
}
}

We retrieve basket number from Grails params map and then we put in map under specified key ("basketNumber" in this case), which will be later used in logger conversion pattern. It is important to remove custom value after processing of action to avoid leaks.

So we are putting something into MDC. But how make use of it in logs?


We can refer to custom data in MDC in conversion patter using syntax: %X{key}, where key is our key we used in filter to put data, like:
def conversionPattern = "%d{yyyy-MM-dd HH:mm:ss} %-5p %t [%c{1}] %X{basketNumber} - %m%n"


And that's it :) We've put custom data in log4j MDC and successfully used it in logs to display interesting values.