TouK Nussknacker – using Apache Flink made easier for analysts and business

A few weeks ago we (TouK) revealed on GitHub our latest open source project - Nussknacker. What is it and why should you care?

Why?


First, some history: more than a year ago one of our clients decided it was high time for Real Time Marketing. They had pretty large data streams and lots of ideas for marketing campaigns, so one of the key success factors was the ability to process the data fast. We prepared a POC based on Apache Flink and it turned out to be a really great piece of technology - fast and accurate.

There was just one problem - how to write and customize processes? Flink, like many modern stream processing engines, provides a rich and friendly DSL, both in Java and Scala. But for our client that was not really enough. See, they are an enterprise that does not employ developers - they have many decent, competent analysts who know the business and SQL - but not Java or (Heaven forbid…) Scala.

Nussknacker to the rescue!


So we decided to create a simple process designer for them. It looks more or less like this:

editor.png

You draw a diagram, fill in the details (like filter expressions - “#input.usageData > 0” - or SMS/mail content), then you press the Deploy button and voilà - your brand new process is running on the Flink cluster!

Of course, first somebody (that is, a developer) has to prepare the data sources (most probably Kafka topics), design the data model (POJOs or case classes) and implement the actions - like sending an email or publishing an event to another Kafka topic. But once the data model and external services are defined, an analyst can define and deploy processes all by him/herself.
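To make this concrete, here is a rough sketch in Scala of the kind of building blocks a developer prepares up front. The names below are made up for illustration only - this is not Nussknacker's actual API:

import scala.concurrent.Future

// An event model read from a Kafka topic; analysts later reference its fields
// in diagram expressions, e.g. "#input.usageData > 0".
case class CustomerEvent(
  customerId: String,
  usageData: BigDecimal,
  phoneNumber: String
)

// An action that an analyst can drop into a diagram; the deployed process
// invokes it for every event that passes the configured filters.
trait SendSmsAction {
  def sendSms(phoneNumber: String, content: String): Future[Unit]
}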

More features


Does it sound a bit scary to let your users run stream processes from a GUI? A bad filter condition can have serious performance implications if you’re dealing with streams of tens of thousands of events per second... That’s why we let users test their diagrams first - a test case can be generated from a sample of real data coming from e.g. Kafka and then run in a sandboxed Flink mini-cluster.

We also have many more features that make working with Nussknacker and Flink easier: subprocesses, versioning, generating PDF documentation, migration between environments and, last but certainly not least, integration with InfluxDB/Grafana to provide detailed insight into how a process is doing:

monitoring.png

Where can I use it?


What are Nussknacker’s use cases? Our main deployments deal with RTM (Real Time Marketing). One of our clients started with RTM and then found out that Nussknacker is also a great choice for real-time fraud detection. The industries we are working with include telcos, banks and media companies. We are also thinking about other possibilities - for example IoT.

Sounds cool?


If you are interested in giving semi-technical analysts easy access to streaming data - give Nussknacker a try!

You can find the code on GitHub: https://github.com/touk/nussknacker, and we also have a nice, Docker-based quickstart: https://touk.github.io/nussknacker/Quickstart.html.

And if you are coming to Flink Forward next week in Berlin - join me on Wednesday afternoon to hear more - https://berlin.flink-forward.org/kb_sessions/touk-nussknacker-creating-flink-jobs-with-gui/.

In the coming days and weeks we’ll post more information on the TouK blog, both on Nussknacker’s architecture and internals and on interesting use cases - stay tuned :)

Need to make quick JSON fixes – JSONPath to the rescue

From time to time I need to make some fixes in my JSON data. In the world of flat files I do this with the grep/sed/awk tool chain. How do I handle it for JSON? Searching for a solution, I came across JSONPath. It is quite a mature tool (from 2007), but I hadn't heard about it before, so I decided to share my experience with others. First of all, you can try it without pain online: http://jsonpath.curiousconcept.com/. The full syntax is described at http://goessner.net/articles/JsonPath/. You can also download the Python binding and run it from the command line:
$ sudo apt-get install python-jsonpath-rw
$ sudo apt-get install python-setuptools
$ sudo easy_install -U jsonpath
After that you can use it inside Python or with a simple CLI wrapper:
#!/usr/bin/python
# Simple CLI wrapper: reads JSON from stdin, applies the JSONPath expression
# given as the first argument and pretty-prints the matching nodes.
import sys, json, jsonpath

path = sys.argv[1]

result = jsonpath.jsonpath(json.load(sys.stdin), path)
print json.dumps(result, indent=2)
…and then use it in your shell, e.g. for this JSON:
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}
You can print only the book nodes with a price lower than 10 with:
$ jsonpath '$..book[?(@.price < 10)]' < books.json
Result:
[
  {
    "category": "reference",
    "price": 8.95,
    "title": "Sayings of the Century",
    "author": "Nigel Rees"
  },
  {
    "category": "fiction",
    "price": 8.99,
    "title": "Moby Dick",
    "isbn": "0-553-21311-3",
    "author": "Herman Melville"
  }
]
Happy JSON hacking!

Enums for Scala

Scala has a very limited implementation of Enumeration. Enumerated objects can't extend other classes. A partial replacement is to use sealed classes: you can do pattern matching on them, and when you omit some possible value you get a compiler warning about a non-exhaustive match. One missing feature is that you can't get an ordered list of all objects extending them. You can easily get it using my (40-line) EnumOf class from scala-enum. Examples below.

Declaration

sealed abstract class Color(red: Double, green: Double, blue: Double)

object Color extends EnumOf[Color] {
  case object Red extends Color(1, 0, 0)
  case object Green extends Color(0, 1, 0)
  case object Blue extends Color(0, 0, 1)
  case object White extends Color(1, 1, 1)
  case object Black extends Color(0, 0, 0)
}

Usage

Color.values shouldEqual List(Red, Green, Blue, White, Black)

Color.valueOfOpt("Blue").value shouldEqual Blue
Color.valueOfOpt("NotExisting").isEmpty shouldBe true

You can also enumerate objects nested in instances.

Declaration

case class DistanceFrom(srcCity: String, srcCoordinates: Coordinate) extends EnumOf[DistanceBetween] {

  case object ToBerlin extends DistanceFromSrcCityTo("Berlin", Coordinate(52.5075419, 13.4251364))
  case object ToNewYork extends DistanceFromSrcCityTo("New York", Coordinate(40.7033127, -73.979681))

  abstract class DistanceFromSrcCityTo(val destCity: String, val destCoordinates: Coordinate) extends DistanceBetween {
    override def srcCoordinates: Coordinate = DistanceFrom.this.srcCoordinates
  }
}

sealed abstract class DistanceBetween {
  def srcCoordinates: Coordinate

  def destCity: String
  def destCoordinates: Coordinate

  def inKm: Int = Coordinate.distanceInKm(srcCoordinates, destCoordinates).toInt
}

Usage

val DistanceFromWarsaw = DistanceFrom("Warsaw", Coordinate(52.232938, 21.0611941))

DistanceFromWarsaw.ToBerlin.inKm shouldEqual 519
DistanceFromWarsaw.ToNewYork.inKm shouldEqual 6856

DistanceFromWarsaw.values.map(_.inKm) shouldEqual List(519, 6856)
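If you are curious how such a base class can work, below is a rough, simplified sketch based on plain Java reflection. It is my own illustration, not the actual scala-enum implementation (which, among other things, preserves declaration order and supports the instance-nested variant shown above):

import scala.reflect.ClassTag

// A simplified EnumOf-style base class (illustration only): it scans the
// enclosing object's inner classes for case objects of type T and exposes
// them as values.
abstract class EnumOf[T](implicit ct: ClassTag[T]) {

  // Note: getDeclaredClasses gives no ordering guarantee and only covers
  // objects nested directly in a static companion object.
  lazy val values: List[T] =
    getClass.getDeclaredClasses.toList
      .filter(c => ct.runtimeClass.isAssignableFrom(c) && c.getName.endsWith("$"))
      .map(c => c.getField("MODULE$").get(null).asInstanceOf[T])

  // Case objects print as their simple name, so lookup by name is a find.
  def valueOfOpt(name: String): Option[T] = values.find(_.toString == name)
}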

micro-burn has Trello integration

After a few long evenings I've finally integrated micro-burn with Trello. All you need to run it for your Trello board is to write a short configuration and run the fat jar. It renders a burndown chart visualising the progress of cards on your board.

You can specify story points by adding them in curly braces inside a card title, use the Scrum for Trello browser extension, or define a default number of story points for user stories. Completed checklist items are treated as part of the work done inside a card. You can manage sprints on your own (creating new ones, specifying start/end/name, finishing them) or turn on fully automatic mode, in which sprints are created periodically.

Sprint management in action:

Sample for lift-ng: Micro-burn 1.0.0 released

Over the last few evenings, in my free time, I've worked on a mini-application called micro-burn. The idea for it came from working with Agile Jira on our commercial project. Jira is a great tool for agile project management: it has inline task editing, a drag & drop board, reports and much more, but it also has a few drawbacks that lowered our team's motivation.

Motivation

From time to time our sprint scope changes. That is not a big deal, because we are trying to be agile :-), but in this situation Jira's burndown chart draws a peak, because that chart in fact shows scope changes, not a real burndown. It also means that the chart cannot cross the x-axis when we do more than we planned – it always stops at zero. Also, for better progress monitoring, we've started to split our user stories into technical tasks and estimate them, but the original burndown chart doesn't show points from technical tasks. I can see the motivation for this – a user story that is almost finished isn't finished at all until the user can use it. On the other hand, if we know which tasks are problematic we can do some teamwork to move them along. So I realized it was a good opportunity to try some new approaches and tools.

Tools

I started with the Lift framework. In the world of Single Page Applications, this framework offers more than a simple interface for serving REST services: it comes with awesome Comet support. Comet is a replacement for WebSockets that runs in all browsers. It supports long polling and a transparent fallback to short polling if the limit of client connections is exceeded. On the backend you handle pushes in a CometActor. For further reading, take a look at Roundtrip promises.

But Lift is also a kind of framework of frameworks. You can build your own abstraction over CometActors and push JavaScript to the client, which shortens the path from server to client. That was the trigger for the author of lift-ng to create a Lift/Angular integration built on top of Lift. It provides AngularActors, from which you can emit/broadcast events to the scope of a controller, and NgModelBinders that synchronize your backend model with the client scope in a few lines! I've used them to send the project state (all sprints and their details) to the client and to notify it about scrum board changes; the actor doing all of this hard work looks pretty small. lift-ng also provides factories for creating Angular services. Services can respond with futures that are transformed to Angular promises on the fly - that is all that was needed to serve the sprint history, which the client side simply consumes as a service.

In my opinion these two frameworks give a huge boost to developing web applications. You have the power of strong typing with Scala, you can design your domain with Actors, and all of this with the simplicity of Node.js – no JSON-transforming boilerplate and dynamic application reload.

DDD + Event Sourcing

I've also tried a few fresh approaches to DDD. I've organized domain objects into actors: there are SprintActors which encapsulate the sprint aggregate root. Task changes are stored as events, which are computed as the difference between two board states. When a sprint history needs to be provided, successive board states are computed from the initial state and the sequence of events. I realized that the best way to keep this kind of event sourcing approach tested is random tests: a test that makes random changes on the board, calculates the events and checks that the initial state plus the events equals the previously created state.
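The original post embeds the actual test as a gist; below is only a minimal, self-contained sketch of the same idea. The types and the diff/replay functions are invented for illustration and are not micro-burn's real domain model:

import scala.util.Random

// "Board state + events" sketch: events are the diff between two board states,
// and replaying them over the initial state must reproduce the final state.
object EventSourcingSketch {

  case class Task(id: String, storyPoints: Int)
  case class BoardState(tasks: Map[String, Task])

  sealed trait Event
  case class TaskAdded(task: Task) extends Event
  case class TaskUpdated(task: Task) extends Event
  case class TaskRemoved(id: String) extends Event

  // Events are computed as the difference between two board states.
  def diff(before: BoardState, after: BoardState): Seq[Event] = {
    val removed = (before.tasks.keySet -- after.tasks.keySet).toSeq.map(TaskRemoved(_))
    val added   = (after.tasks.keySet -- before.tasks.keySet).toSeq.map(id => TaskAdded(after.tasks(id)))
    val updated = (before.tasks.keySet & after.tasks.keySet).toSeq
      .filter(id => before.tasks(id) != after.tasks(id))
      .map(id => TaskUpdated(after.tasks(id)))
    removed ++ added ++ updated
  }

  // Board states are recomputed by replaying events over the initial state.
  def replay(initial: BoardState, events: Seq[Event]): BoardState =
    events.foldLeft(initial) {
      case (state, TaskAdded(task))   => state.copy(tasks = state.tasks + (task.id -> task))
      case (state, TaskUpdated(task)) => state.copy(tasks = state.tasks + (task.id -> task))
      case (state, TaskRemoved(id))   => state.copy(tasks = state.tasks - id)
    }

  // Random test: make random changes on the board, calculate the events and
  // check that initial state + events equals the changed state.
  def main(args: Array[String]): Unit = {
    val initial = BoardState(Map("1" -> Task("1", 3), "2" -> Task("2", 5)))

    val changedExisting = initial.tasks
      .filter(_ => Random.nextBoolean())                                            // randomly drop some tasks
      .map { case (id, task) => id -> task.copy(storyPoints = Random.nextInt(8)) }  // randomly re-estimate the rest
    val changed = BoardState(changedExisting + ("3" -> Task("3", Random.nextInt(8)))) // and add a brand new task

    assert(replay(initial, diff(initial, changed)) == changed)
  }
}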

First look

A screenshot of the first version:
If you want to take a closer look, check out the source code or download the ready-to-run fat jar on GitHub.