Easy fraud detection in a Nigerian bank with TouK Nussknacker

Problem

We have a large MySQL database with payment transactions from multiple banks, aggregated for the biggest Nigerian operator. Each day the customers create about 300 thousand transactions. There are banking transactions and top-ups. One of our fraud detection processes for top-ups looks like this:

  1. Find top-up transactions (called Airtime in our domain)
  2. with an amount above 3000,
  3. that ended with success.
  4. If there was more than one such transaction in the last 24 hours, we mark the customer as a fraud suspect.
  5. If the customer was already marked as a fraud suspect in the previous 24-hour window (i.e. 24-48 hours ago), we block him.

Our previous solution used cron: it ran a big SQL query once a day and created a report from the transactions it found. We wanted to make this fraud detection process real-time and to learn about suspicious situations as fast as possible. That is why we decided to use TouK Nussknacker.

TouK Nussknacker

Nussknacker logo

Nussknacker is a GUI for Apache Flink developed by TouK to help its clients manage various event stream processes. Nussknacker was first deployed in 2016 at one of the Polish mobile network operators. The GUI helps business analysts, marketers and other trained users create and manage processes with minimal support from developers.

Architecture

The architecture of our solution was based on the demo project available in the Nussknacker GitHub repository. It contains:

  • UI – allows us to create, test and deploy flink jobs and also read their metrics and logs
  • job manager – dispatches flink jobs to task managers
  • task manager – runs flink jobs
  • zookeeper – for discovery of task managers and keeping information about state
  • logstash – harvests the logs from flink tasks, parses them and sends them to elasticsearch
  • elasticsearch – keeps logs in an index for further analysis
  • kibana – allows searching in the harvested logs
  • influxdb – keeps job metrics
  • grafana – allows searching and viewing job metrics
  • mysql – the source of transactions

Prepare components

Nussknacker requires some initial effort to create a model of the problem and components which know how to convert real data into the model, aggregate it and send it to an external destination.

The model

If we want to create a Nussknacker process, we have to create a model first. The model is a description of the data; it should contain every field that might be used in the future or displayed in logs.

In our case we have created a Transaction case class in Scala:

case class Transaction(
            id: Long,
            amount: BigDecimal,
            customerMsisdn: String,
            date: LocalDateTime,
            externalCustomerId: String,
            uid: String,
            paymentMethodId: String,
            statusId: Long,
            status: String,
            failureType: String,
            failureDetails: String,
            drawDownBillerId: Long,
            paymentProvider: String,
            paymentProviderId: Long,
            product: String) extends WithFields {

  val timestamp: Long = date.toInstant(ZoneOffset.UTC).toEpochMilli

  // ...
}

It is important to create a timestamp field representing when the event occurred. Thanks to that, aggregates are calculated in a valid and deterministic way, which is very important for historical or delayed data.

Build input

The data is kept in the MySQL database, therefore flink needs information about how to fetch it, convert it to the model and emit it for processing.

We have created the JdbcTransactionSource case class:

case class JdbcTransactionSource(limit: Long, intervalInSeconds: Long, jdbcConfig: JdbcConfig)
  extends RichSourceFunction[Transaction]
    with Checkpointed[java.lang.Long]
{
    // ...
}

The amount of data to fetch is limited by the source, because we do not want to read all the database records on the first start or after a long period of downtime. What is more, a time interval is set between data fetches. The source stores checkpoints in HDFS or other configured storage, with metadata saved in zookeeper, so it knows which records have already been processed.

The whole job of the source class can be summarized by these lines:

val rs = statement.executeQuery()
while (rs.next()) {
  val transaction = fromResultSetRow(rs)
  sourceContext.collectWithTimestamp(transaction, transaction.timestamp)
  lastTransactionId = transaction.id
}
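
The checkpointing part boils down to saving and restoring the id of the last processed transaction. Here is a minimal sketch of the two Checkpointed[java.lang.Long] callbacks, assuming lastTransactionId is the mutable field updated in the loop above:

override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): java.lang.Long =
  lastTransactionId // persist the id of the last emitted transaction

override def restoreState(state: java.lang.Long): Unit = {
  lastTransactionId = state // resume after the restored id on recovery
}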

Build sink

A sink is a component which knows where to send data after processing. In our case we just want to log a message with the transaction. The generated logs will be sent to elasticsearch by logstash.

The sink code is quite simple:

class LoggingSink(messagePrefix: String, loggingLevel: String) extends FlinkSink with LazyLogging with Serializable {

  override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable {
    override def invoke(in: Any): Unit = {
      val message = s"$messagePrefix - $in"
      loggingLevel match {
        case "error" => logger.error(message)
        case "warn" => logger.warn(message)
        case _ => logger.info(message)
      }
    }
  }

  override def testDataOutput: Option[(Any) => String] = Some(in => s"[$loggingLevel] $messagePrefix $in")
}
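
The sink is not used by Nussknacker directly – it is registered through a factory (the LoggingSinkFactory referenced later in FraudDetectorConfigCreator). A hedged sketch of how such a factory could look, using the @MethodToInvoke and @ParamName annotations described in the next section; the parameter names here are illustrative:

class LoggingSinkFactory extends SinkFactory {

  @MethodToInvoke
  def create(@ParamName("messagePrefix") messagePrefix: String,
             @ParamName("loggingLevel") loggingLevel: String): FlinkSink =
    new LoggingSink(messagePrefix, loggingLevel)
}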


Create aggregate function

One of the more difficult tasks is to create our custom aggregate function. In our case it is a counter function, which counts occurrences of transactions that meet our conditions. Each aggregate is created for a certain key. In our case the key is customerMsisdn, since it points directly at a single customer. For each customer we want to know how many transactions were made or found in a time window.

We have created an EventsCounter class which parses input parameters provided in the UI and, based on the key, transforms incoming events, counting them in the eventsCounter transformation:

class EventsCounter extends CustomStreamTransformer {

  @MethodToInvoke(returnType = classOf[EventCount])
  def execute(@ParamName("key") key: LazyInterpreter[String],
              @ParamName("length") length: String) = {
    FlinkCustomStreamTransformation((start: DataStream[InterpretationResult]) => {
      val lengthInMillis = Duration(length).toMillis
      start.keyBy(key.syncInterpretationFunction)
        .transform("eventsCounter", new CounterFunction(lengthInMillis))
    })
  }
}


  • @MethodToInvoke tells Nussknacker that the annotated method should be invoked when the process reaches the component
  • @ParamName tells Nussknacker that the component is configurable via parameters provided in the UI

CounterFunction counts occurrences in the time window and stores them as EventCount objects, which also know when the first event in the time window occurred:

class CounterFunction(lengthInMillis: Long) extends TimestampedEvictableState[Long] {

  override def stateDescriptor =
    new ValueStateDescriptor[MultiMap[Long, Long]]("state", classOf[MultiMap[Long, Long]])

  override def processElement(element: StreamRecord[InterpretationResult]): Unit = {
    setEvictionTimeForCurrentKey(element.getTimestamp + lengthInMillis)
    state.update(filterState(element.getTimestamp, lengthInMillis))

    val ir = element.getValue
    val eventCount = stateValue.add(element.getTimestamp, 1)
    state.update(eventCount)

    val eventsCount = eventCount.map.values.flatten.sum
    val smallestTimestamp = eventCount.map.keys.min
    output.collect(new StreamRecord[ValueWithContext[Any]](
      ValueWithContext(EventCount(count = eventsCount, smallestTimestamp = smallestTimestamp), ir.finalContext), element.getTimestamp)
    )
  }
}

case class EventCount(count: Long, smallestTimestamp: Long)

Build Nussknacker process

We have now created the components needed for our domain problem. We packaged them in a jar library together with a class describing the process building blocks:

class FraudDetectorConfigCreator extends ProcessConfigCreator {
  val transactionCategory = "TransactionStream"

  override def sourceFactories(config: Config): Map[String, WithCategories[SourceFactory[_]]] = {
    import net.ceedubs.ficus.Ficus._
    import net.ceedubs.ficus.readers.ArbitraryTypeReader._
    Map(
      "Transactions" -> WithCategories(new TransactionSourceFactory(config.as[JdbcConfig]("jdbc")), transactionCategory)
    )
  }

  override def customStreamTransformers(config: Config): Map[String, WithCategories[CustomStreamTransformer]] = Map(
    "eventsCounter" -> WithCategories(new EventsCounter, transactionCategory)
  )

  override def sinkFactories(config: Config): Map[String, WithCategories[SinkFactory]] = Map(
    "LoggingFactory" -> WithCategories(new LoggingSinkFactory, transactionCategory)
  )

  // other factories that we do not need in our problem
}

Components are available via factories, and each factory is registered in one or more categories (always TransactionStream in our case).

Configuration

We provided some configuration for the Nussknacker UI:

fraudDemo {
  timeout: 10s
  checkpointInterval: 20s
  restartInterval: "10s"
  processConfigCreatorClass: "pl.touk.zephyr.frauddetector.FraudDetectorConfigCreator"
  jdbc {
    driver: "com.mysql.jdbc.Driver"
    url: "jdbc:mysql://mysql-zephyr:3306/backofficedev"
    userName: "backofficedev"
    password: "..."
  }
  defaultValues {
    values {
    }
  }
}

This configuration will be forwarded to flink with our job.

Process definition

After Nussknacker starts, the UI is available at http://[host]:8080/ and we can start defining processes. We created a new process with the category TransactionStream. Using our custom components and also the generic ones (e.g. filter or split) we designed a fraud detection process.

process

We have applied filters to find only the relevant transactions for this process. In the filters we are using Spring SpEL expressions like #input.amount > 3000. Since we defined the model, Nussknacker can suggest fields and validate the expressions. We want to log such transactions with level INFO and continue processing (this is why we have used the split component).
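
For example, the three conditions from our fraud rules can be expressed as separate filter expressions along these lines (the product and status literals are illustrative assumptions – the exact values depend on the data in our database):

#input.product == 'Airtime'
#input.amount > 3000
#input.status == 'SUCCESS'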

Next, we count transactions in a 24-hour time window and find the customers for which such a transaction occurred more than once. We log these again (this time with level WARN) and continue the fraud detection.

The last step is to count suspicious transactions that occurred in the last 48 hours and filter out those whose suspicious activity happened only within the last 24 hours, which leaves the customers that were already suspects 24-48 hours ago. Finally, we log such transactions with level ERROR.
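
Assuming the counter's result is exposed to the following filter under a variable such as #eventCount (the actual name is whatever output variable we configure for the node in the UI), the last filter could look roughly like this:

#eventCount.count > 1 && #eventCount.smallestTimestamp < #input.timestamp - (24 * 60 * 60 * 1000)

In other words: there was more than one suspicious event in the 48-hour window, and the earliest of them happened more than 24 hours before the current event, so the customer was already a suspect 24-48 hours ago.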

After designing and saving, the process is ready for testing.

Testing

The JdbcTransactionSource provides the method generateTestData, which connects to the database, fetches the requested number of transactions and saves them to a file. In Nussknacker this can be done by clicking the generate button in the Test section. Next, we can execute the process locally with the data from the file by clicking the from file button. The number of transactions processed in each step will be shown on the nodes:

process_after_test

We can also look inside each node and check which transactions reached it, by clicking on the node.

Deploy and results

After testing the process, it can be deployed – sent to the job manager and started. To deploy the process we just need to click the Deploy button.

In the Metrics tab we can connect to a dashboard and watch the metrics from the components.

metrics

In the Search tab we can query the harvested logs and see how many frauds were found.

search

What next?

We have created just one process that allows us to find fraudulent transactions in our system. Since we have already created the model, data source and sink, and set up the architecture, we are ready to quickly design and deploy new processes – e.g. detecting a large number of small transactions from the same customer (as the next fraud detection process) or finding failed transactions with specific details (to spot recurring problems of our customers).

You May Also Like

Spock basics

Spock (homepage) is, as its authors say, a 'testing and specification framework'. It combines a very elegant and natural syntax with powerful capabilities. And, most importantly, it is easy to use.

One note at the very beginning: I assume that you are already familiar with the principles of Test Driven Development and that you know how to use a testing framework such as JUnit.

So how can I start?


Writing Spock specifications is very easy. We need a basic configuration of Spock and Groovy dependencies (if you are using a mavenized project with Eclipse, look at my previous post: Spock, Java and Maven). Once we have everything set up and running smoothly, we can write our first specs (a spec, or specification, is the equivalent of a test class in other frameworks like JUnit or TestNG).

What is great about Spock is that we can use it to test Groovy projects, pure Java projects, or even mixed ones.


Let's go!


Every spec class must inherit from the spock.lang.Specification class; only then will the test runner recognize it as a test class and run the tests. We will write a few specs for a simple User class, and a few tests not connected with this particular class.

We start with defining our class:
import spock.lang.*

class UserSpec extends Specification {

}
Now we can proceed to defining test fixtures and test methods.

All activities we want to perform before each test method should be put in the def setup() {...} method, and everything we want to run after each test should be put in the def cleanup() {...} method (they are the equivalents of the JUnit methods annotated with @Before and @After).

It can look like this:
class UserSpec extends Specification {
    User user
    Document document

    def setup() {
        user = new User()
        document = DocumentTestFactory.createDocumentWithTitle("doc1")
    }

    def cleanup() {
    }
}
Of course we can use field initialization for instantiating test objects:
class UserSpec extends Specification {
    User user = new User()
    Document document = DocumentTestFactory.createDocumentWithTitle("doc1")

    def setup() {
    }

    def cleanup() {
    }
}

Which is more readable or preferred? It is just a matter of taste, because according to the Spock docs the behaviour is the same in both cases.

It is worth mentioning that the JUnit @BeforeClass/@AfterClass equivalents are also present in Spock, as def setupSpec() {...} and def cleanupSpec() {...}. They will be run before the first and after the last test method.
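
A minimal sketch of these fixture methods:

def setupSpec() {
    // runs once, before the first feature method
}

def cleanupSpec() {
    // runs once, after the last feature method
}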


First tests


In Spock, every method in a specification class, except setup/cleanup, is treated by the runner as a test method (unless you annotate it with @Ignore).

A very interesting feature of Spock and Groovy is the ability to name methods with full sentences, just like regular strings:
class UserSpec extends Specification {
    // ...

    def "should assign comment to user"() {
        // ...
    }
}
With such a naming convention we can write a real specification and include details about the specified behaviour in the method name, which is very convenient when reading test reports and analyzing errors.

A test method (also called a feature method) is logically divided into a few blocks, each with its own purpose. Blocks are defined like labels in Java (but they are transformed with Groovy AST transformations) and some of them must appear in the code in a specific order.

The most basic and common schema for a Spock test is:
class UserSpec extends Specification {
    // ...

    def "should assign comment to user"() {
        given:
        // do initialization of test objects
        when:
        // perform actions to be tested
        then:
        // collect and analyze results
    }
}

But there are more blocks like:
  • setup
  • expect
  • where
  • cleanup
In the next sections I am going to describe each block shortly, with little examples.

given block

This block is used to set up test objects and their state. It has to be the first block in a test and cannot be repeated. Below is a little example of how it can be used:
class UserSpec extends Specification {
    // ...

    def "should add project to user and mark user as project's owner"() {
        given:
        User user = new User()
        Project project = ProjectTestFactory.createProjectWithName("simple project")
        // ...
    }
}

In this code the given block contains the initialization of test objects and nothing more. We create a simple user without any specific attributes, and a project with a given name. If some of these objects could be reused in more feature methods, it may be worth putting their initialization in the setup method.

when and then blocks

The when block contains the action we want to test (the Spock documentation calls it a 'stimulus'). This block always occurs in a pair with the then block, in which we verify that the response satisfies certain conditions. Assume we have this simple test case:
class UserSpec extends Specification {
    // ...

    def "should assign user to comment when adding comment to user"() {
        given:
        User user = new User()
        Comment comment = new Comment()
        when:
        user.addComment(comment)
        then:
        comment.getUserWhoCreatedComment().equals(user)
    }

    // ...
}

In the when block there is a call to the tested method and nothing more. Once our action has been performed, we can check for the desired conditions in the then block.

The then block is very well structured: every line in it is treated by Spock as a boolean statement. That means Spock expects instructions containing comparisons and expressions returning true or false, so we can create a then block with statements such as:
user.getName() == "John"
user.getAge() == 40
!user.isEnabled()
Each of these lines will be treated as a single assertion and evaluated by Spock.

Sometimes we expect that our method throws an exception under given circumstances. We can write a test for it using the thrown method:
class CommentSpec extends Specification {
    def "should throw exception when adding null document to comment"() {
        given:
        Comment comment = new Comment()
        when:
        comment.setCommentedDocument(null)
        then:
        thrown(RuntimeException)
    }
}

In this test we want to make sure that passing an incorrect parameter is handled correctly by the tested method and that the method throws an exception in response. If you want to be certain that a method does not throw a particular exception, simply use the notThrown method.


expect block

The expect block is used primarily when we do not want to separate the when and then blocks because it would feel unnatural. It is especially useful for a simple test (and according to TDD rules all tests should be simple and short) with only one condition to check, like in this example (it is simple, but should show the idea):
def "should create user with given name"() {
given:
User user = UserTestFactory.createUser("john doe")
expect:
user.getName() == "john doe"
}



More blocks!


Those were very simple tests with the standard Spock test layout and the canonical division into given/when/then parts. But Spock offers more possibilities in writing tests and provides more blocks.


setup/cleanup blocks

These two blocks have the very same functionality as the def setup and def cleanup methods in a specification. They allow us to perform some actions before and after a test. But unlike those methods (which are shared between all tests), the blocks work only in the methods they are defined in.
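
A minimal sketch, using a temporary file as an illustrative block-local resource:

def "should detect that a freshly created temporary file exists"() {
    setup:
    File file = File.createTempFile("spock", ".tmp")

    expect:
    file.exists()

    cleanup:
    file.delete()
}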


where - easy way to create readable parameterized tests

Very often when we create unit tests, there is a need to "feed" them with sample data to test various cases and border values. With Spock this task is very easy and straightforward. To provide test data to a feature method, we need to use the where block. Let's take a look at a little piece of code:

def "should successfully validate emails with valid syntax"() {
expect:
emailValidator.validate(email) == true
where:
email }

In this example, Spock creates a variable called email, which is used when calling the method being tested. Internally the feature method is written once, but the framework iterates over the given values and calls the expect/when block as many times as there are values (however, if we use the @Unroll annotation, Spock can create a separate run for each of the given values; more about it in one of the next examples).

Now, let's assume that we want our feature method to test both successful and failed validations. To achieve that goal we can create a few parameterized variables, for both the input parameter and the expected result. Here is a little example:

def "should perform validation of email addresses"() {
expect:
emailValidator.validate(email) == result
where:
email result }
Well, it looks nice, but Spock can do much better. It offers a tabular format for defining test parameters, which is much more readable and natural. Let's take a look:
def "should perform validation of email addresses"() {
expect:
emailValidator.validate(email) == result
where:
email | result
"WTF" | false
"@domain" | false
"foo@bar.com" | true
"a@test" | false
}
In this code, each column of our "table" is treated as a separate variable, and the rows are the values for subsequent test iterations.

Another useful feature of Spock when parameterizing tests is its ability to "unroll" each parameterized test. The feature method from the previous example could be defined as follows (the body stays the same, so I do not repeat it):
@Unroll("should validate email #email")
def "should perform validation of email addresses"() {
// ...
}
With that annotation, Spock generates a few methods, each with its own name, and runs them separately. We can use variables from the where block in the @Unroll argument by preceding them with the '#' sign, which signals Spock to use them in the generated method names.


What next?


Well, that was just a quick and short journey through Spock and its capabilities. However, with this basic tutorial you are ready to write many unit tests. In one of my future posts I am going to describe more features of Spock, focusing especially on its mocking abilities.