Easy fraud detection in a Nigerian bank with TouK Nussknacker

Problem

We have a large MySQL database with payment transactions from multiple banks, aggregated for the biggest operator in Nigeria. Customers create about 300,000 transactions each day. These include both banking transactions and top-ups. One of our fraud detection processes for top-ups looks like this:

  1. Find top-up transactions (called Airtime in our domain)
  2. with an amount above 3000
  3. that ended with success.
  4. If there was more than one such transaction in the last 24 hours, we mark the customer as a fraud suspect.
  5. If the customer was also marked as a fraud suspect in the previous 24 hours (i.e. 24 to 48 hours ago), we block them.
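The following plain-Scala sketch evaluates the rule over an in-memory list. This is roughly the calculation a once-a-day batch job performs.

It makes a few assumptions:
- The Tx type, its fields and the "Airtime" and "SUCCESS" literals are hypothetical stand-ins for the real schema.
- Each 24-hour window is fixed and ends at a given moment. A streaming job instead evaluates a window for each incoming event.

```scala
// Hypothetical, simplified transaction with only the fields the rule needs.
case class Tx(customer: String, product: String, amount: BigDecimal,
              status: String, timestampMillis: Long)

val DayMillis: Long = 24L * 60 * 60 * 1000

// Steps 1-3: successful Airtime top-ups above 3000 (literal values are assumptions).
def matches(tx: Tx): Boolean =
  tx.product == "Airtime" && tx.amount > BigDecimal(3000) && tx.status == "SUCCESS"

// Step 4: customers with more than one matching transaction
// in the 24 h window (windowEnd - 24h, windowEnd].
def suspects(txs: Seq[Tx], windowEnd: Long): Set[String] =
  txs.filter(matches)
    .filter(tx => tx.timestampMillis > windowEnd - DayMillis && tx.timestampMillis <= windowEnd)
    .groupBy(_.customer)
    .collect { case (customer, list) if list.size > 1 => customer }
    .toSet

// Step 5: block customers who are suspects now and were already suspects 24 to 48 h ago.
def toBlock(txs: Seq[Tx], now: Long): Set[String] =
  suspects(txs, now) intersect suspects(txs, now - DayMillis)
```

Customer "a" in the test below is a suspect in both windows and gets blocked. Customer "b" is only a suspect in the latest window.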

Our previous solution used a cron job that ran a big SQL query once a day and created a report from the transactions it found. We wanted to make this fraud detection process real-time and learn about suspicious situations as quickly as possible. That is why we decided to use TouK Nussknacker.

TouK Nussknacker


Nussknacker is a GUI for Apache Flink, developed by TouK to help its clients manage various event stream processes. It was first deployed in 2016 at one of the Polish mobile network operators. The GUI helps business analysts, marketers and other trained users create and manage processes with minimal support from developers.

Architecture

The architecture of our solution was based on the demo project available in the Nussknacker GitHub repository. It contains:

  • UI – allows us to create, test and deploy Flink jobs, and also to read their metrics and logs
  • job manager – dispatches Flink jobs to task managers
  • task manager – runs Flink jobs
  • ZooKeeper – used for discovery of task managers and for keeping state information
  • Logstash – harvests the logs from Flink tasks, parses them and sends them to Elasticsearch
  • Elasticsearch – keeps the logs in an index for further analysis
  • Kibana – allows searching the harvested logs
  • InfluxDB – stores job metrics
  • Grafana – allows searching and viewing job metrics
  • MySQL – the source of transactions

Prepare components

Nussknacker requires some initial effort. We need to create a model of the problem. We also need components that convert real data into that model, aggregate it and send it to an external destination.

The model

To create a Nussknacker process, we first have to create a model. The model describes the data. It should also include every field that processes might use in the future or that should be displayed in logs.

In our case, we created a Transaction case class in Scala:

import java.time.{LocalDateTime, ZoneOffset}
// other imports omitted

case class Transaction(
    id: Long,
    amount: BigDecimal,
    customerMsisdn: String,
    date: LocalDateTime,
    externalCustomerId: String,
    uid: String,
    paymentMethodId: String,
    statusId: Long,
    status: String,
    failureType: String,
    failureDetails: String,
    drawDownBillerId: Long,
    paymentProvider: String,
    paymentProviderId: Long,
    product: String) extends WithFields {

  // event time in epoch milliseconds; the stored date is interpreted as UTC
  val timestamp: Long = date.toInstant(ZoneOffset.UTC).toEpochMilli

  // ...
}

It is important to create a timestamp field that represents when the event occurred. Aggregates are then calculated from event time rather than from the moment Flink happens to process a record, so the results are correct and deterministic. This is especially important for historical or delayed data.
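Here is a short sketch of that conversion. Like the model above, it assumes the stored date is UTC.

The second value shows how the same date would shift by an hour if it were read as West Africa Time (UTC+01:00). That only matters if the database stored local times, which is an assumption made purely for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Event time in epoch milliseconds, interpreting `date` at the given offset.
def eventTimeMillis(date: LocalDateTime, offset: ZoneOffset = ZoneOffset.UTC): Long =
  date.toInstant(offset).toEpochMilli

val date = LocalDateTime.of(2017, 1, 1, 0, 0)
val asUtc = eventTimeMillis(date)                        // 1483228800000L
val asWat = eventTimeMillis(date, ZoneOffset.ofHours(1)) // one hour earlier
```

Whatever offset is chosen, the same row always produces the same timestamp. This is what makes replays of historical data give the same aggregates.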

Build input

The data is kept in a MySQL database, so Flink needs to know how to fetch it, convert it to the model and emit it for processing.

We created a JdbcTransactionSource case class:

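import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.source.RichSourceFunction

// Emits Transaction events read from MySQL; its checkpointed state is a single java.lang.Long
case class JdbcTransactionSource(limit: Long, intervalInSeconds: Long, jdbcConfig: JdbcConfig)
  extends RichSourceFunction[Transaction]
    with Checkpointed[java.lang.Long] {
  // ...
}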

The source limits the number of records fetched at once. We do not want to load all database records on the first start, or when starting after a long break. In addition, a time interval is set between fetches.

The source's checkpoints are stored in HDFS or another configured storage, with metadata saved in ZooKeeper. Here the checkpoint is the ID of the last processed transaction, so the source knows which records have already been processed.

The whole job of the source class can be summarized by these lines:

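// run the query for the next batch of transactions
val rs = statement.executeQuery()
while (rs.next()) {
  // map the current row to the Transaction model
  val transaction = fromResultSetRow(rs)
  // emit the event together with its event-time timestamp
  sourceContext.collectWithTimestamp(transaction, transaction.timestamp)
  // remember progress so the next fetch (and the checkpoint) continues from here
  lastTransactionId = transaction.id
}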
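The fetch-and-checkpoint idea can be illustrated without Flink or JDBC. This hypothetical in-memory sketch works as follows:
- Each poll returns at most `limit` rows with an ID greater than the last processed one, in ID order.
- That is what a query like `SELECT ... WHERE id > ? ORDER BY id LIMIT ?` would do.
- Restoring `lastId` from a checkpoint resumes exactly where processing stopped.

The query, `Row` and `IncrementalPoller` are assumptions, not the actual source code.

```scala
// Hypothetical row type standing in for a row of the transactions table.
case class Row(id: Long, amount: BigDecimal)

// Polls the "table" incrementally. `lastId` plays the role of the
// checkpointed state: the ID of the last row emitted so far.
final class IncrementalPoller(table: Seq[Row], limit: Int, var lastId: Long = 0L) {
  // Equivalent of: SELECT ... WHERE id > lastId ORDER BY id LIMIT limit
  def poll(): Seq[Row] = {
    val batch = table.filter(_.id > lastId).sortBy(_.id).take(limit)
    batch.lastOption.foreach(r => lastId = r.id) // advance the offset
    batch
  }
}
```

Keeping the limit small bounds the work done on the first start. Checkpointing only the last ID keeps the state tiny.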

Build sink

A sink is a component that knows where to send data after processing. In our case, we just want to log a message with the transaction. Logstash then ships the generated logs to Elasticsearch.

The sink code is quite simple:

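import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.streaming.api.functions.sink.SinkFunction
// FlinkSink import (Nussknacker API) omitted

// Logs every record reaching the sink; Logstash later ships these logs to Elasticsearch
class LoggingSink(messagePrefix: String, loggingLevel: String) extends FlinkSink with LazyLogging with Serializable {

  override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable {
    override def invoke(in: Any): Unit = {
      val message = s"$messagePrefix - $in"
      // use the level configured for this sink; any other value falls back to INFO
      loggingLevel match {
        case "error" => logger.error(message)
        case "warn" => logger.warn(message)
        case _ => logger.info(message)
      }
    }
  }

  // output shown in the Nussknacker UI when the process runs in test mode
  override def testDataOutput: Option[Any => String] = Some(in => s"[$loggingLevel] $messagePrefix $in")
}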

Create aggregate function

One of the more difficult tasks is creating our custom aggregate function. In our case, it is a counter function that counts occurrences of transactions meeting our conditions.

Each aggregate is computed for a certain key. Our key is customerMsisdn, since it points directly at a single customer. For each customer, we want to know how many matching transactions they made within a time window.

We created an EventsCounter class. It takes its parameters (the key and the window length) from the process definition and partitions incoming events by that key. It then passes them to an operator named eventsCounter, which keeps the count:

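import org.apache.flink.streaming.api.scala._
import scala.concurrent.duration.Duration
// Nussknacker API imports omitted

class EventsCounter extends CustomStreamTransformer {

  // the resulting stream carries EventCount values
  @MethodToInvoke(returnType = classOf[EventCount])
  def execute(@ParamName("key") key: LazyInterpreter[String],
              @ParamName("length") length: String) = {
    FlinkCustomStreamTransformation((start: DataStream[InterpretationResult]) => {
      // window length entered in the UI and parsed by Duration, e.g. "24 hours"
      val lengthInMillis = Duration(length).toMillis
      // partition events by the key expression and count them per key
      start.keyBy(key.syncInterpretationFunction)
        .transform("eventsCounter", new CounterFunction(lengthInMillis))
    })
  }
}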

  • @MethodToInvoke tells Nussknacker that the annotated method should be invoked when the process reaches the component
  • @ParamName marks a method parameter whose value is provided in the UI, which makes the component configurable

CounterFunction counts occurrences within the time window and emits them as EventCount objects. Each object also records when the first occurrence of the event in this time window happened:

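import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
// TimestampedEvictableState, MultiMap, InterpretationResult and ValueWithContext are Nussknacker helpers (imports omitted)

class CounterFunction(lengthInMillis: Long) extends TimestampedEvictableState[Long] {

  // per-key state: event timestamp -> counts of events seen at that timestamp
  override def stateDescriptor =
    new ValueStateDescriptor[MultiMap[Long, Long]]("state", classOf[MultiMap[Long, Long]])

  override def processElement(element: StreamRecord[InterpretationResult]): Unit = {
    // keep this key's state until the current event leaves the window
    setEvictionTimeForCurrentKey(element.getTimestamp + lengthInMillis)
    // drop entries older than the window
    state.update(filterState(element.getTimestamp, lengthInMillis))

    // record the current event
    val ir = element.getValue
    val updated = stateValue.add(element.getTimestamp, 1)
    state.update(updated)

    // emit the number of events in the window and the oldest timestamp among them
    val eventsCount = updated.map.values.flatten.sum
    val smallestTimestamp = updated.map.keys.min
    output.collect(new StreamRecord[ValueWithContext[Any]](
      ValueWithContext(EventCount(count = eventsCount, smallestTimestamp = smallestTimestamp), ir.finalContext),
      element.getTimestamp))
  }
}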

case class EventCount(count: Long, smallestTimestamp: Long)
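Without Flink's keyed state, the counting logic amounts to a few steps for each key:
- Keep the timestamps of events inside the window.
- Drop the timestamps that have fallen out of it.
- Add the new event.
- Emit the count together with the oldest remaining timestamp.

The class below is an in-memory sketch of that idea. It is not the Nussknacker implementation:
- It uses a plain Map per key instead of keyed state, and it has no eviction timers.
- It assumes events arrive in timestamp order.
- It keeps timestamps strictly newer than `timestamp - lengthInMillis`, which is our choice of window boundary.

EventCount is repeated so the block compiles on its own.

```scala
import scala.collection.mutable

case class EventCount(count: Long, smallestTimestamp: Long)

// In-memory sketch of a per-key sliding-window counter.
// State per key: event timestamp -> number of events at that timestamp.
final class SlidingCounter(lengthInMillis: Long) {
  private val state = mutable.Map.empty[String, Map[Long, Long]]

  def process(key: String, timestamp: Long): EventCount = {
    // Evict entries that are outside the window ending at this event.
    val kept = state.getOrElse(key, Map.empty[Long, Long])
      .filter { case (ts, _) => ts > timestamp - lengthInMillis }
    // Add the current event.
    val updated = kept.updated(timestamp, kept.getOrElse(timestamp, 0L) + 1L)
    state(key) = updated
    EventCount(count = updated.values.sum, smallestTimestamp = updated.keys.min)
  }
}
```

Storing counts per timestamp mirrors the MultiMap[Long, Long] state of CounterFunction. Eviction then only needs to look at the map keys.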

Build Nussknacker process

With the components needed for our domain in place, we packaged them into a jar library, together with a class describing the process building blocks:

class FraudDetectorConfigCreator extends ProcessConfigCreator {
  val transactionCategory = "TransactionStream"

  override def sourceFactories(config: Config): Map[String, WithCategories[SourceFactory[_]]] = {
    import net.ceedubs.ficus.Ficus._
    import net.ceedubs.ficus.readers.ArbitraryTypeReader._
    Map(
      "Transactions" -> WithCategories(new TransactionSourceFactory(config.as[JdbcConfig]("jdbc")), transactionCategory)
    )
  }

  override def customStreamTransformers(config: Config): Map[String, WithCategories[CustomStreamTransformer]] = Map(
    "eventsCounter" -> WithCategories(new EventsCounter, transactionCategory)
  )

  override def sinkFactories(config: Config): Map[String, WithCategories[SinkFactory]] = Map(
    "LoggingFactory" -> WithCategories(new LoggingSinkFactory, transactionCategory)
  )

  // other factories that we do not need in our problem
}

Components are exposed via factories, and each factory is registered in one or more categories (always TransactionStream in our case).

Configuration

We provided some configuration for the Nussknacker UI:

fraudDemo {
  timeout: 10s
  checkpointInterval: 20s
  restartInterval: "10s"
  processConfigCreatorClass: "pl.touk.zephyr.frauddetector.FraudDetectorConfigCreator"
  jdbc {
    driver: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://mysql-zephyr:3306/backofficedev"
    userName: "backofficedev"
    password: "..."
  }     
  defaultValues {
    values {
    }
  }

}

This configuration is forwarded to Flink together with our job.

Process definition

Once Nussknacker has started, the UI is available at http://[host]:8080/ and we can start defining processes. We created a new process in the TransactionStream category. Using our custom components as well as generic ones (e.g. filter or split), we designed a fraud detection process.

[Screenshot: the fraud detection process]

We applied filters so that only certain transactions enter this process. The filters use Spring Expression Language (SpEL) expressions such as #input.amount > 3000. Since we defined the model, Nussknacker can suggest fields and validate expressions. We want to log such transactions at INFO level and continue processing, which is why we used the split component.

Next, we count each customer's transactions in a 24-hour time window and keep the customers with more than one such transaction. We log them again (this time at WARN level) and continue fraud detection.

The last step is to count the suspicious transactions that occurred in the last 48 hours. We keep only the customers who were already suspects 24 to 48 hours ago and are suspects again within the last 24 hours. Finally, we log such transactions at ERROR level.
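One way to express that last condition works on the EventCount emitted by a 48-hour counter that receives only suspicious transactions. Block when the window holds more than one suspicious event and the oldest of them is at least 24 hours older than the current one.

In the process itself this is a SpEL filter. The plain-Scala predicate below is only a sketch of the logic, and the thresholds are our reading of rule 5.

```scala
case class EventCount(count: Long, smallestTimestamp: Long)

val DayMillis: Long = 24L * 60 * 60 * 1000

// `counted` comes from a 48 h counter fed only with suspicious transactions;
// `eventTimestamp` is the timestamp of the current suspicious transaction.
// Block if an earlier suspicious transaction lies 24 to 48 h before this one.
def shouldBlock(counted: EventCount, eventTimestamp: Long): Boolean =
  counted.count > 1 && counted.smallestTimestamp <= eventTimestamp - DayMillis
```

Because the window is 48 hours long, an oldest timestamp at least 24 hours back means the customer was also a suspect in the previous day.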

After designing and saving, the process is ready for testing.

Testing

The JdbcTransactionSource provides a generateTestData method. It connects to the database, fetches the requested number of transactions and saves them to a file. In Nussknacker, this is done by clicking the generate button in the Test section.

Next, we can run the process locally on the data from the file by clicking the from file button. The number of transactions processed at each step is then shown in the nodes:

[Screenshot: the process after testing, with transaction counts in the nodes]

We can also click on each node to see which transactions reached it.

Deploy and results

After testing the process, it can be deployed – sent to the job manager and started. To deploy the process we just need to click the Deploy button.

In the Metrics tab, we can open the dashboard and watch the metrics of the components.

[Screenshot: metrics dashboard]

In the Search tab, we can query the harvested logs and see how many frauds were found.

[Screenshot: log search results]

What next?

We have created just one process, which finds fraudulent transactions in our system. The model, data source, sink and architecture are already in place, so we are ready to quickly design and deploy new processes. Examples include finding a large number of small transactions from the same customer (another fraud detection process) or finding failed transactions with specific details (to detect recurring problems of our customers).

You May Also Like

HISE

HISE stands for Human Interactions Service Engine. I have recently posted a proposal, which was accepted by the Apache ODE PMC, so development will start soon. If you are interested in this project, you are welcome to join us.

How to automate tests with Groovy 2.0, Spock and Gradle

This is the first post on the first blog of my life, so cheers and enjoy reading!

y u no test?

A couple of years ago I wasn't a big fan of unit testing, although it was obvious to me that well-prepared unit tests are crucial. I just didn't know yet why exactly; I only felt they were important. My dislike of writing automated tests was mostly related to the effort needed to prepare them. Spaghetti code was also easy to spot in test sources.

Some goodies at hand

Now I know! Tests are crucial for better design and for confidence, the confidence to improve code without hesitation. Moreover, I now have a tool that makes test automation as easy as Sunday morning: the Spock Framework. If you got here, you probably already know what Spock is, so I won't introduce it. It's enough to say that Spock is an awesome unit testing tool which, thanks to Groovy AST transformations, greatly simplifies writing tests.

An obstacle

The problem is that a new major version of Groovy (2.0) has been released, and there is no matching version of Spock available yet.

What now?

Well, as a matter of fact, there is such a version, although it's still under development. It can be obtained from the Sonatype OSS snapshots Maven repository. We could of course use Maven to build the project and run the tests. But why not go an even more "groovy" way? XML is not for humans, is it? Let's use Gradle.

The build file

Update: an updated version of the build file is at the end of the post.
apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
    mavenLocal()
    mavenCentral()
    maven { url 'http://oss.sonatype.org/content/repositories/snapshots/' }
}

dependencies {
    groovy 'org.codehaus.groovy:groovy-all:2.0.1'
    testCompile 'org.spockframework:spock-core:0.7-groovy-2.0-SNAPSHOT'
}

idea {
    project {
        jdkName = langLevel
        languageLevel = langLevel
    }
}
As you can see, the build.gradle file is almost self-explanatory. The Groovy plugin is applied to compile Groovy code. It needs groovy-all.jar, which is declared (version 2.0.1) in the dependencies block right next to Spock 0.7. Most importantly, the snapshots Maven repository mentioned above is added in the repositories block.

Project structure and execution

Gradle's default project directory structure is similar to Maven's. Unfortunately, there is no 'create project' task, so you have to create the structure by hand. It's not a big obstacle, though. The structure will look more or less as follows:
<project root>
├── build.gradle
└── src
    ├── main
    │   └── groovy
    └── test
        └── groovy
Now you can build the project with the command gradle build, or run only the tests with gradle test.

How about Java?

You can also test plain Java code with Spock. Just add the src/main/java directory and the following line to build.gradle:
apply plugin: 'java'
This way, even if you don't want to (or simply can't) deploy Groovy-compiled code to your production JVM, all the goodness of testing with Spock and Groovy is still at hand.

A silly-simple example

Just to show that it works, here you go with a basic example.

A simple Java example class:

public class SimpleJavaClass {

    public int sumAll(int... args) {
        int sum = 0;
        for (int arg : args) {
            sum += arg;
        }
        return sum;
    }
}

A simple Groovy example class:

class SimpleGroovyClass {

    String concatenateAll(char separator, String... args) {
        args.join(separator as String)
    }
}

The test, uhm... I mean the Specification:

import spock.lang.Specification
import spock.lang.Unroll

class JustASpecification extends Specification {

    @Unroll('Sums integers #integers into: #expectedResult')
    def "Can sum different numbers of integers"() {

        given:
        def instance = new SimpleJavaClass()

        when:
        def result = instance.sumAll(*integers)

        then:
        result == expectedResult

        where:
        expectedResult | integers
        11             | [3, 3, 5]
        8              | [3, 5]
        254            | [2, 4, 8, 16, 32, 64, 128]
        22             | [7, 5, 6, 2, 2]
    }

    @Unroll('Concatenates strings #strings with separator "#separator" into: #expectedResult')
    def "Can concatenate different numbers of strings with a specified separator"() {

        given:
        def instance = new SimpleGroovyClass()

        when:
        def result = instance.concatenateAll(separator, *strings)

        then:
        result == expectedResult

        where:
        expectedResult     | separator  | strings
        'Whasup dude?'     | ' ' as char | ['Whasup', 'dude?']
        '2012/09/15'       | '/' as char | ['2012', '09', '15']
        'nice-to-meet-you' | '-' as char | ['nice', 'to', 'meet', 'you']
    }
}
To run the tests with Gradle, simply execute the command gradle test. Test reports can be found at <project root>/build/reports/tests/index.html and look kind of like this.


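Please note that the feature method is executed once for each row of parameters in the 'table' in the specification's where: block. Thanks to the @Unroll annotation, each of those iterations is also reported as a separate test, named after the annotation's template. Although where: looks like an ordinary label, Spock gives it this meaning through AST transformation magic.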

IDE integration

Gradle's plugin for IntelliJ IDEA

I've also added the IntelliJ IDEA plugin for IDE project generation, plus some configuration for it (the IDE's JDK name). To generate Idea's project files, just run the command gradle idea. Eclipse and NetBeans plugins are available too, but I haven't tested them. Idea's one works well.

IntelliJ IDEA's plugins for Gradle

Idea itself has light built-in Gradle support. To avoid confusion: Gradle has a plugin for Idea, and Idea has a plugin for Gradle. To get even more 'pluginated', there is also the JetGradle plugin within Idea. However, I haven't found a good reason for its existence, except maybe one: it shows the dependency tree. There is a bug, though: JetGradle works fine only for language level 1.6. Strangely, all the plugins together do not conflict with each other. They even make up a complementary, quite useful tool set.

Running tests under IDE

Just to add something sweet: this is how a Specification looks when run with the JUnit runner in IntelliJ IDEA. Right-click the JustASpecification class (or a whole folder of classes extending Specification) and select "Run ...". You'll see a nice view like this.

Building web application

If you need to build a Java web application and bundle it as a WAR archive, just add the plugin by putting the line
apply plugin: 'war'
in build.gradle, and create the src/main/webapp directory.

Want to know more?

If you haven't heard about Spock or Gradle before, or are just curious, check the following links:

What next?

The last thing left is to write the real production code you are going to test. Whether it will be Groovy or Java, I leave to your needs and invention. Of course, you are welcome to post comments here. I'll answer them, or even write more posts on the subject.

Important update

Spock version 0.7 has been released, so the build file above doesn't work anymore. It's easy to fix, though: just remove the last dash and the word SNAPSHOT from the Spock dependency declaration. Another important change is that spock-core now depends on groovy-all 2.0.5. To avoid a dependency conflict, the Groovy dependency should be changed from version 2.0.1 to 2.0.5.
Besides, the oss.sonatype.org snapshots Maven repository can be removed. There are no obstacles any more, and the build file now looks as follows:
apply plugin: 'groovy'
apply plugin: 'idea'

def langLevel = 1.7

sourceCompatibility = langLevel
targetCompatibility = langLevel

group = 'com.tamashumi.example.testwithspock'
version = '0.1'

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    groovy 'org.codehaus.groovy:groovy-all:2.0.5'
    testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
}

idea {
    project {
        jdkName = langLevel
        languageLevel = langLevel
    }
}

micro-burn has Trello integration

After a few long evenings I've finally integrated micro-burn with Trello. All you need to run it for your Trello board is a short configuration and the fat jar. It renders a burndown chart visualising the progress of cards on your board.