{"id":13147,"date":"2017-09-08T11:57:38","date_gmt":"2017-09-08T09:57:38","guid":{"rendered":"https:\/\/touk.pl\/blog\/?p=13147"},"modified":"2022-08-03T09:16:55","modified_gmt":"2022-08-03T07:16:55","slug":"easy-fraud-detection-in-nigerian-bank-with-touk-nussknacker","status":"publish","type":"post","link":"https:\/\/touk.pl\/blog\/2017\/09\/08\/easy-fraud-detection-in-nigerian-bank-with-touk-nussknacker\/","title":{"rendered":"Easy fraud detection in Nigerian bank with TouK Nussknacker"},"content":{"rendered":"<h2 id=\"problem\">Problem<\/h2>\n<p>We have a large MySQL database with payment transactions from multiple banks, aggregated for Nigeria's biggest operator. Each day customers create about 300 thousand transactions. These are banking transactions and top-ups. One of our fraud detection processes for top-ups looks like this:<\/p>\n<ol style=\"list-style-type: decimal\">\n<li>Find top-up (called Airtime in our domain) transactions<\/li>\n<li>with an amount above 3000<\/li>\n<li>that ended with success<\/li>\n<li>If there was more than one such transaction in the last 24 hours, we mark the customer as a fraud suspect<\/li>\n<li>If the customer was marked as a fraud suspect in the previous 24 hours (that is, 24-48 hours ago), we should block the customer<\/li>\n<\/ol>\n<p>Our previous solution used cron to run a big SQL query once a day and create a report from the transactions it found. We wanted to make this fraud detection process real-time and to learn about suspicious situations as quickly as possible. 
That is why we decided to use TouK Nussknacker.<\/p>\n<h2 id=\"touk-nussknacker\">TouK Nussknacker<\/h2>\n<div class=\"figure\"><img decoding=\"async\" src=\"https:\/\/gist.githubusercontent.com\/alien11689\/5d35fdcd91a7304e5eda5123f5bb6b54\/raw\/0c9ddf79642e0349860e527d89e91132f489c491\/nussknacker.png\" alt=\"Nussknacker logo\" \/><\/div>\n<p><a href=\"https:\/\/github.com\/TouK\/nussknacker\">Nussknacker<\/a> is a GUI for <a href=\"https:\/\/flink.apache.org\">Apache Flink<\/a>, developed by <a href=\"https:\/\/touk.pl\">TouK<\/a> to help its clients manage various event stream processes. Nussknacker was first deployed in 2016 at one of the Polish mobile network operators. The GUI helps business analysts, marketers and other trained users create and manage processes with minimal support from developers.<\/p>\n<h2 id=\"architecture\">Architecture<\/h2>\n<p>The architecture of our solution is based on the <a href=\"https:\/\/github.com\/TouK\/nussknacker\/tree\/master\/demo\/docker\">demo project available in the Nussknacker GitHub repository<\/a>. 
It contains:<\/p>\n<ul>\n<li><code>UI<\/code> &#8211; allows us to create, test and deploy Flink jobs and to read their metrics and logs<\/li>\n<li><code>job manager<\/code> &#8211; dispatches Flink jobs to task managers<\/li>\n<li><code>task manager<\/code> &#8211; runs Flink jobs<\/li>\n<li><code>zookeeper<\/code> &#8211; used for discovery of task managers and for keeping information about state<\/li>\n<li><code>logstash<\/code> &#8211; harvests the logs from Flink tasks, parses them and sends them to <code>elasticsearch<\/code><\/li>\n<li><code>elasticsearch<\/code> &#8211; keeps logs in an index for further analysis<\/li>\n<li><code>kibana<\/code> &#8211; allows searching the harvested logs<\/li>\n<li><code>influxdb<\/code> &#8211; keeps job metrics<\/li>\n<li><code>grafana<\/code> &#8211; allows searching and viewing job metrics<\/li>\n<li><code>mysql<\/code> &#8211; the source of transactions<\/li>\n<\/ul>\n<h2 id=\"prepare-components\">Prepare components<\/h2>\n<p>Nussknacker requires some initial effort: we have to create a model of the problem and components that know how to convert real data into the model, aggregate it and send it to an external destination.<\/p>\n<h3 id=\"the-model\">The model<\/h3>\n<p>If we want to create a Nussknacker process, we have to create a model first. 
The model is a description of the data; it should also contain all data that might be used in the future or displayed in logs.<\/p>\n<p>In our case we have created a <code>Transaction<\/code> case class in Scala:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">case class Transaction(\r\n            id: Long,\r\n            amount: BigDecimal,\r\n            customerMsisdn: String,\r\n            date: LocalDateTime,\r\n            externalCustomerId: String,\r\n            uid: String,\r\n            paymentMethodId: String,\r\n            statusId: Long,\r\n            status: String,\r\n            failureType: String,\r\n            failureDetails: String,\r\n            drawDownBillerId: Long,\r\n            paymentProvider: String,\r\n            paymentProviderId: Long,\r\n            product: String) extends WithFields {\r\n\r\n  val timestamp: Long = date.toInstant(ZoneOffset.UTC).toEpochMilli\r\n\r\n  \/\/ ...\r\n}\r\n<\/pre>\n<p>It is important to create a timestamp field representing when the event occurred. Thanks to that, aggregates are calculated in a valid and deterministic way. This is very important for historical or delayed data.<\/p>\n<h3 id=\"build-input\">Build input<\/h3>\n<p>The data is kept in the MySQL database, therefore Flink needs to know how to fetch it, convert it to the model and emit it for processing.<\/p>\n<p>We have created the <code>JdbcTransactionSource<\/code> case class:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">case class JdbcTransactionSource(limit: Long, intervalInSeconds: Long, jdbcConfig: JdbcConfig)\r\n  extends RichSourceFunction[Transaction]\r\n    with Checkpointed[java.lang.Long]\r\n{\r\n    \/\/ ...\r\n}\r\n<\/pre>\n<p>The amount of data fetched at once is limited by the source, because we do not want to read all the database records on the first start or on a start after a long period of downtime. What is more, a time interval is set between data fetches. 
The source stores checkpoints in HDFS or in another configured storage, with metadata saved in zookeeper, so that it knows which records have already been processed.<\/p>\n<p>The whole job of the source class can be summarized by these lines:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">val rs = statement.executeQuery()\r\nwhile (rs.next()) {\r\n  val transaction = fromResultSetRow(rs)\r\n  sourceContext.collectWithTimestamp(transaction, transaction.timestamp)\r\n  lastTransactionId = transaction.id\r\n}\r\n<\/pre>\n<h3 id=\"build-sink\">Build sink<\/h3>\n<p>A sink is a component which knows where to send data after processing. In our case, we just want to log a message with the transaction. The generated logs will be sent to elasticsearch by logstash.<\/p>\n<p>The sink code is quite simple:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">class LoggingSink(messagePrefix: String, loggingLevel: String) extends FlinkSink with LazyLogging with Serializable {\r\n\r\n  override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable {\r\n    override def invoke(in: Any): Unit = {\r\n      val message = s\"$messagePrefix - $in\"\r\n      loggingLevel match {\r\n        case \"error\" =&gt; logger.error(message)\r\n        case \"warn\" =&gt; logger.warn(message)\r\n        case _ =&gt; logger.info(message)\r\n      }\r\n    }\r\n  }\r\n\r\n  override def testDataOutput: Option[(Any) =&gt; String] = Some(in =&gt; s\"[$loggingLevel] $messagePrefix $in\")\r\n}\r\n<\/pre>\n<p>&nbsp;<\/p>\n<h3 id=\"create-aggregate-function\">Create aggregate function<\/h3>\n<p>One of the more difficult tasks is to create our custom aggregate function. In our case, it is a counter function, which counts occurrences of transactions that meet our conditions. Each aggregate is created for a certain key. In our case, the key is <code>customerMsisdn<\/code>, since it points directly at a single customer. 
For each customer we want to know how many matching transactions were made in a time window.<\/p>\n<p>We have created the <code>EventsCounter<\/code> class, which parses input parameters from the configuration, keys incoming events and counts them in an operator named <code>eventsCounter<\/code>:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">class EventsCounter extends CustomStreamTransformer {\r\n\r\n  @MethodToInvoke(returnType = classOf[EventCount])\r\n  def execute(@ParamName(\"key\") key: LazyInterpreter[String],\r\n              @ParamName(\"length\") length: String) = {\r\n    FlinkCustomStreamTransformation((start: DataStream[InterpretationResult]) =&gt; {\r\n      val lengthInMillis = Duration(length).toMillis\r\n      start.keyBy(key.syncInterpretationFunction)\r\n        .transform(\"eventsCounter\", new CounterFunction(lengthInMillis))\r\n    })\r\n  }\r\n}\r\n<\/pre>\n<p>&nbsp;<\/p>\n<ul>\n<li><code>@MethodToInvoke<\/code> tells Nussknacker that the annotated method should be invoked when the process reaches the component<\/li>\n<li><code>@ParamName<\/code> tells Nussknacker that the component is configurable via parameters provided in the UI<\/li>\n<\/ul>\n<p><code>CounterFunction<\/code> counts occurrences in the time window and emits them as <code>EventCount<\/code> objects, each of which knows when the first occurrence of the event in this time window was:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">class CounterFunction(lengthInMillis: Long) extends TimestampedEvictableState[Long] {\r\n\r\n  override def stateDescriptor =\r\n    new ValueStateDescriptor[MultiMap[Long, Long]](\"state\", classOf[MultiMap[Long, Long]])\r\n\r\n  override def processElement(element: StreamRecord[InterpretationResult]): Unit = {\r\n    setEvictionTimeForCurrentKey(element.getTimestamp + lengthInMillis)\r\n    state.update(filterState(element.getTimestamp, lengthInMillis))\r\n\r\n    val ir = element.getValue\r\n    val eventCount = 
stateValue.add(element.getTimestamp, 1)\r\n    state.update(eventCount)\r\n\r\n    val eventsCount = eventCount.map.values.flatten.sum\r\n    val smallestTimestamp = eventCount.map.keys.min\r\n    output.collect(new StreamRecord[ValueWithContext[Any]](\r\n      ValueWithContext(EventCount(count = eventsCount, smallestTimestamp = smallestTimestamp), ir.finalContext), element.getTimestamp)\r\n    )\r\n  }\r\n}\r\n\r\ncase class EventCount(count: Long, smallestTimestamp: Long)\r\n<\/pre>\n<h2 id=\"build-nussknacker-process\">Build Nussknacker process<\/h2>\n<p>We have now created the components needed for our domain problem. We built a jar library with a class describing the process building blocks:<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">class FraudDetectorConfigCreator extends ProcessConfigCreator {\r\n  val transactionCategory = \"TransactionStream\"\r\n\r\n  override def sourceFactories(config: Config): Map[String, WithCategories[SourceFactory[_]]] = {\r\n    import net.ceedubs.ficus.Ficus._\r\n    import net.ceedubs.ficus.readers.ArbitraryTypeReader._\r\n    Map(\r\n      \"Transactions\" -&gt; WithCategories(new TransactionSourceFactory(config.as[JdbcConfig](\"jdbc\")), transactionCategory)\r\n    )\r\n  }\r\n\r\n  override def customStreamTransformers(config: Config): Map[String, WithCategories[CustomStreamTransformer]] = Map(\r\n    \"eventsCounter\" -&gt; WithCategories(new EventsCounter, transactionCategory)\r\n  )\r\n\r\n  override def sinkFactories(config: Config): Map[String, WithCategories[SinkFactory]] = Map(\r\n    \"LoggingFactory\" -&gt; WithCategories(new LoggingSinkFactory, transactionCategory)\r\n  )\r\n\r\n  \/\/ other factories that we do not need in our problem\r\n}\r\n<\/pre>\n<p>Components are available via factories and each factory is registered in one or more categories (always <code>TransactionStream<\/code> in our case).<\/p>\n<h3 id=\"configuration\">Configuration<\/h3>\n<p>We provided some configuration for 
Nussknacker UI.<\/p>\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"generic\">fraudDemo {\r\n  timeout: 10s\r\n  checkpointInterval: 20s\r\n  restartInterval: \"10s\"\r\n  processConfigCreatorClass: \"pl.touk.zephyr.frauddetector.FraudDetectorConfigCreator\"\r\n  jdbc {\r\n    driver: \"com.mysql.jdbc.Driver\"\r\n    url: \"jdbc:mysql:\/\/mysql-zephyr:3306\/backofficedev\"\r\n    userName: \"backofficedev\"\r\n    password: \"...\"\r\n  }\r\n  defaultValues {\r\n    values {\r\n    }\r\n  }\r\n\r\n}\r\n<\/pre>\n<p>This configuration will be forwarded to Flink with our job.<\/p>\n<h3 id=\"process-definition\">Process definition<\/h3>\n<p>After Nussknacker starts, the UI is available at &#8216;<a href=\"http:\/\/[host]:8080\/\">http:\/\/[host]:8080\/<\/a>&#8217; and we can start defining processes. We created a new process in the category <code>TransactionStream<\/code>. Using our custom components and generic components (e.g. filter or split), we designed a fraud detection process.<\/p>\n<div class=\"figure\"><img decoding=\"async\" src=\"https:\/\/gist.githubusercontent.com\/alien11689\/5d35fdcd91a7304e5eda5123f5bb6b54\/raw\/7d4aa8dd2239a08c7f5d55a697e50806beac7d9c\/process.png\" alt=\"process\" \/><\/div>\n<p>We have applied filters to select only certain transactions for this process. In the filters we use <a href=\"https:\/\/docs.spring.io\/spring\/docs\/current\/spring-framework-reference\/html\/expressions.html\">Spring SpEL<\/a> expressions like <code>#input.amount &gt; 3000<\/code>. Since we defined the model, Nussknacker can suggest fields and validate expressions. We want to log such transactions with level <code>INFO<\/code> and continue processing (this is why we used the split component).<\/p>\n<p>Next, we count transactions in the 24-hour time window and find those that occurred more than once for a given customer. 
We log them again (this time with level <code>WARN<\/code>) and continue fraud detection.<\/p>\n<p>The last step is to count suspicious transactions that occurred in the last 48 hours and filter out the suspicious transactions that occurred in the last 24 hours. Finally, we log such transactions with level <code>ERROR<\/code>.<\/p>\n<p>After designing and saving, the process is ready for testing.<\/p>\n<h2 id=\"testing\">Testing<\/h2>\n<p>The <code>JdbcTransactionSource<\/code> provides a <code>generateTestData<\/code> method, which connects to the database, fetches the requested number of transactions and saves them to a file. In Nussknacker this can be done by clicking the <code>generate<\/code> button in the <code>Test<\/code> section. Next, we can execute the process locally with data from the file by clicking the <code>from file<\/code> button. The number of transactions processed in each step is shown on the nodes:<\/p>\n<div class=\"figure\"><img decoding=\"async\" src=\"https:\/\/gist.githubusercontent.com\/alien11689\/5d35fdcd91a7304e5eda5123f5bb6b54\/raw\/7d4aa8dd2239a08c7f5d55a697e50806beac7d9c\/processAfterTest.png\" alt=\"process_after_test\" \/><\/div>\n<p>We can also click a node to look inside it and check which transactions reached it.<\/p>\n<h2 id=\"deploy-and-results\">Deploy and results<\/h2>\n<p>After testing the process, it can be deployed &#8211; sent to the job manager and started. 
To deploy the process we just need to click the <code>Deploy<\/code> button.<\/p>\n<p>In the metrics tab we can connect to the dashboard and watch metrics from components.<\/p>\n<div class=\"figure\"><img decoding=\"async\" src=\"https:\/\/gist.githubusercontent.com\/alien11689\/5d35fdcd91a7304e5eda5123f5bb6b54\/raw\/7d4aa8dd2239a08c7f5d55a697e50806beac7d9c\/metrics.png\" alt=\"metrics\" \/><\/div>\n<p>In the search tab we can query the harvested logs and see how many frauds were found.<\/p>\n<div class=\"figure\"><img decoding=\"async\" src=\"https:\/\/gist.githubusercontent.com\/alien11689\/5d35fdcd91a7304e5eda5123f5bb6b54\/raw\/7d4aa8dd2239a08c7f5d55a697e50806beac7d9c\/search.png\" alt=\"search\" \/><\/div>\n<h2 id=\"what-next\">What next?<\/h2>\n<p>We have created just one process that allows us to find fraudulent transactions in our system. Since we have already created the model, data source and sink and set up the architecture, we are ready to quickly design and deploy new processes &#8211; e.g. finding a large number of small transactions from the same customer (as the next fraud detection process) or finding failed transactions with specific details (to find recurring problems of our customers).<\/p>\n","protected":false},"excerpt":{"rendered":"Problem We have a large MySQL database with payment transactions from multiple banks, aggregated for Nigeria's biggest 
operator.&hellip;\n","protected":false},"author":54,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[11],"tags":[252,622],"_links":{"self":[{"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/posts\/13147"}],"collection":[{"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/users\/54"}],"replies":[{"embeddable":true,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/comments?post=13147"}],"version-history":[{"count":26,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/posts\/13147\/revisions"}],"predecessor-version":[{"id":14890,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/posts\/13147\/revisions\/14890"}],"wp:attachment":[{"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/media?parent=13147"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/categories?post=13147"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/touk.pl\/blog\/wp-json\/wp\/v2\/tags?post=13147"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}