Easy fraud detection in a Nigerian bank with TouK Nussknacker

Problem

We have a large MySQL database with payment transactions from multiple banks, aggregated for the biggest Nigerian operator. Each day the customers create about 300 thousand transactions. There are banking transactions and top-ups. One of our fraud detection processes for top-ups looks like this:

  1. Find top-up transactions (called Airtime in our domain)
  2. with an amount above 3000
  3. that ended with success.
  4. If there was more than one such transaction in the last 24 hours, we mark the customer as a fraud suspect.
  5. If the customer was already marked as a fraud suspect in the previous 24 hours (i.e. 24-48 hours ago), we block them.
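
Expressed as plain Scala predicates (just a sketch; the product and status values are assumptions, the model itself is described later in this post), the rule looks roughly like this:

// a sketch of the business rule, assuming the Transaction model shown below
def isSuspiciousTopUp(t: Transaction): Boolean =
  t.product == "Airtime" && t.amount > 3000 && t.status == "SUCCESS"
// more than one such transaction in the last 24 hours -> mark the customer as a fraud suspect
// a suspect again in the previous 24-hour window (24-48h ago) -> block the customer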

Our previous solution used cron: it ran a big SQL query once a day and created a report from the transactions it found. We wanted to make this fraud detection process real-time and to learn about suspicious situations as fast as possible. That is why we decided to use TouK Nussknacker.

TouK Nussknacker

Nussknacker logo

Nussknacker is a GUI for Apache Flink developed by TouK to help its clients manage various event stream processes. Nussknacker was first deployed in 2016 at one of the Polish mobile network operators. The GUI helps business analysts, marketers and other trained users create and manage processes with minimal support from developers.

Architecture

The architecture of our solution is based on the demo project available in the Nussknacker GitHub repository. It contains:

  • UI – allows us to create, test and deploy Flink jobs and also read their metrics and logs
  • job manager – dispatches Flink jobs to task managers
  • task manager – runs Flink jobs
  • zookeeper – for discovery of task managers and keeping information about state
  • logstash – harvests the logs from Flink tasks, parses them and sends them to elasticsearch
  • elasticsearch – keeps logs in an index for further analysis
  • kibana – allows searching the harvested logs
  • influxdb – keeps job metrics
  • grafana – allows searching and viewing job metrics
  • mysql – the source of transactions

Prepare components

Nussknacker requires some initial effort to create a model of the problem and components which know how to convert real data into the model, aggregate it and send it to an external destination.

The model

If we want to create a Nussknacker process, we have to create a model first. The model is a description of the data; it should also contain every field that might be used in the future or displayed in logs.

In our case we have created a Transaction case class in Scala:

case class Transaction(
            id: Long,
            amount: BigDecimal,
            customerMsisdn: String,
            date: LocalDateTime,
            externalCustomerId: String,
            uid: String,
            paymentMethodId: String,
            statusId: Long,
            status: String,
            failureType: String,
            failureDetails: String,
            drawDownBillerId: Long,
            paymentProvider: String,
            paymentProviderId: Long,
            product: String) extends WithFields {

  val timestamp: Long = date.toInstant(ZoneOffset.UTC).toEpochMilli

  // ...
}

It is important to create a timestamp field representing when the event occurred. Thanks to that, aggregates are calculated in a valid and deterministic way, which is especially important for historical or delayed data.
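
For example, this is how the date is converted to epoch milliseconds (an illustrative value, not data from our system):

import java.time.{LocalDateTime, ZoneOffset}

// event time of the transaction, expressed in milliseconds since the epoch
val date = LocalDateTime.of(2017, 1, 10, 12, 30)
val timestamp = date.toInstant(ZoneOffset.UTC).toEpochMilli // 1484051400000L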

Build input

The data is kept in the MySQL database, therefore Flink needs to know how to fetch it, convert it to the model and emit it for processing.

We have created the JdbcTransactionSource case class:

case class JdbcTransactionSource(limit: Long, intervalInSeconds: Long, jdbcConfig: JdbcConfig)
  extends RichSourceFunction[Transaction]
    with Checkpointed[java.lang.Long]
{
    // ...
}

The amount of data fetched is limited by the source, because we do not want to read all the database records on the first start or after a long period of downtime. What is more, a time interval is set between data fetches. The source stores checkpoints in HDFS (or in another configured storage), with metadata saved in ZooKeeper, so it knows which records have already been processed.
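
The checkpointing part is not shown in full in this post; with the legacy Flink Checkpointed interface used above it could look roughly like this (a sketch only):

// sketch: the checkpointed state is the id of the last processed transaction
private var lastTransactionId: java.lang.Long = 0L

override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): java.lang.Long =
  lastTransactionId

override def restoreState(state: java.lang.Long): Unit =
  lastTransactionId = state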

The core work of the source class can be summarized by these lines:

val rs = statement.executeQuery()
while (rs.next()) {
  // convert each row to our model and emit it with its event timestamp
  val transaction = fromResultSetRow(rs)
  sourceContext.collectWithTimestamp(transaction, transaction.timestamp)
  // remember the last processed id so the checkpoint can resume from here
  lastTransactionId = transaction.id
}
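
The statement itself is not shown here; conceptually it could be parameterised like this (an assumed sketch, the real query, table and column names may differ):

// assumed sketch - limits the batch size and skips already processed records
val statement = connection.prepareStatement(
  "SELECT * FROM transaction WHERE id > ? ORDER BY id LIMIT ?")
statement.setLong(1, lastTransactionId)
statement.setLong(2, limit)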

Build sink

A sink is a component which knows where to send the data after processing. In our case, we just want to log a message with the transaction. The generated logs will be sent to elasticsearch by logstash.

The sink code is quite simple:

class LoggingSink(messagePrefix: String, loggingLevel: String) extends FlinkSink with LazyLogging with Serializable {

  override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable {
    override def invoke(in: Any): Unit = {
      val message = s"$messagePrefix - $in"
      loggingLevel match {
        case "error" => logger.error(message)
        case "warn" => logger.warn(message)
        case _ => logger.info(message)
      }
    }
  }

  override def testDataOutput: Option[(Any) => String] = Some(in => s"[$loggingLevel] $messagePrefix $in")
}
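
The LoggingSinkFactory registered later in the process config creator is not listed in this post; a minimal sketch of such a factory (assuming the SinkFactory API from the Nussknacker demo project) could look like this:

class LoggingSinkFactory extends SinkFactory {

  // assumed sketch - exposes the prefix and level as parameters in the UI
  @MethodToInvoke
  def create(@ParamName("messagePrefix") messagePrefix: String,
             @ParamName("loggingLevel") loggingLevel: String): FlinkSink =
    new LoggingSink(messagePrefix, loggingLevel)
}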


Create aggregate function

One of the more difficult tasks is to create our custom aggregate function. In our case, it is a counter function which counts occurrences of transactions that meet our conditions. Each aggregate is created for a certain key. In our case, the key is customerMsisdn, since it points directly at a single customer. For each customer we want to know how many matching transactions were made in a time window.

We have created the EventsCounter class, which parses the input parameters from the configuration, keys the incoming events by the given key and counts them with the CounterFunction described below:

class EventsCounter extends CustomStreamTransformer {

  @MethodToInvoke(returnType = classOf[EventCount])
  def execute(@ParamName("key") key: LazyInterpreter[String],
              @ParamName("length") length: String) = {
    FlinkCustomStreamTransformation((start: DataStream[InterpretationResult]) => {
      val lengthInMillis = Duration(length).toMillis
      start.keyBy(key.syncInterpretationFunction)
        .transform("eventsCounter", new CounterFunction(lengthInMillis))
    })
  }
}


  • @MethodToInvoke tells Nussknacker that the annotated method should be invoked when the process reaches the component
  • @ParamName tells Nussknacker that the component is configurable via parameters provided in the UI

CounterFunction counts occurrences in the time window and stores them as EventCount objects, which know when the first occurrence in this time window happened:

class CounterFunction(lengthInMillis: Long) extends TimestampedEvictableState[Long] {

  override def stateDescriptor =
    new ValueStateDescriptor[MultiMap[Long, Long]]("state", classOf[MultiMap[Long, Long]])

  override def processElement(element: StreamRecord[InterpretationResult]): Unit = {
    // schedule eviction and drop state entries older than the time window
    setEvictionTimeForCurrentKey(element.getTimestamp + lengthInMillis)
    state.update(filterState(element.getTimestamp, lengthInMillis))

    val ir = element.getValue
    // register the current occurrence in the keyed state
    val eventCount = stateValue.add(element.getTimestamp, 1)
    state.update(eventCount)

    // total occurrences in the window and the timestamp of the earliest one
    val eventsCount = eventCount.map.values.flatten.sum
    val smallestTimestamp = eventCount.map.keys.min
    output.collect(new StreamRecord[ValueWithContext[Any]](
      ValueWithContext(EventCount(count = eventsCount, smallestTimestamp = smallestTimestamp), ir.finalContext), element.getTimestamp)
    )
  }
}

case class EventCount(count: Long, smallestTimestamp: Long)
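
To illustrate the semantics (an assumed example, not output from our system): for three qualifying transactions of the same customer inside one 24-hour window the transformer emits growing counts, always keeping the timestamp of the first occurrence:

// assumed event timestamps in milliseconds
val t1 = 1000L; val t2 = 2000L; val t3 = 3000L
val emitted = Seq(
  EventCount(count = 1, smallestTimestamp = t1), // after the event at t1
  EventCount(count = 2, smallestTimestamp = t1), // after the event at t2
  EventCount(count = 3, smallestTimestamp = t1)  // after the event at t3
)
// once the window slides past t1, that occurrence is evicted and the count drops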

Build Nussknacker process

We have created the components needed for our domain problem and packaged them in a jar library together with a class describing the process building blocks:

class FraudDetectorConfigCreator extends ProcessConfigCreator {
  val transactionCategory = "TransactionStream"

  override def sourceFactories(config: Config): Map[String, WithCategories[SourceFactory[_]]] = {
    import net.ceedubs.ficus.Ficus._
    import net.ceedubs.ficus.readers.ArbitraryTypeReader._
    Map(
      "Transactions" -> WithCategories(new TransactionSourceFactory(config.as[JdbcConfig]("jdbc")), transactionCategory)
    )
  }

  override def customStreamTransformers(config: Config): Map[String, WithCategories[CustomStreamTransformer]] = Map(
    "eventsCounter" -> WithCategories(new EventsCounter, transactionCategory)
  )

  override def sinkFactories(config: Config): Map[String, WithCategories[SinkFactory]] = Map(
    "LoggingFactory" -> WithCategories(new LoggingSinkFactory, transactionCategory)
  )

  // other factories that we do not need in our problem
}

Components are made available via factories and each factory is registered in one or more categories (always TransactionStream in our case).

Configuration

We provided some configuration for the Nussknacker UI:

fraudDemo {
  timeout: 10s
  checkpointInterval: 20s
  restartInterval: "10s"
  processConfigCreatorClass: "pl.touk.zephyr.frauddetector.FraudDetectorConfigCreator"
  jdbc {
    driver: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://mysql-zephyr:3306/backofficedev"
    userName: "backofficedev"
    password: "..."
  }     
  defaultValues {
    values {
    }
  }

}

This configuration will be forwarded to Flink with our job.

Process definition

After Nussknacker starts, the UI is available at 'http://[host]:8080/' and we can start defining processes. We created a new process with the category TransactionStream. Using our custom components as well as generic ones (e.g. filter or split) we designed a fraud detection process.

process

We have applied filters to find only the transactions relevant for this process. In the filters we use Spring SpEL expressions like #input.amount > 3000. Since we defined the model, Nussknacker can suggest fields and validate the expressions. We want to log such transactions with level INFO and continue processing (this is why we used the split component).

Next we count transactions in a 24-hour time window and find the customers for whom such a transaction occurred more than once. We log them again (this time with level WARN) and continue fraud detection.

The last step is to count the suspicious transactions that occurred in the last 48 hours and filter out those that occurred in the last 24 hours. Finally, we log such transactions with level ERROR.
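
One way to read that last filter (an assumed sketch of the intent, not the actual expression used in the process): a customer whose first suspicious occurrence in the 48-hour window is older than 24 hours was already a suspect 24-48 hours ago, so they should be blocked:

// an assumed sketch of the final blocking condition, in plain Scala;
// eventCount comes from the 48-hour counter over already suspicious events
def shouldBlock(eventCount: EventCount, nowMillis: Long): Boolean = {
  val dayInMillis = 24 * 60 * 60 * 1000L
  eventCount.smallestTimestamp < nowMillis - dayInMillis
}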

After designing and saving, the process is ready for testing.

Testing

The JdbcTransactionSource provides the generateTestData method, which connects to the database, fetches the requested number of transactions and saves them to a file. In Nussknacker this can be done by clicking the generate button in the Test section. Next, we can execute the process locally with data from the file by clicking the from file button. The number of processed transactions at each step is shown in the nodes:

process_after_test

We can also look inside each node and check which transactions reached it by clicking on it.

Deploy and results

After testing the process, it can be deployed – sent to the job manager and started. To deploy the process we just need to click the Deploy button.

In the Metrics tab we can open the dashboard and watch metrics from the components.

metrics

In the Search tab we can query the harvested logs and see how many frauds were found.

search

What next?

We have created just one process that allows us to find fraudulent transactions in our system. Since we have already created the model, data source and sink and set up the architecture, we are ready to quickly design and deploy new processes – e.g. finding a large number of small transactions from the same customer (as the next fraud detection process) or finding failed transactions with specific details (to spot recurring problems of our customers).

You May Also Like

Need to make quick JSON fixes – JSONPath to the rescue

From time to time I need to make some fixes in my JSON data. In the world of flat files I do this with the grep/sed/awk tool chain. How to handle it for JSON? Searching for a solution, I came across JSONPath. It is quite a mature tool (from 2007), but I hadn't heard about it before, so I decided to share my experience with others.

First of all, you can try it without pain online: http://jsonpath.curiousconcept.com/. The full syntax is described at http://goessner.net/articles/JsonPath/.



You can also download the Python bindings and run it from the command line:
$ sudo apt-get install python-jsonpath-rw
$ sudo apt-get install python-setuptools
$ sudo easy_install -U jsonpath

After that you can use it inside Python or with a simple CLI wrapper:
#!/usr/bin/python
import sys, json, jsonpath

path = sys.argv[1]

result = jsonpath.jsonpath(json.load(sys.stdin), path)
print json.dumps(result, indent=2)

…then you can use it in your shell, e.g. for this JSON:
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}

You can print only the book nodes with a price lower than 10 with:
$ jsonpath '$..book[?(@.price < 10)]'

Result:
[
{
"category": "reference",
"price": 8.95,
"title": "Sayings of the Century",
"author": "Nigel Rees"
},
{
"category": "fiction",
"price": 8.99,
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"author": "Herman Melville"
}
]

Have a nice JSON hacking!

Multi module Gradle project with IDE support

This article is a short how-to about a multi-module project setup using the Gradle build automation tool.

Here's how Rich Seller, a StackOverflow user, describes Gradle:
Gradle promises to hit the sweet spot between Ant and Maven. It uses Ivy's approach for dependency resolution. It allows for convention over configuration but also includes Ant tasks as first class citizens. It also wisely allows you to use existing Maven/Ivy repositories.
So why would one use yet another JVM build tool such as Gradle? The answer is simple: to avoid the frustration caused by Ant or Maven.

Short story

I was fooling around with some fresh proof of concept and needed a build tool. I'm pretty familiar with Maven, so I created a project from an archetype and opened the build file, pom.xml, for further tuning.
I had been using Grails with its own build system (similar to Gradle, btw) for some time by then, so after quite a while without Maven I looked at the pom.xml and found it really repulsive.

Once again I felt clearly: XML is not for humans.

After some quick googling I found Gradle. It was still in beta (version 0.8) back then, but it is configured with a Groovy DSL and that's what a human likes :)

Where are we

While nowadays Ant can be met only among IT guerrillas, Maven is still on top and a couple of others, like for example Ivy, compete for the best position, Gradle has smoothly entered its mature age. It is now available in version 1.3, released on the 20th of November 2012. I'm glad to recommend it to anyone looking for relief from XML-configured tools, or for anyone just looking for a simple, elastic and powerful build tool.

Let's build

I have already written about the basic project structure, so I'll skip that part and only remind you what it looks like:
<project root>
├── build.gradle
└── src
    ├── main
    │   ├── java
    │   └── groovy
    └── test
        ├── java
        └── groovy
Have I just referred to myself for the first time? Achievement unlocked! ;)

Gradle, like most build tools, is run from the command line with parameters. The main parameter for Gradle is a 'task name'; for example, we can run the command: gradle build.
There is no 'create project' task, so the directory structure has to be created by hand. This isn't a hassle though.
The java and groovy sub-folders aren't always mandatory. They depend on which compile plugin is used.

Parent project

Consider an example project 'the-app' consisting of three modules, let's say:
  1. a database communication layer
  2. a domain model and services layer
  3. a web presentation layer
Our project directory tree will look like this:
the-app
├── dao-layer
│   └── src
├── domain-model
│   └── src
├── web-frontend
│   └── src
├── build.gradle
└── settings.gradle
the-app itself has no src sub-folder, as its purpose is only to contain sub-projects and the build configuration. If needed, it could have been given its own src too.

To glue the modules together we need to fill the settings.gradle file under the the-app directory with a single line of content specifying the module names:
include 'dao-layer', 'domain-model', 'web-frontend'
Now the gradle projects command can be executed to obtain the following result:
:projects

------------------------------------------------------------
Root project
------------------------------------------------------------

Root project 'the-app'
+--- Project ':dao-layer'
+--- Project ':domain-model'
\--- Project ':web-frontend'
...so we know that Gradle noticed the modules. However, the gradle build command won't run successfully yet, because the build.gradle file is still empty.

Sub project

As in Maven, we can create a separate build config file for each module. Let's say we start with the DAO layer.
Thus we create a new file, the-app/dao-layer/build.gradle, with a line of basic build info (notice the new build.gradle was created under the sub-project directory):
apply plugin: 'java'
This single line of config for any of the modules is enough to execute the gradle build command under the the-app directory with the following result:
:dao-layer:compileJava
:dao-layer:processResources UP-TO-DATE
:dao-layer:classes
:dao-layer:jar
:dao-layer:assemble
:dao-layer:compileTestJava UP-TO-DATE
:dao-layer:processTestResources UP-TO-DATE
:dao-layer:testClasses UP-TO-DATE
:dao-layer:test
:dao-layer:check
:dao-layer:build

BUILD SUCCESSFUL

Total time: 3.256 secs
To use the Groovy plugin, slightly more configuration is needed:
apply plugin: 'groovy'

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    groovy 'org.codehaus.groovy:groovy-all:2.0.5'
}
In lines 3 to 6 the Maven repositories are set. In line 9 the dependency on the Groovy library version is specified. Of course plugins such as 'java', 'groovy' and many more can be mixed with each other.

If we have a settings.gradle file and a build.gradle file for each module, there is no need for a parent the-app/build.gradle file at all. Sure, that's true, but we can go another, better way.

One file to rule them all

Instead of creating many build.gradle config files, one per module, we can use only the parent's one and make it a bit more juicy. So let's move the-app/dao-layer/build.gradle a level up to the-app/build.gradle and fill it with new statements to achieve the full project configuration:
def langLevel = 1.7

allprojects {

    apply plugin: 'idea'

    group = 'com.tamashumi'
    version = '0.1'
}

subprojects {

    apply plugin: 'groovy'

    sourceCompatibility = langLevel
    targetCompatibility = langLevel

    repositories {
        mavenLocal()
        mavenCentral()
    }

    dependencies {
        groovy 'org.codehaus.groovy:groovy-all:2.0.5'
        testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
    }
}

project(':dao-layer') {

    dependencies {
        compile 'org.hibernate:hibernate-core:4.1.7.Final'
    }
}

project(':domain-model') {

    dependencies {
        compile project(':dao-layer')
    }
}

project(':web-frontend') {

    apply plugin: 'war'

    dependencies {
        compile project(':domain-model')
        compile 'org.springframework:spring-webmvc:3.1.2.RELEASE'
    }
}

idea {
    project {
        jdkName = langLevel
        languageLevel = langLevel
    }
}
At the beginning the simple variable langLevel is declared. It's worth knowing that we can use almost any Groovy code inside a build.gradle file: statements like if conditions, for/while loops, closures, switch-case, etc. Quite an advantage over inflexible XML, isn't it?

Next comes the allprojects block. Any configuration placed in it will influence - what a surprise - all projects, so the parent itself and the sub-projects (modules). Inside the block we apply the IDE (IntelliJ IDEA) plugin, which I wrote more about in a previous article (look under the "IDE Integration" heading). It is enough to say that with this plugin applied here, the command gradle idea will generate IDEA's project files with the module structure and dependencies. This works really well and plugins for other IDEs are available too.
The remaining two lines in this block define the group and version for the project, similar to how it is done in Maven.

After that the subprojects block appears. It applies to all modules but not the parent project. Here the Groovy language plugin is applied, as all modules are assumed to be written in Groovy.
Below that, the source and target language level are set.
After that come references to the standard Maven repositories.
At the end of the block there are the dependencies on the Groovy version and on a test library - the Spock framework.

The following blocks, project(':module-name'), are responsible for per-module configuration. They may be omitted as long as allprojects or subprojects already configure what's necessary for a specific module. In the example, the per-module configuration goes as follows:
  • The dao-layer module has a dependency on an ORM library - Hibernate.
  • The domain-model module relies on dao-layer as a dependency. The keyword project is used here again to reference another module.
  • The web-frontend module applies the 'war' plugin, which builds this module into a Java web archive. Besides that, it refers to the domain-model module and also uses the Spring MVC framework dependency.

At the end, the idea block contains basic info for the IDE plugin. These are parameters corresponding to IDEA's general project settings, visible on the following screenshot.


jdkName should match the IDE's SDK name; otherwise it has to be set manually in the IDE after each (re)generation of IDEA's project files with the gradle idea command.

Is that it?

In the matter of simplicity - yes. That's enough to automate a modular application build with custom configuration per module. Not rocket science, huh? Think about Maven's XML: it would take more effort to set up the same thing and you would still end up with a less expressive configuration, quite far from user-friendly.

Check the online user guide for a lot of configuration possibilities or, better, download Gradle and see the sample projects.
As a tasty bait, take a look at this short selection of available plugins:
  • java
  • groovy
  • scala
  • cpp
  • eclipse
  • netbeans
  • idea
  • maven
  • osgi
  • war
  • ear
  • sonar
  • project-report
  • signing
and more, 3rd party plugins...