Easy fraud detection in Nigerian bank with TouK Nussknacker

Problem

We have a large MySQL database with payment transactions from multiple banks, aggregated for Nigeria's biggest operator. Every day customers create about 300 thousand transactions, both banking transactions and top-ups. One of our fraud detection processes for top-ups looks like this:

  1. Find top-up (called Airtime in our domain) transactions
  2. with amount above 3000
  3. ended with success
  4. If there was more than one such transaction in the last 24 hours, we mark the customer as a fraud suspect
  5. If the customer was already marked as a fraud suspect in the previous 24 hours (that is, 24-48 hours ago), we block them
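
The rules above can be sketched as plain Scala functions, independent of Nussknacker. This is only an illustration of one reading of the steps: the names Tx, FraudRules, the product label "Airtime" and the status value "SUCCESS" are assumptions, not taken from the production model.

```scala
object FraudRules {
  // Simplified, hypothetical transaction; timestamp is in epoch milliseconds.
  final case class Tx(customer: String, product: String, amount: BigDecimal,
                      status: String, timestamp: Long)

  val DayMillis: Long = 24L * 60 * 60 * 1000

  // Steps 1-3: a successful top-up with an amount above 3000.
  def qualifies(tx: Tx): Boolean =
    tx.product == "Airtime" && tx.amount > 3000 && tx.status == "SUCCESS"

  // Step 4: more than one qualifying transaction in the 24 hours ending at `now`.
  def isSuspect(txs: Seq[Tx], customer: String, now: Long): Boolean =
    txs.count { tx =>
      tx.customer == customer && qualifies(tx) &&
        tx.timestamp > now - DayMillis && tx.timestamp <= now
    } > 1

  // Step 5 (one reading): suspect now, and already marked as suspect
  // by some qualifying transaction 24-48 hours before `now`.
  def shouldBlock(txs: Seq[Tx], customer: String, now: Long): Boolean =
    isSuspect(txs, customer, now) && txs.exists { tx =>
      tx.customer == customer && qualifies(tx) &&
        tx.timestamp > now - 2 * DayMillis && tx.timestamp <= now - DayMillis &&
        isSuspect(txs, customer, tx.timestamp)
    }
}
```

Keeping the rules as pure functions of a transaction list and a point in time makes them easy to check against historical data before they are moved into a streaming process.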

Our previous solution used cron to run a big SQL query once a day and create a report from the transactions it found. We wanted to make this fraud detection process real-time and learn about suspicious situations as fast as possible. That is why we decided to use TouK Nussknacker.

TouK Nussknacker


Nussknacker is a GUI for Apache Flink, developed by TouK to help its clients manage various event stream processes. Nussknacker was first deployed in 2016 at one of the Polish mobile network operators. The GUI helps business analysts, marketers and other trained users create and manage processes with minimal support from developers.

Architecture

The architecture of our solution is based on the demo project available in the Nussknacker GitHub project. It contains:

  • UI – allows us to create, test and deploy Flink jobs and to read their metrics and logs
  • job manager – dispatches Flink jobs to task managers
  • task manager – runs Flink jobs
  • zookeeper – for discovery of task managers and keeping information about state
  • logstash – harvests the logs from Flink tasks, parses them and sends them to Elasticsearch
  • elasticsearch – keeps logs in an index for further analysis
  • kibana – allows for searching in harvested logs
  • influxdb – keeps jobs metrics
  • grafana – allows for searching and viewing jobs metrics
  • mysql – the source of transactions

Prepare components

Nussknacker requires some initial effort to create a model of the problem and the components that know how to convert real data into the model, aggregate it and send it to an external destination.

The model

If we want to create a Nussknacker process, we have to create a model first. The model is a description of data, but also contains all data that might be used in the future or displayed in logs.

In our case we have created Transaction case class in Scala:

case class Transaction(
            id: Long,
            amount: BigDecimal,
            customerMsisdn: String,
            date: LocalDateTime,
            externalCustomerId: String,
            uid: String,
            paymentMethodId: String,
            statusId: Long,
            status: String,
            failureType: String,
            failureDetails: String,
            drawDownBillerId: Long,
            paymentProvider: String,
            paymentProviderId: Long,
            product: String) extends WithFields {

  val timestamp: Long = date.toInstant(ZoneOffset.UTC).toEpochMilli

  // ...
}

It is important to create a timestamp field representing when the event occurred. With it, aggregates are calculated in a valid and deterministic way, which matters most for historical or delayed data.
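
To illustrate why the event timestamp matters, here is a minimal sketch that uses the same conversion as the model and counts events by their own time rather than by arrival order. The object name EventTime and the helper countInWindow are hypothetical, and treating the stored date as UTC is an assumption about the data.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object EventTime {
  // Same conversion as in the model; the stored date is interpreted as UTC.
  def toEpochMillis(date: LocalDateTime): Long =
    date.toInstant(ZoneOffset.UTC).toEpochMilli

  // Number of events whose own timestamp lies in (windowEnd - length, windowEnd].
  def countInWindow(timestamps: Seq[Long], windowEnd: Long, lengthMillis: Long): Int =
    timestamps.count(t => t > windowEnd - lengthMillis && t <= windowEnd)
}
```

Because the count depends only on the timestamps carried by the events, replaying delayed or historical records in a different order gives the same result.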

Build input

The data is kept in the MySQL database, so Flink needs to know how to fetch it, convert it to the model and emit it for processing.

We have created the JdbcTransactionSource case class:

case class JdbcTransactionSource(limit: Long, intervalInSeconds: Long, jdbcConfig: JdbcConfig)
  extends RichSourceFunction[Transaction]
    with Checkpointed[java.lang.Long]
{
    // ...
}

The source limits the amount of data it fetches, because we do not want to read all the database records on the first start or on a start after a long period of inactivity. In addition, a time interval is set between fetches. To know which records have already been processed, the source keeps checkpoints in HDFS or other configured storage, with metadata saved in ZooKeeper.

The core of the source class can be summarized by these lines:

val rs = statement.executeQuery()
while (rs.next()) {
  val transaction = fromResultSetRow(rs)
  sourceContext.collectWithTimestamp(transaction, transaction.timestamp)
  lastTransactionId = transaction.id
}
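
The polling behaviour described above (a row limit plus a remembered last id) can be sketched without Flink or JDBC. The in-memory table stands in for the database, and the names PollingSketch, Row and fetchBatch are hypothetical. The sketch assumes the real query selects rows with an id greater than the checkpointed one, in ascending order; the article does not show the actual query.

```scala
object PollingSketch {
  // Hypothetical stand-in for a database row.
  final case class Row(id: Long, payload: String)

  // Returns at most `limit` rows with id greater than `lastId`, in ascending
  // id order, together with the value to store as the next checkpoint.
  def fetchBatch(table: Seq[Row], lastId: Long, limit: Int): (Seq[Row], Long) = {
    val batch = table.filter(_.id > lastId).sortBy(_.id).take(limit)
    // If nothing new was found, the checkpoint stays where it was.
    val newLastId = batch.lastOption.map(_.id).getOrElse(lastId)
    (batch, newLastId)
  }
}
```

Calling fetchBatch repeatedly with the returned checkpoint walks through the table in bounded steps, which is the property that keeps a first start or a restart after a long break from reading the whole table at once.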

Build sink

A sink is a component that knows where to send data after processing. In our case, we just want to log a message with the transaction. The generated logs are sent to Elasticsearch by Logstash.

The sink code is quite simple:

class LoggingSink(messagePrefix: String, loggingLevel: String) extends FlinkSink with LazyLogging with Serializable {

  override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable {
    override def invoke(in: Any): Unit = {
      val message = s"$messagePrefix - $in"
      loggingLevel match {
        case "error" => logger.error(message)
        case "warn" => logger.warn(message)
        case _ => logger.info(message)
      }
    }
  }

  override def testDataOutput: Option[(Any) => String] = Some(in => s"[$loggingLevel] $messagePrefix $in")
}

 

Create aggregate function

One of the more difficult tasks is to create our own custom aggregate function. In our case, it is a counter function, which counts occurrences of transactions that meet our conditions. Each aggregate is created for a certain key. In our case, the key is customerMsisdn, since it points directly at a single customer. For each customer we want to know how many transactions were found in a time window.

We have created EventsCounter class which parses input parameters from configuration and based on the key transforms incoming events and stores count in the eventsCounter variable:

class EventsCounter extends CustomStreamTransformer {

  @MethodToInvoke(returnType = classOf[EventCount])
  def execute(@ParamName("key") key: LazyInterpreter[String],
              @ParamName("length") length: String) = {
    FlinkCustomStreamTransformation((start: DataStream[InterpretationResult]) => {
      val lengthInMillis = Duration(length).toMillis
      start.keyBy(key.syncInterpretationFunction)
        .transform("eventsCounter", new CounterFunction(lengthInMillis))
    })
  }
}

 

  • @MethodToInvoke tells Nussknacker that the annotated method should be invoked when process reaches component
  • @ParamName tells Nussknacker that component is configurable via parameters provided in UI
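
The length parameter arrives from the UI as a string and is converted to milliseconds. The sketch below assumes that Duration in the code above is scala.concurrent.duration.Duration (the import is not shown in the article); WindowLength is a hypothetical helper name.

```scala
import scala.concurrent.duration.Duration

object WindowLength {
  // Parses strings such as "24 hours" or "48h" into milliseconds.
  // Throws NumberFormatException for input that Duration cannot parse.
  def toMillis(length: String): Long = Duration(length).toMillis
}
```

With this conversion, "24 hours" and "48h" are both valid values for the length parameter of the component.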

CounterFunction counts occurrences in the time window and emits them as EventCount objects, which also record when the first occurrence of the event in this time window was:

class CounterFunction(lengthInMillis: Long) extends TimestampedEvictableState[Long] {

  override def stateDescriptor =
    new ValueStateDescriptor[MultiMap[Long, Long]]("state", classOf[MultiMap[Long, Long]])

  override def processElement(element: StreamRecord[InterpretationResult]): Unit = {
    setEvictionTimeForCurrentKey(element.getTimestamp + lengthInMillis)
    state.update(filterState(element.getTimestamp, lengthInMillis))

    val ir = element.getValue
    val eventCount = stateValue.add(element.getTimestamp, 1)
    state.update(eventCount)

    val eventsCount = eventCount.map.values.flatten.sum
    val smallestTimestamp = eventCount.map.keys.min
    output.collect(new StreamRecord[ValueWithContext[Any]](
      ValueWithContext(EventCount(count = eventsCount, smallestTimestamp = smallestTimestamp), ir.finalContext), element.getTimestamp)
    )
  }
}

case class EventCount(count: Long, smallestTimestamp: Long)
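
The idea behind CounterFunction can be shown with a small in-memory sketch that does not depend on Flink state. It is a simplification under stated assumptions: WindowCounterSketch and Counter are hypothetical names, the key is a plain String, and entries are evicted once they are at least lengthInMillis older than the current event. The exact eviction rule of filterState is not shown in the article.

```scala
import scala.collection.mutable

object WindowCounterSketch {
  final case class EventCount(count: Long, smallestTimestamp: Long)

  // Keeps, per key, the timestamps seen within the last `lengthInMillis`.
  final class Counter(lengthInMillis: Long) {
    private val state = mutable.Map.empty[String, Vector[Long]]

    def add(key: String, timestamp: Long): EventCount = {
      // Drop entries that fell out of the window ending at the current event.
      val kept = state.getOrElse(key, Vector.empty[Long])
        .filter(_ > timestamp - lengthInMillis)
      val updated = kept :+ timestamp
      state.update(key, updated)
      EventCount(updated.size.toLong, updated.min)
    }
  }
}
```

Each call returns the count for the current key only, together with the oldest timestamp still inside the window, which mirrors the two fields of EventCount.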

Build Nussknacker process

We have created some components needed for our domain problem. We created a jar library with a class describing process building blocks:

class FraudDetectorConfigCreator extends ProcessConfigCreator {
  val transactionCategory = "TransactionStream"

  override def sourceFactories(config: Config): Map[String, WithCategories[SourceFactory[_]]] = {
    import net.ceedubs.ficus.Ficus._
    import net.ceedubs.ficus.readers.ArbitraryTypeReader._
    Map(
      "Transactions" -> WithCategories(new TransactionSourceFactory(config.as[JdbcConfig]("jdbc")), transactionCategory)
    )
  }

  override def customStreamTransformers(config: Config): Map[String, WithCategories[CustomStreamTransformer]] = Map(
    "eventsCounter" -> WithCategories(new EventsCounter, transactionCategory)
  )

  override def sinkFactories(config: Config): Map[String, WithCategories[SinkFactory]] = Map(
    "LoggingFactory" -> WithCategories(new LoggingSinkFactory, transactionCategory)
  )

  // other factories that we do not need in our problem
}

Components are available via factories, and each factory is created in one or more categories (always TransactionStream in our case).

Configuration

We provided some configuration for the Nussknacker UI.

fraudDemo {
  timeout: 10s
  checkpointInterval: 20s
  restartInterval: "10s"
  processConfigCreatorClass: "pl.touk.zephyr.frauddetector.FraudDetectorConfigCreator"
  jdbc {
    driver: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://mysql-zephyr:3306/backofficedev"
    userName: "backofficedev"
    password: "..."
  }     
  defaultValues {
    values {
    }
  }

}

This configuration is forwarded to Flink together with our job.

Process definition

After Nussknacker starts, the UI is available at http://[host]:8080/ and we can start defining processes. We created a new process with the category TransactionStream. Using our custom components and generic components (e.g. filter or split), we designed a fraud detection process.

process

We have applied filters to find only certain transactions for this process. In the filters we use Spring SpEL expressions like #input.amount > 3000. Since we defined the model, Nussknacker can suggest fields and validate expressions. We want to log such transactions with level INFO and continue processing (this is why we have used the split component).

Next we count transactions in a 24-hour time window and find, for each customer, the transactions that occurred more than once. We log them again (this time with level WARN) and continue fraud detection.

The last step is to count suspicious transactions that occurred in the last 48 hours and to filter out the suspicious transactions that occurred in the last 24 hours, so that only customers who were already suspected 24-48 hours ago remain. Finally, we log such transactions with level ERROR.
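
The two counting stages can be sketched for a single customer as follows. This is an illustration of one possible reading of the process, not the Nussknacker definition itself: TwoStageSketch, inWindow and classify are hypothetical names, and the input is assumed to contain only the timestamps (in epoch milliseconds) of transactions that already passed the filters.

```scala
object TwoStageSketch {
  val Day: Long = 24L * 60 * 60 * 1000

  // Timestamps from `seen` that lie in the window (now - length, now].
  private def inWindow(seen: Seq[Long], now: Long, length: Long): Seq[Long] =
    seen.filter(t => t > now - length && t <= now)

  // Returns the event times that would be logged at WARN and at ERROR level.
  def classify(times: Seq[Long]): (Seq[Long], Seq[Long]) = {
    val sorted = times.sorted
    // Stage 1: more than one transaction within 24 hours, current one included.
    val warn = sorted.filter(t => inWindow(sorted, t, Day).size > 1)
    // Stage 2: among suspicious events of the last 48 hours,
    // at least one is older than 24 hours.
    val error = warn.filter(t => inWindow(warn, t, 2 * Day).exists(_ <= t - Day))
    (warn, error)
  }
}
```

For transactions at hours 1, 2, 30 and 31, the second and fourth are suspicious, and only the fourth reaches the ERROR level, because the customer was already suspected at hour 2.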

After designing and saving, the process is ready for testing.

Testing

The JdbcTransactionSource provides a generateTestData method, which connects to the database, fetches the requested number of transactions and saves them to a file. In Nussknacker this is done by clicking the generate button in the Test section. Next, we can execute the process locally with data from the file by clicking the from file button. The number of transactions processed in each step is shown in the nodes:

process_after_test

We can also look inside each node and check which transaction reached it by clicking on it.

Deploy and results

After testing the process, it can be deployed – sent to the job manager and started. To deploy the process we just need to click the Deploy button.

In the Metrics tab we can open the dashboard and watch metrics from the components.

metrics

In the Search tab we can query the harvested logs and see how many frauds were found.

search

What next?

We have created just one process that allows us to find fraudulent transactions in our system. Since we have already created the model, data source and sink, and set up the architecture, we are ready to quickly design and deploy new processes, e.g. finding a large number of small transactions from the same customer (as the next fraud detection process) or finding failed transactions with specific details (to spot recurring problems of our customers).
