Data Engineering Project: Stream Edition

Introduction

With the data engineering field growing significantly, there has been a huge demand for engineers to expand beyond the traditional batch processing model. Most companies run multiple batch processing data pipelines but a significantly lower number of stream processing data pipelines. This may be partly due to the complexity of getting stream processing to work as expected. This is changing with the introduction of easy-to-use and powerful stream processing frameworks such as Apache Flink and Apache Spark. The main objective of this post is to answer questions like the following:

What are some project ideas that I could start working on and gain some DE experience?

Are there any front-to-back tutorials for a basic DE project?

What is a good project to get DE experience for job interviews?

This article is designed to help you

  1. Understand how to create a data streaming pipeline with state (meaning the process can have some information about a previously processed data point).
  2. Understand the different components usually involved in a data streaming pipeline.
  3. Build a system top down and figure out where and why they can fail and understand how to avoid those common issues with distributed systems.

If you are interested in a batch processing project please check out Data Engineering Project for Beginners - Batch Edition

Project description and requirements

Let’s assume you work for a company that offers a paid website service with about 10 million daily accounts. There have been some reports of multiple people in different countries sharing the same account id and password (this is a naive use case; a website with a good authentication system can usually detect this), and because of this fraudulent behavior the company you work for is losing a lot of money. The company wants to detect these fraudulent accounts in real time, as they are happening. The company also wants to analyze, in real time, the different actions performed by the users of these accounts, such as click, purchase, login, log-out, delete-account, create-account, update-settings and other actions.

Infrastructure overview

For this project we will assume our website server captures the account actions and pushes them into an Apache Kafka topic, which is treated as the data source for our data streaming pipeline. We will use Apache Flink to detect the fraudulent account logins and send data about the fraudulent occurrences to another Apache Kafka topic called alerts. All the user data is to be stored in a PostgreSQL database.

“Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.” - Apache Flink website. The idea is that data flows (or streams) through the different processing tasks, and each of those tasks can have a local state that is only available to that task; this is called stateful processing. This state can store values needed by logic that depends on previously processed data. For example, if you are reading from a source and partitioning based on a key (using keyBy()), it will look as shown below ref

StateFul Stream Processing

Check out Apache Flink architecture if you are unfamiliar with the architecture of Apache Flink
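To make the idea of keyed state more concrete, here is a minimal sketch in plain Scala with no Flink involved. The KeyedCounter object and its process function are hypothetical names used only for illustration, and a mutable Map stands in for the per-key state that Flink would manage for us.

```scala
object KeyedCounter {
  // Processes (key, payload) events in order, keeping one counter per key.
  // For each event it returns the key and how many events with that key
  // have been seen so far, which requires remembering earlier events.
  def process(events: Seq[(String, String)]): Seq[(String, Int)] = {
    val state = scala.collection.mutable.Map.empty[String, Int]
    events.map { case (key, _) =>
      val seen = state.getOrElse(key, 0) + 1
      state.update(key, seen)
      (key, seen)
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(("acct-1", "login"), ("acct-2", "click"), ("acct-1", "purchase"))
    process(events).foreach(println)
  }
}
```

Each key only ever sees its own counter, which is the property keyBy() gives us in Flink: state is scoped to the key of the event being processed.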

Apache Kafka

Check out this article for a quick Kafka introduction.

If you are green fielding a project it is advisable to design the system and then choose the appropriate tech stack.

Design

From the project requirements we can split the ask into 2 tasks, the detection of fraudulent accounts and logging every action an account takes into a database.

Detect fraudulent accounts

Since the requirement is real time, we have to build a data streaming system that can detect if the same account is being logged into from multiple countries at the same time. The concept of "same time" across multiple computers is tricky to measure, since the recorded login time depends on each machine’s internal clock. We can assume that we spoke with the project stakeholders and decided that if the same account is logged into from a different country within 5 minutes of the first login, it is considered a fraudulent account.
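The rule agreed with the stakeholders can be written down as a small predicate. This is only a sketch: FraudRule, Login and isFraudulent are hypothetical names, timestamps are assumed to be epoch seconds, and treating the 5 minute boundary as inclusive is an assumption, since the requirement does not say.

```scala
object FraudRule {
  // Agreed window from the requirements: 5 minutes, expressed in seconds.
  val WindowSeconds: Long = 5 * 60L

  // A login observed for one account: the country and the epoch-second timestamp.
  case class Login(country: String, epochSeconds: Long)

  // True when `current` comes from a different country than `first`
  // and happens no more than WindowSeconds after `first`.
  def isFraudulent(first: Login, current: Login): Boolean = {
    val elapsed = current.epochSeconds - first.epochSeconds
    first.country != current.country && elapsed >= 0 && elapsed <= WindowSeconds
  }
}
```

For example, a login from "USA" followed 200 seconds later by a login from "IN" satisfies the rule, while the same pair 301 seconds apart does not.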

Log account actions

We want to load the recorded account actions from the Apache Kafka topic into a database so that they are available for analysis in real time. Let’s assume that, after discussing with the stakeholders, we decided that a few seconds of delay is an acceptable SLA (service level agreement) for our project.

System Design

Prerequisites

You will need to install

  1. docker (make sure to have docker-compose as well)
  2. pgcli to connect to our postgres instance
  3. git to clone the starter repo
  4. Optional: tmux

You will also need knowledge of any programming language. We will be using Scala with predominantly object-oriented patterns; the code is relatively easy to follow if you are familiar with any OO programming language. You could easily translate this code to Python. We are going to use docker to set up the required infrastructure locally. Please make sure you have at least 4GB of memory allocated to docker.

Code

You can use the start-here branch from this repository as shown below

git clone https://github.com/josephmachado/beginner_de_project_stream.git
cd beginner_de_project_stream
git checkout start-here

The project will have the following structure

Project Structure

Defining dependencies

This project uses Maven, a build tool that can package your Java or Scala code into a jar. We will send this jar to the Apache Flink cluster to be executed. For our use case we will look only at the parts of Maven that are crucial for this project. When you look at the pom.xml file in your project directory, which is the file Maven uses, notice the sections

  1. dependencies: This section defines the different libraries that our code will need to use for its operations.
  2. build: This section defines how our code gets packaged into a jar file. Note that the mainClass defines the main function where the code execution is supposed to start.

Inheritance

In our code, we use a lot of inheritance (see extends). This is a common pattern to ensure that our class has the methods required for its operation. For example, we will see in a later section how inheriting from KeyedProcessFunction and overriding its methods allows us to write our own logic to process incoming data streams.
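If the extends and override pattern is new to you, the following toy example shows the shape of it. EventProcessor and UpperCaseProcessor are hypothetical classes invented for this illustration; they are not part of Apache Flink.

```scala
object InheritanceSketch {
  // Hypothetical base class playing the role of the framework:
  // it decides when process is called, we decide what process does.
  abstract class EventProcessor {
    def process(event: String): String

    // Framework-side driver that calls our overridden logic once per event.
    def run(events: Seq[String]): Seq[String] = events.map(process)
  }

  // Our subclass only supplies the logic for a single event.
  class UpperCaseProcessor extends EventProcessor {
    override def process(event: String): String = event.toUpperCase
  }

  def main(args: Array[String]): Unit =
    new UpperCaseProcessor().run(Seq("login", "click")).foreach(println)
}
```

The Flink classes we extend later follow the same idea: the framework calls the methods, and our subclass fills in what they do.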

Server logs generator

In our project we assumed our website server will be generating account action events. We can simulate this using a simple server log generator. Under src/main/scala/com/startdataengineering create a model directory. Within this directory create a file ServerLog.scala which will be the representation of an account action event. The file has the following code

Note: An event is a data point representing a single action. In our example, an account login action would be considered a login event, etc.

package com.startdataengineering.model

case class ServerLog(
                    eventId: String,
                    accountId: Int,
                    eventType: String,
                    locationCountry: String,
                    eventTimeStamp: Long
                    ) extends Serializable {
  override def toString: String = f"$eventId%s,$accountId%s,$eventType%s,$locationCountry%s,$eventTimeStamp%s"
}

object ServerLog {
  def fromString(value: String): ServerLog = {
    val elements: Array[String] = value.split(",")
    ServerLog(elements(0), elements(1).toInt, elements(2), elements(3), elements(4).toLong)
  }
}

In the above code we define a case class, which is Scala's way of representing a class that mainly holds data. It extends Serializable so that instances can be serialized, and it overrides the toString method to define the comma-separated string representation of the data in the case class. The object ServerLog, defined in the same file, is called a companion object. It is used to create new instances of ServerLog from a string representation. We will use this later in our Apache Flink consumer process to convert strings back to ServerLog objects.
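A quick round trip shows how toString and fromString fit together. The sketch below wraps a trimmed copy of the model in a RoundTripExample object (a hypothetical name) so that it runs on its own; the sample field values are made up.

```scala
object RoundTripExample {
  // Trimmed copy of the ServerLog model so the example is self-contained.
  case class ServerLog(eventId: String, accountId: Int, eventType: String,
                       locationCountry: String, eventTimeStamp: Long) {
    override def toString: String =
      s"$eventId,$accountId,$eventType,$locationCountry,$eventTimeStamp"
  }

  object ServerLog {
    def fromString(value: String): ServerLog = {
      val elements = value.split(",")
      ServerLog(elements(0), elements(1).toInt, elements(2), elements(3), elements(4).toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    val original = ServerLog("e-1", 42, "login", "USA", 1600000000L)
    val line = original.toString            // comma-separated string sent to Kafka
    val parsed = ServerLog.fromString(line) // rebuilt on the consumer side
    println(line)
    println(parsed == original)
  }
}
```

Note that this format only round-trips cleanly as long as no field contains a comma, which holds for the values our generator produces.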

Under src/main/scala/com/startdataengineering create a file called ServerLogGenerator.scala to generate random server log events, with the following code

package com.startdataengineering

import java.time.Instant
import java.util.Properties
import java.util.UUID.randomUUID
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random

import com.startdataengineering.model.ServerLog

object ServerLogGenerator {

  private val random = new Random

  private val locationCountry: Array[String] = Array(
    "USA", "IN", "UK", "CA", "AU", "DE", "ES", "FR", "NL", "SG", "RU", "JP", "BR", "CN", "O")

  private val eventType: Array[String] = Array(
    "click", "purchase", "login", "log-out", "delete-account", "create-account", "update-settings", "other")

  def getServerLog(): ServerLog = {
    val eventId = randomUUID().toString
    val timestamp = Instant.now.getEpochSecond
    val currentCountry: String = locationCountry(random.nextInt(locationCountry.length))
    val currentEventType: String = eventType(random.nextInt(eventType.length))
    val accountId = random.nextInt(10000)

    ServerLog(eventId, accountId, currentEventType, currentCountry, timestamp)
  }

  def getProperties(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = getProperties()
    val topic: String = "server-logs"

    val producer = new KafkaProducer[String, String](props)
    var i = 0

    while(i<100000) {
      val log: ServerLog = getServerLog()
      val record = new ProducerRecord[String, String](topic, log.eventId, log.toString)
      producer.send(record)
      i = i + 1
    }

    producer.close()
  }

}

The code execution starts at the main method, where we

  1. open a kafka producer
  2. create 100,000 random server log events
  3. convert them to strings
  4. send them as values to the topic server-logs, with the key being its eventId which is a UUID(universally unique identifier)
  5. close the producer

We use Instant.now.getEpochSecond to get the timestamp. This returns a Long that represents the number of seconds that have passed since January 1, 1970 UTC ref. If we used each local machine's wall-clock time, we might end up with timestamps in multiple different time zones (since our users are in multiple countries). Epoch time provides an easy way to represent time as a number without having to worry about time zones.
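The following sketch illustrates why an epoch value is time zone independent: the same number can be rendered in any zone after the fact. EpochExample and render are hypothetical names, and the sample timestamp is arbitrary.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object EpochExample {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Renders the same instant in whichever zone we ask for.
  def render(epochSeconds: Long, zone: ZoneId): String =
    formatter.format(Instant.ofEpochSecond(epochSeconds).atZone(zone))

  def main(args: Array[String]): Unit = {
    val ts = 1600000000L
    println(render(ts, ZoneOffset.UTC))            // 2020-09-13 12:26:40
    println(render(ts, ZoneId.of("Asia/Kolkata"))) // 2020-09-13 17:56:40
  }
}
```

The stored value never changes; only its presentation depends on the zone, which is why we store the number and format it later.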

We serialize the key and value using StringSerializer. Serialization is the process of converting an object into bytes so that it can be transferred over the network; later, on the consumer side, we will use string deserialization to turn those bytes back into strings.
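As a rough mental model, string serialization is just encoding text as bytes and decoding it again. The sketch below uses plain JVM calls rather than the Kafka classes; StringSerdeSketch is a hypothetical name, and UTF-8 is used because that is the default encoding of Kafka's StringSerializer.

```scala
import java.nio.charset.StandardCharsets

object StringSerdeSketch {
  // Producer side: text -> UTF-8 bytes.
  def serialize(value: String): Array[Byte] =
    value.getBytes(StandardCharsets.UTF_8)

  // Consumer side: UTF-8 bytes -> text.
  def deserialize(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val bytes = serialize("e-1,42,login,USA,1600000000")
    println(bytes.length)
    println(deserialize(bytes))
  }
}
```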

We first create a FraudDetectionJob.scala file at src/main/scala/com/startdataengineering, where we will define how the data flows through the processing tasks.

Data Flow

package com.startdataengineering

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object FraudDetectionJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka:9092")

    val myConsumer = new FlinkKafkaConsumer[String](
      "server-logs", new SimpleStringSchema(), properties)
    myConsumer.setStartFromEarliest()

    val events = env
      .addSource(myConsumer)
      .name("incoming-events")

    val alerts: DataStream[String] = events
      .keyBy(event => event.split(",")(1))
      .process(new FraudDetection)
      .name("fraud-detector")

    val myProducer = new FlinkKafkaProducer[String](
      "alerts", new SimpleStringSchema(), properties)

    alerts
      .addSink(myProducer)
      .name("send-alerts")

    events
      .addSink(new ServerLogSink)
      .name("event-log")

    env.execute("Fraud Detection")

  }

}

Let’s go over the code, one section at a time

Create a streaming environment

This is required to let Apache Flink know that this is a streaming data pipeline. There are other environments for batch and table based executions.

import org.apache.flink.streaming.api.scala._
//
//
//
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Creating a consumer to read events from Apache Kafka

We do this using FlinkKafkaConsumer, which is an Apache Flink connector to read data from a Kafka topic. The properties value defines the configuration settings to connect to Apache Kafka.

When creating the FlinkKafkaConsumer, we specify the topic from which to read in data. In our case it is the server-logs topic. We define the deserialization schema to deserialize the event data from Apache Kafka (remember that we used string serialization to produce the server log events). We configure our consumer to read from the very first event in that topic by using setStartFromEarliest. After we have set up our consumer we read from it (addSource) and create a variable representing a data stream of strings called events.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
//
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

//
//
//

val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka:9092")

val myConsumer = new FlinkKafkaConsumer[String](
  "server-logs", new SimpleStringSchema(), properties)
myConsumer.setStartFromEarliest()

val events = env
  .addSource(myConsumer)
  .name("incoming-events")

Detecting fraud and generating alert events

In this section we define how our incoming server log events are to be processed. Each serverLog event is read in as a string of the format eventId,accountId,eventType,locationCountry,eventTimeStamp. We split the string on commas and use the accountId as the key for the keyBy function, which partitions our data stream by accountId so that events generated by the same accountId always get sent to the same task. The task runs the FraudDetection logic, which we will see below. We also set up a FlinkKafkaProducer to push information about fraudulent logins into another Apache Kafka topic called alerts, and use addSink to send the output of the alerts stream to that topic.

Fraud detection data flow

//
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
//
//
//
val alerts: DataStream[String] = events
  .keyBy(event => event.split(",")(1))
  .process(new FraudDetection)
  .name("fraud-detector")

val myProducer = new FlinkKafkaProducer[String](
  "alerts", new SimpleStringSchema(), properties)

alerts
  .addSink(myProducer)
  .name("send-alerts")
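To see why keying by accountId keeps an account's events together, here is a simplified sketch in plain Scala. The accountKey function mirrors the key selector used in the job; taskFor is a hypothetical stand-in for Flink's partitioning (Flink actually hashes keys into key groups, so the real assignment differs, but the guarantee is the same: equal keys land on the same task).

```scala
object KeyBySketch {
  // Same key selector as in the job: the second comma-separated field is the accountId.
  def accountKey(event: String): String = event.split(",")(1)

  // Hypothetical partitioner: hash the key onto one of `parallelism` tasks.
  def taskFor(event: String, parallelism: Int): Int =
    Math.floorMod(accountKey(event).hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val first = "e-1,42,login,USA,1600000000"
    val second = "e-2,42,login,IN,1600000100"
    // Both events belong to account 42, so they map to the same task index.
    println(taskFor(first, 2) == taskFor(second, 2))
  }
}
```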

Writing server logs to a PostgreSQL DB

In this section we add a custom data sink for our incoming events data stream. This data sink (as we will see later) will enrich the event data, chunk the incoming events into batches of 10000 events and insert them into a PostgreSQL DB.

events
  .addSink(new ServerLogSink)
  .name("event-log")

And finally we can execute the job as shown below

env.execute("Fraud Detection")

which creates the data pipeline’s data flow graph and lets the JobManager know to start the job. Without this part the code will not execute.

Fraud detection logic

We first create a FraudDetection.scala file at src/main/scala/com/startdataengineering to code our logic for fraud detection.

package com.startdataengineering

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import com.startdataengineering.model.ServerLog

class FraudDetection extends KeyedProcessFunction[String, String, String]{

  private var loginState: ValueState[java.lang.Boolean] = _
  private var prevLoginCountry: ValueState[java.lang.String] = _
  private var timerState: ValueState[java.lang.Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val loginDescriptor = new ValueStateDescriptor("login-flag", Types.BOOLEAN)
    loginState = getRuntimeContext.getState(loginDescriptor)

    val prevCountryDescriptor = new ValueStateDescriptor("prev-country", Types.STRING)
    prevLoginCountry = getRuntimeContext.getState(prevCountryDescriptor)

    val timerStateDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerStateDescriptor)
  }

  @throws[Exception]
  override def processElement(
                               value: String,
                               ctx: KeyedProcessFunction[String, String, String]#Context,
                               out: Collector[String]): Unit = {
    val logEvent: ServerLog = ServerLog.fromString(value)

    val isLoggedIn = loginState.value
    val prevCountry = prevLoginCountry.value

    if ((isLoggedIn != null) && (prevCountry != null)){
      if ((isLoggedIn == true) && (logEvent.eventType == "login")) {
        if (prevCountry != logEvent.locationCountry) {
          val alert: String = f"Alert eventID: ${logEvent.eventId}%s, " +
            f"violatingAccountId: ${logEvent.accountId}%d, prevCountry: ${prevCountry}%s, " +
            f"currentCountry: ${logEvent.locationCountry}%s"
          out.collect(alert)
        }
      }
    }
    else if (logEvent.eventType == "login"){
      loginState.update(true)
      prevLoginCountry.update(logEvent.locationCountry)

      // eventTimeStamp is in epoch seconds, timers expect milliseconds
      // 5 * 60 * 1000L -> 5 min
      val timer = logEvent.eventTimeStamp * 1000L + (5 * 60 * 1000L)
      ctx.timerService.registerProcessingTimeTimer(timer)
      timerState.update(timer)
    }
    if (logEvent.eventType == "log-out") {
      loginState.clear()
      prevLoginCountry.clear()

      val timer = timerState.value()
      if (timer != null){
        ctx.timerService.deleteProcessingTimeTimer(timer)
      }
      timerState.clear()
    }
  }

  @throws[Exception]
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    timerState.clear()
    loginState.clear()
    prevLoginCountry.clear()
  }
}

Let’s first go over the override methods and understand what they are. You can see we extend the KeyedProcessFunction class, with [String, String, String] types, these types denote the types of the key, the input type and the output type. When an object of our FraudDetection class is created

  1. The open method is executed first.
  2. Then for each input the processElement method gets executed.
  3. The onTimer method is a call back method that gets executed when a timer runs out, we will see how and where we set the timer in a following section.

In Apache Flink we can have local state, which means that we can store some data local to a task. Note that the size of this data is bounded by the space available on disk and/or in memory (depending on what kind of state backend we configure). These states are periodically and asynchronously snapshotted into a storage system (e.g. HDFS).

Let’s go over our fraud detection logic one more time. We want to check if an account is logged into from a certain country and if we notice another login event for the same account from a different country within 5 min of the first login, we alert this as a fraudulent login. In order to detect this, we will need to remember the login country for each account for 5 min from the first login event.

Open

In the overridden open method we set up 3 states using ValueStateDescriptor, which defines the name and type of each state. The states should only be modified through the update() or clear() methods so that they remain covered by Apache Flink’s fault tolerance guarantees.

private var loginState: ValueState[java.lang.Boolean] = _
private var prevLoginCountry: ValueState[java.lang.String] = _
private var timerState: ValueState[java.lang.Long] = _

@throws[Exception]
override def open(parameters: Configuration): Unit = {
  val loginDescriptor = new ValueStateDescriptor("login-flag", Types.BOOLEAN)
  loginState = getRuntimeContext.getState(loginDescriptor)

  val prevCountryDescriptor = new ValueStateDescriptor("prev-country", Types.STRING)
  prevLoginCountry = getRuntimeContext.getState(prevCountryDescriptor)

  val timerStateDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
  timerState = getRuntimeContext.getState(timerStateDescriptor)
}

The states are

  1. loginState: This is a boolean flag used to denote if the account (denoted by accountId) is already logged in.
  2. prevLoginCountry: This is used to keep track of the originating country of the most recent login for an account.
  3. timerState: This is used to keep track of the timer. For example, if a user logged in 10 min ago, we should set the previous 2 states to empty since we are outside our 5 min time limit.

These states will only be created once when the task is instantiated.

processElement

This method defines how we process each individual element in the input data stream

@throws[Exception]
override def processElement(
                            value: String,
                            ctx: KeyedProcessFunction[String, String, String]#Context,
                            out: Collector[String]): Unit = {
  val logEvent: ServerLog = ServerLog.fromString(value)

  val isLoggedIn = loginState.value
  val prevCountry = prevLoginCountry.value

  if ((isLoggedIn != null) && (prevCountry != null)){
    if ((isLoggedIn == true) && (logEvent.eventType == "login")) {
      if (prevCountry != logEvent.locationCountry) {
        val alert: String = f"Alert eventID: ${logEvent.eventId}%s, " +
          f"violatingAccountId: ${logEvent.accountId}%d, prevCountry: ${prevCountry}%s, " +
          f"currentCountry: ${logEvent.locationCountry}%s"
        out.collect(alert)
      }
    }
  }
  else if (logEvent.eventType == "login"){
    loginState.update(true)
    prevLoginCountry.update(logEvent.locationCountry)

    // eventTimeStamp is in epoch seconds, timers expect milliseconds
    // 5 * 60 * 1000L -> 5 min, time is expected in Long format
    val timer = logEvent.eventTimeStamp * 1000L + (5 * 60 * 1000L)
    ctx.timerService.registerProcessingTimeTimer(timer)
    timerState.update(timer)
  }
  if (logEvent.eventType == "log-out") {
    loginState.clear()
    prevLoginCountry.clear()

    val timer = timerState.value()
    if (timer != null){
      ctx.timerService.deleteProcessingTimeTimer(timer)
    }
    timerState.clear()
  }
}

The above code does the following for each event.

  1. Uses the input string to get a server log event.
  2. Checks if the user is logged in, using the local state loginState.
  3. If the account is already logged in, the current event is a login type and the current country is different from the previous login country, create an output event that contains the event id and account id which triggered the alert, the previous login country and the current login country. Go to step 5.
  4. Else, if there is no login state for the account and the event is a login type, set the login state to true and the previous login country to the country the login event originates from. These states will be used the next time an event for this account is checked. Also set a timer for 5 min from the time this login event was created; once the 5 min have passed, the onTimer method will be triggered asynchronously for that accountId. Go to step 5.
  5. If the event is a log-out type, clear all three states and delete the timer. The processing of this event is done.
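The steps above can be traced without a Flink cluster. The sketch below models the per-account state as an Option holding the previous login country and replays a list of (eventType, country) pairs for a single account. FraudStateSketch, step and run are hypothetical names, the alert text is shortened, and the 5 minute timer is deliberately left out.

```scala
object FraudStateSketch {
  // Per-account state: Some(country) once a login has been seen, None otherwise.
  type State = Option[String]

  // Handles one event and returns the new state plus an optional alert.
  def step(state: State, eventType: String, country: String): (State, Option[String]) =
    (state, eventType) match {
      case (_, "log-out") =>
        (None, None) // a log-out always clears the state
      case (Some(prev), "login") if prev != country =>
        (state, Some(s"prevCountry: $prev, currentCountry: $country"))
      case (None, "login") =>
        (Some(country), None) // first login: remember the country
      case _ =>
        (state, None) // any other event leaves the state untouched
    }

  // Replays the events of a single account in order and collects the alerts.
  def run(events: Seq[(String, String)]): Seq[String] = {
    var state: State = None
    events.flatMap { case (eventType, country) =>
      val (next, alert) = step(state, eventType, country)
      state = next
      alert.toList
    }
  }
}
```

Replaying login from USA, click from USA, login from IN produces one alert. After a log-out, a new login from IN starts a fresh state and produces no alert.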

Timer Callback

We can set timers and override the onTimer method to execute required logic once the timer runs out for that specific account.

@throws[Exception]
override def onTimer(timestamp: Long,
                      ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                      out: Collector[String]): Unit = {
  timerState.clear()
  loginState.clear()
  prevLoginCountry.clear()
}

In our project we use a timer to clear the data of a logged-in accountId 5 min after a login event is detected (note that a login from a different country within 5 min is our definition of a fraudulent event). We register the timer with registerProcessingTimeTimer in our processElement method. Note that events with the same accountId go to the same instance of the FraudDetection class.
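One detail worth spelling out is units: eventTimeStamp comes from Instant.now.getEpochSecond and is therefore in seconds, while Flink timer timestamps are expressed in milliseconds. The sketch below shows the conversion in isolation. TimerSketch, expiryMillis and isExpired are hypothetical helper names, not Flink APIs.

```scala
object TimerSketch {
  // 5 minutes expressed in milliseconds.
  val WindowMillis: Long = 5 * 60 * 1000L

  // Converts the event's epoch seconds to milliseconds and adds the window.
  def expiryMillis(eventEpochSeconds: Long): Long =
    eventEpochSeconds * 1000L + WindowMillis

  // True once `nowMillis` has reached the expiry, i.e. the state should be cleared.
  def isExpired(eventEpochSeconds: Long, nowMillis: Long): Boolean =
    nowMillis >= expiryMillis(eventEpochSeconds)
}
```

For a login at epoch second 1600000000 the expiry is 1600000300000 ms. Adding the window to the raw seconds value instead would yield a timestamp far in the past.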

There is a big issue with FraudDetection logic. Take some time to think about the possible scenarios, where it may fail or not work. We will go over these cases in the Design Review section. Hint: It has to do with how the input is consumed.

Writing server logs to PostgreSQL DB

Create a ServerLogSink.scala file at src/main/scala/com/startdataengineering. This file is used to define how we insert our incoming events into a PostgreSQL database.

package com.startdataengineering

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.TimeZone

import com.startdataengineering.model.ServerLog

class ServerLogSink extends RichSinkFunction[String] {

  private val INSERT_CASE = "INSERT INTO server_log " +
    "(eventId, userId, eventType, locationCountry, eventTimeStamp) " +
    "VALUES (?, ?, ?, ?, ?)"

  private val COUNTRY_MAP = Map(
    "USA" -> "United States of America",
    "IN" -> "India", "UK" -> "United Kingdom", "CA" -> "Canada",
    "AU" -> "Australia", "DE" -> "Germany", "ES" -> "Spain",
    "FR" -> "France", "NL" -> "Netherlands", "SG" -> "Singapore",
    "RU" -> "Russia", "JP" -> "Japan", "BR" -> "Brazil", "CN" -> "China",
    "O" -> "Other")

  private val dtFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss")
  dtFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

  private var stmt: PreparedStatement = _
  private var conn: Connection = _
  private var batchSize: Int = 0

  @throws[Exception]
  override def invoke(entity: String, context: SinkFunction.Context): Unit = {
    val sl = ServerLog.fromString(entity)

    stmt.setString(1, sl.eventId)
    stmt.setInt(2, sl.accountId)
    stmt.setString(3, sl.eventType)
    stmt.setString(4, COUNTRY_MAP.getOrElse(sl.locationCountry, "Other"))
    stmt.setString(5, dtFormat.format(sl.eventTimeStamp * 1000L))
    stmt.addBatch()
    batchSize = batchSize + 1

    // write to DB once we have 10k events accumulated
    if(batchSize >= 10000) {
      stmt.executeBatch()
      batchSize = 0
    }

  }

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:postgresql://postgres:5432/events?user=startdataengineer&password=password")
    stmt = conn.prepareStatement(INSERT_CASE)
  }

  @throws[Exception]
  override def close(): Unit = {
    conn.close()
  }
}

Notice we are extending a RichSinkFunction class through which we can override the open, invoke and close methods. As you may have guessed, the open function is executed only once when the object is created, the invoke method is executed for each serverLog event in the data stream. The close method is invoked at the end.

The logic is as follows

  1. open a PostgreSQL DB JDBC connection in the open method and create a prepared statement which is used to insert data into the database.
  2. In the invoke method, add an insert statement to the current batch for each event. Execute the batch once it holds 10,000 insert statements. This is done to avoid making a database trip for each event.
  3. In the close method, close the opened connection from step 1.
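The batching idea in step 2 can be looked at in isolation. The sketch below is a generic buffer in plain Scala, not the JDBC sink itself: Batcher is a hypothetical class, and write stands in for executing the batch against the database.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchSketch {
  // Hypothetical buffer that hands a full batch to `write` every `batchSize` items.
  class Batcher[A](batchSize: Int, write: Seq[A] => Unit) {
    private val buffer = ArrayBuffer.empty[A]

    // Buffers one item and writes the batch as soon as it is full.
    def add(item: A): Unit = {
      buffer += item
      if (buffer.size >= batchSize) flush()
    }

    // Writes whatever is currently buffered, even if the batch is not full.
    def flush(): Unit = if (buffer.nonEmpty) {
      write(buffer.toList)
      buffer.clear()
    }
  }
}
```

The sketch makes one trade-off of batching visible: items in a partial batch stay in the buffer until the batch fills up or flush is called explicitly, for example on shutdown.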

Note that we are hardcoding a lot of connection parameters in our code. This is bad practice; do not do this in a real project.

You may be wondering how the table server_log gets created in our database. We will see this in the next section.

Spin up local infrastructure

If you look at the docker-compose.yml file you will see

  1. jobmanager: Flink component that is responsible for coordinating the execution of the job.
  2. taskmanager: Flink component that is responsible for executing the code.
  3. zookeeper: Required to coordinate Apache Kafka operation.
  4. kafka: Apache Kafka cluster for topics.
  5. postgres: RDBMS database to store our server log events.
  6. serverloggenerator: Simple JVM application to generate fake server log events.
  7. frauddetection: Client service used to submit a job to the Apache Flink cluster. Note the JOB_MANAGER_RPC_ADDRESS environment variable for connecting to the jobmanager.

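Out of the above 7, services 5, 6 and 7 are customized for this project (postgres, for example, uses the standard postgres:12.2 image plus our initialization script). The rest are standard docker images for Apache Kafka and Apache Flink.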

Postgres

Let’s look at the postgres section in docker-compose.yml. Note the volumes section, which mounts our local dbsetup folder (which contains a create table server_log SQL script) into the container's docker-entrypoint-initdb.d directory. This initialization script is executed once, when the database is first initialized ref.

postgres:
  image: postgres:12.2
  ports:
    - "5432:5432"
  environment:
    POSTGRES_DB: "events"
    POSTGRES_USER: "startdataengineer"
    POSTGRES_PASSWORD: "password"
  volumes:
    - ./dbsetup:/docker-entrypoint-initdb.d

server log generator

Let’s look at the serverloggenerator section in docker-compose.yml.

serverloggenerator:
  build: ./
  image: sde:1-FLINK-1.11-scala_2.11
  command: "java -classpath /opt/frauddetection.jar:/opt/flink/lib/* com.startdataengineering.ServerLogGenerator"
  depends_on:
    - kafka

The Dockerfile used to build the sde:1-FLINK-1.11-scala_2.11 image above is in the project repository

FROM maven:3.6-jdk-8-slim AS builder

COPY ./ /opt/frauddetection
WORKDIR /opt/frauddetection
RUN mvn clean install

FROM flink:1.11.0-scala_2.11

WORKDIR /opt/flink/bin

COPY --from=builder /opt/frauddetection/target/frauddetection-*.jar /opt/frauddetection.jar

From the dockerfile above, you can see that

  1. We use maven:3.6-jdk-8-slim which has Apache Maven and Java installed.
  2. We copy our local code base into the docker container (COPY ./ /opt/frauddetection).
  3. Run mvn clean install to get the jar (frauddetection-*.jar).
  4. Set the containers flink directory as the working directory
  5. Copy the jar file from the builder stage to /opt/frauddetection.jar, which we used in the serverloggenerator service within docker-compose to run ServerLogGenerator using the command java -classpath /opt/frauddetection.jar:/opt/flink/lib/* com.startdataengineering.ServerLogGenerator.

When we start this container, it will generate 100000 fake server log events and push them into the Apache Kafka topic.

fraud detection

This is a simple service to submit our jar to the Apache Flink cluster and tell it to run with a parallelism of 2 (-p 2).

jobmanager and taskmanager

These are Apache Flink’s services, required for job coordination and execution. They use the configuration settings in ./conf directory to configure memory required, RPC address, log setting and a place to periodically checkpoint the state of the cluster.

Start all containers

We can start all the containers using

docker-compose up -d # -d means run in detached mode (in the background)
docker ps # display all running containers

Manual checks

Let’s do some simple checks to ensure our fraud detection job is working as expected. In your project directory, run the following commands to check for messages in the server-logs and alerts topics.

docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092  --topic server-logs --from-beginning --max-messages 10 # used to check the first 10 messages in the server-logs  topic
docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092  --topic alerts --from-beginning --max-messages 10 # used to check the first 10 messages in the alerts topic

We can use the pgcli tool to check that our data was loaded into the server_log table, as shown below

pgcli -h localhost -p 5432 -U startdataengineer events
select * from server_log limit 5; -- should match the first 5 from the server-logs topic
select count(*) from server_log; -- 100000
\q -- to exit pgcli

Test cases

Let’s look at a case where we should detect fraud. Open two terminals and in the first one (let’s call this the alerts terminal), observe the alerts topic using

docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092  --topic alerts --from-beginning

The above command will keep the consumer open and listen for any new events that get pushed into the alerts topic. In a second terminal (let’s call this the producer terminal), we can input server-log events. The event time is in epoch seconds; use an epoch converter to translate a date and time into this number.

Note that if you enter an event in the wrong format, e.g. hello,world, this will break the Apache Flink job and you will have to restart the containers. We will talk more about this in the design review section.

docker exec -it beginner_de_project_stream_kafka_1 kafka-console-producer.sh --broker-list :9092  --topic server-logs # can be used to enter messages manually
395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,RU,1601736282
395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,IN,1601736283

This should produce an alert event that looks like

Alert eventID: 395805b6-7aee-409a-85cc-d2ffb7eb7ef2, violatingAccountId: 9726, prevCountry: RU, currentCountry: IN

In the producer terminal input

395805b6-7aee-409a-85cc-111111111111,9727,login,RU,1601736282
395805b6-7aee-409a-85cc-222222222222,9728,login,IN,1601736283

This will not produce any event in the alerts terminal, as the two events have different account ids.

In the producer terminal input

395805b6-7aee-409a-85cc-333333333333,9729,login,RU,1601737420
395805b6-7aee-409a-85cc-444444444444,9729,login,IN,1601737840

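These two logins are 420 seconds (7 minutes) apart in event time, which is outside the 5-minute window, so they should not produce any event in the alerts terminal, but they do.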

Alert eventID: 395805b6-7aee-409a-85cc-444444444444, violatingAccountId: 9729, prevCountry: RU, currentCountry: IN

We will see when this can happen in the design review section.

In the producer terminal input

395805b6-7aee-409a-85cc-555555555555,9730,login,RU,1601737420
395805b6-7aee-409a-85cc-666666666666,9730,log-out,IN,1601737840
395805b6-7aee-409a-85cc-777777777777,9730,login,IN,1601737840

This should not produce any alert, since the account logs out before the second login. You can press Ctrl+C to exit the producer and alerts terminals. You can also open http://localhost:8081/ in your browser to see the Apache Flink UI, where you can inspect the job and the tasks that are being run.
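The rule these test cases exercise can be sketched in plain Java, without Apache Flink, as a per-account map of the last login. This is a simplified illustration under stated assumptions, not the project's FraudDetection code: the class FraudRuleSketch and its process method are hypothetical, and the sketch compares the event timestamps themselves. Because of that, it stays silent for account 9729 (logins 7 minutes apart), whereas the running job raises an alert for it, as discussed in the design review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the alerting rule, using event time only.
class FraudRuleSketch {
    static final long WINDOW_SECONDS = 5 * 60; // 5-minute window

    // Last login remembered for an account
    private static final class Login {
        final String country;
        final long epochSeconds;
        Login(String country, long epochSeconds) {
            this.country = country;
            this.epochSeconds = epochSeconds;
        }
    }

    // Keyed state: account ID -> last login (what Flink would keep per key)
    private final Map<String, Login> lastLogin = new HashMap<>();

    // Returns an alert message, or null when the event is not suspicious
    String process(String eventId, String accountId, String eventType,
                   String country, long epochSeconds) {
        if (eventType.equals("log-out")) {
            lastLogin.remove(accountId); // logging out clears the state
            return null;
        }
        if (!eventType.equals("login")) {
            return null; // other event types do not affect the rule
        }
        Login prev = lastLogin.put(accountId, new Login(country, epochSeconds));
        boolean withinWindow = prev != null
                && Math.abs(epochSeconds - prev.epochSeconds) <= WINDOW_SECONDS;
        if (withinWindow && !prev.country.equals(country)) {
            return "Alert eventID: " + eventId
                    + ", violatingAccountId: " + accountId
                    + ", prevCountry: " + prev.country
                    + ", currentCountry: " + country;
        }
        return null;
    }

    public static void main(String[] args) {
        FraudRuleSketch rule = new FraudRuleSketch();
        String[] events = {
            "395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,RU,1601736282",
            "395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,IN,1601736283",
            "395805b6-7aee-409a-85cc-333333333333,9729,login,RU,1601737420",
            "395805b6-7aee-409a-85cc-444444444444,9729,login,IN,1601737840"
        };
        for (String event : events) {
            String[] f = event.split(",");
            String alert = rule.process(f[0], f[1], f[2], f[3], Long.parseLong(f[4]));
            System.out.println(alert == null ? "no alert for account " + f[1] : alert);
        }
    }
}
```

Keeping the rule in a small class like this also makes it easy to unit test separately from the streaming framework.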

Finally, take down all running containers using

docker-compose down

Helpful commands

docker-compose up --build --force-recreate --no-deps -d # recreate docker containers when there is a change made
docker build --tag sde:1-FLINK-1.11-scala_2.11 . # build a docker image from a local Dockerfile

Design review

  1. Out of order or late arriving events: We assumed that the input events coming into FraudDetection will be in order. In reality the events may be out of order, i.e. a log-out event may arrive before the login event that preceded it. Apache Kafka is commonly run with at-least-once delivery, meaning an event will be delivered at least once but may be delivered more than once, and ordering is only guaranteed within a single partition. Late arriving events are always an issue with data processing systems. We can define watermarks to establish a tradeoff between latency and completeness of the results. If two login events for the same account that occurred more than 5 minutes apart arrive within 5 minutes of each other because one of them was delayed, our code would flag the account as fraudulent, which is the wrong behavior. Similarly, since a log-out is supposed to clear the state, the logic in our code will not hold when events are out of order or late.
  2. The PostgreSQL DB connection is opened and kept open until closed. What happens if the connection drops? How many such connections can be kept open simultaneously? The tradeoffs need to be carefully evaluated when using an OLTP type database in a data streaming pipeline. Since we are saving the batch in task memory, it is not fault tolerant. How would we make it fault tolerant? (hint: think local state)
  3. In most event logging systems, it is recommended to use an OLAP database. Usually the data is streamed into a distributed file system such as S3 or HDFS, on top of which an external table is defined. This is to save on the expensive commit operation.
  4. The data pipeline will break if you enter a string that has fewer fields than the string representation of a serverLog event. For example, try entering the string “one,two” into the input queue; this will break the job. The code needs to be robust enough to handle such errors.
  5. Schema evolution: In real systems the serverLog event schema may change, and our system needs to be able to handle this. There are data serialization systems that deal with schema evolution, such as Apache Avro.
  6. When you are keeping local state (loginState, prevLoginCountry, timerState), be mindful of the total size it may require. You are bounded by the memory or disk size (depending on the local state storage type you chose).
  7. The fraud detection job always starts with the earliest message in the Apache Kafka server-logs topic. If the process restarts after having inserted some data into PostgreSQL DB the job will be processing duplicates due to the job reading from the earliest available message. How would you prevent this? An approach would be to keep track of the last message that was read from Apache Kafka.
  8. Job restarts are very common in distributed systems. If we want only non-duplicates inserted into the PostgreSQL database, we can use upsert logic. Upsert stands for update or insert; it needs a key to figure out whether a certain row is new (insert) or whether an existing row needs to be updated. PostgreSQL has INSERT ... ON CONFLICT to handle such cases.
  9. This is a simple example, but you can imagine having very complicated fraud detection logic. What happens if data arrives at the fraud detection task faster than the task can process it? How can the task handle all the incoming data? Will it get an out of memory error? Apache Flink handles such cases using backpressure.
  10. Apache Flink has support for more complex pattern detection type applications using its complex event processing (CEP) library.
  11. Our project SLA requires that the fraudulent account be flagged in seconds. How can we ensure this if the size of incoming data increases 10X, 100X, etc? Will this involve just increasing the cluster size and parallelism of the tasks?
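As one way to address the malformed input issue from the list above, the parsing step can return an empty result instead of throwing. The sketch below is a hedged illustration rather than the project's code: ServerLogParserSketch, its ServerLog class, and the five-field layout (event ID, account ID, event type, country, epoch seconds) are assumptions based on the sample events used in the test cases.

```java
import java.util.Optional;

// Hypothetical sketch of defensive parsing for server-log lines.
class ServerLogParserSketch {
    // Assumed shape of a parsed event
    static final class ServerLog {
        final String eventId;
        final String accountId;
        final String eventType;
        final String country;
        final long epochSeconds;

        ServerLog(String eventId, String accountId, String eventType,
                  String country, long epochSeconds) {
            this.eventId = eventId;
            this.accountId = accountId;
            this.eventType = eventType;
            this.country = country;
            this.epochSeconds = epochSeconds;
        }
    }

    // Returns the parsed event, or Optional.empty() for malformed input
    static Optional<ServerLog> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // limit -1 keeps trailing empty fields so the count is exact
        String[] parts = line.split(",", -1);
        if (parts.length != 5) {
            return Optional.empty(); // e.g. "one,two" or "hello,world"
        }
        try {
            long epochSeconds = Long.parseLong(parts[4].trim());
            return Optional.of(new ServerLog(parts[0].trim(), parts[1].trim(),
                    parts[2].trim(), parts[3].trim(), epochSeconds));
        } catch (NumberFormatException e) {
            return Optional.empty(); // timestamp is not a number
        }
    }

    public static void main(String[] args) {
        String good = "395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,RU,1601736282";
        System.out.println(parse(good).isPresent());      // valid line
        System.out.println(parse("one,two").isPresent()); // too few fields
    }
}
```

Inside the job, records that fail to parse could be dropped, counted, or routed to a separate topic for inspection, so a single bad message no longer stops the pipeline.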

Next steps

Now that you understand how to build a simple streaming data processing pipeline and the common issues associated with it, the next steps could be

  1. Infrastructure as code: In this project we are working locally with docker containers. To set up and deploy these services to a cloud provider (AWS, GCP, Azure, etc), we can use tools like Terraform and Jenkins.
  2. Stricter type checking with a schema. In our example a simple change in the input string breaks our entire job.
  3. Handling schema evolution using Avro, protobuf, etc.
  4. Storing events in an OLAP external table, by sinking to a distributed file store.
  5. Handling late arriving / out of order events.
  6. Writing test cases for the code.

Conclusion

I hope this article gives you a good understanding of how to write a stream data processing pipeline. Please let me know if you have any questions or comments in the comment section below.

references

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.11/
  2. https://tech.signavio.com/2017/postgres-flink-sink
  3. https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-docker.html