Data Engineering Project: Stream Edition
Table of Contents
- Table of Contents
- Introduction
- Project description and requirements
- Infrastructure overview
- Design
- Prerequisites
- Code
- Spin up local infrastructure
- Design review
- Next steps
- Conclusion
- References
Introduction
With the data engineering field growing significantly, there has been a huge demand for engineers to expand beyond the traditional batch processing model. Most companies have multiple batch processing data pipelines, but a significantly lower number of stream processing data pipelines. This may be partly due to the complexity involved in getting stream processing to work as expected. This is changing with the introduction of easy-to-use and powerful stream processing frameworks such as Apache Flink, Apache Spark, etc. The main objective of this post is to answer questions like the following
What are some project ideas that I could start working on and gain some DE experience?
Are there any front-to-back tutorials for a basic DE project?
What is a good project to get DE experience for job interviews?
This article is designed to help you
- Understand how to create a data streaming pipeline with state (meaning the process can have some information about a previously processed data point).
- Understand the different components usually involved in a data streaming pipeline.
- Build a system top down, figure out where and why it can fail, and understand how to avoid those common issues with distributed systems.
If you are interested in a batch processing project, please check out Data Engineering Project for Beginners - Batch Edition.
Project description and requirements
Let’s assume you work for a company that offers a paid website service with about 10 million daily accounts. There have been some reports of multiple people in different countries sharing the same account id and password (this is a naive use case; a website with a good authentication system can usually detect this), and because of this fraudulent behavior the company you work for is losing a lot of money. The company wants to detect these fraudulent accounts in real time, as they are happening. The company also wants to analyze, in real time, the different actions performed by the users of these accounts, such as click, purchase, login, log-out, delete-account, create-account, update-settings and other actions.
Infrastructure overview
For this project we will assume our website server captures the account actions and pushes them into an Apache Kafka topic, which is treated as the data source for our data streaming pipeline. We will use Apache Flink to detect the fraudulent account logins and send data about the fraudulent occurrences to another Apache Kafka topic called alerts. All the user data is to be stored in a PostgreSQL database.
Apache Flink
“Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.” - Apache Flink website. The idea is that data flows (or streams) through the different processing tasks, and each of those tasks can have a local state that is only available to that task; this is called stateful processing. This state can store values which can be used by processes that require some knowledge of previously processed data. For example, if you are reading from a source and partitioning based on a key (using keyBy()), all events with the same key are routed to the same task, which keeps its own state.
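A minimal sketch of this pattern is shown below (not the project code); it assumes the flink-streaming-scala dependency is on the classpath, and a stateful KeyedProcessFunction (like the FraudDetection class we build later) would normally take the place of the simple map step.

```scala
import org.apache.flink.streaming.api.scala._

object KeyBySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("acct1,login", "acct2,click", "acct1,log-out") // stand-in for a real source
      .keyBy(event => event.split(",")(0)) // events with the same account id go to the same task
      .map(event => s"processed: $event")  // a stateful KeyedProcessFunction would go here
      .print()
    env.execute("keyBy sketch")
  }
}
```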
Check out Apache Flink architecture if you are not already familiar with it.
Apache Kafka
Check out this article for a quick Kafka introduction.
If you are greenfielding a project, it is advisable to design the system first and then choose the appropriate tech stack.
Design
From the project requirements we can split the ask into two tasks: detecting fraudulent accounts and logging every action an account takes into a database.
Detect fraudulent accounts
Since the requirement is real time, we have to build a data streaming system that can detect if the same account is being logged into from multiple countries at the same time. The concept of "same time" across multiple computers is tricky, since the recorded login time depends on each user machine's internal clock. We can assume that we spoke with the project stakeholders and decided that if the same account is logged into from a different country within 5 minutes of the first login, it is flagged as a fraudulent account.
Log account actions
We want to load the recorded account actions from the Apache Kafka topic into a database to be made available for analysis in real time. Let's assume that, after discussing with the stakeholders, we decide a few seconds of delay is an acceptable SLA (service level agreement) for our project.
Prerequisites
You will need to install
- docker (make sure to have docker-compose as well)
- pgcli to connect to our postgres instance
- git to clone the starter repo
- Optional: tmux
Knowledge of any programming language is assumed. We will be using Scala with predominantly object-oriented patterns; the code is relatively easy to follow if you are familiar with any OO programming language, and you could easily translate it to Python. We are going to use docker to set up the required infrastructure locally. Please make sure you have at least 4GB of memory designated to docker.
Code
You can use the start-here branch from this repository, as shown below
git clone https://github.com/josephmachado/beginner_de_project_stream.git
cd beginner_de_project_stream
git checkout start-here
The project will have the following structure
Defining dependencies
This project uses Maven, a build tool that can be used to package your Java or Scala code into a jar. We will send this jar to the Apache Flink cluster to be executed. For our use case we will look only at the parts of Maven that are crucial to know for this project. When you look at the pom.xml file, which is used by Maven, in your project directory, notice the following sections
- dependencies: This section defines the different libraries that our code needs for its operations.
- build: This section defines how our code gets packaged into a jar file. Note that the mainClass setting defines the main function where code execution is supposed to start.
Inheritance
In our code, we use a lot of inheritance (see extends). This is a common pattern to ensure that our class has the necessary methods required for its operation. For example, we will see in a later section how inheriting KeyedProcessFunction and overriding its methods allows us to write our own logic to process incoming data streams.
Server logs generator
In our project we assumed our website server will be generating account action events. We can simulate this using a simple server log generator. Under src/main/scala/com/startdataengineering create a model directory. Within this directory create a file ServerLog.scala, which will be the representation of an account action event. The file has the following code
Note: An event is a data point representing a single action. In our example, an account login action would be considered a login event, etc.
package com.startdataengineering.model
case class ServerLog(
eventId: String,
accountId: Int,
eventType: String,
locationCountry: String,
eventTimeStamp: Long
) extends Serializable {
override def toString: String = f"$eventId%s,$accountId%s,$eventType%s,$locationCountry%s,$eventTimeStamp%s"
}
object ServerLog {
def fromString(value: String): ServerLog = {
val elements: Array[String] = value.split(",")
ServerLog(elements(0), elements(1).toInt, elements(2), elements(3), elements(4).toLong)
}
}
In the above code we define a case class, which is Scala's way of representing classes that are logical representations of data. It extends Serializable so that instances can be serialized, and we override the toString method, where we define how to get the string representation of the data in the case class. The object ServerLog, defined in the same file, is called a companion object. It is used to create new instances of ServerLog from a string representation. We will use this later in our Apache Flink consumer process to convert strings back to ServerLog objects.
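As a quick illustration (the values below are made up), toString and fromString give us a simple round trip between a ServerLog instance and its comma-separated string form:

```scala
import com.startdataengineering.model.ServerLog

// hypothetical values, just to show the round trip
val log = ServerLog("e1a7c2d4", 42, "login", "IN", 1601736282L)
val line = log.toString                  // "e1a7c2d4,42,login,IN,1601736282"
val parsed = ServerLog.fromString(line)  // back to a ServerLog instance
assert(parsed == log)                    // case classes compare by value
```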
Under src/main/scala/com/startdataengineering create a file called ServerLogGenerator.scala to generate random server log events, with the following code
package com.startdataengineering
import java.time.Instant
import java.util.Properties
import java.util.UUID.randomUUID
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random
import com.startdataengineering.model.ServerLog
object ServerLogGenerator {
private val random = new Random
private val locationCountry: Array[String] = Array(
"USA", "IN", "UK", "CA", "AU", "DE", "ES", "FR", "NL", "SG", "RU", "JP", "BR", "CN", "O")
private val eventType: Array[String] = Array(
"click", "purchase", "login", "log-out", "delete-account", "create-account", "update-settings", "other")
def getServerLog(): ServerLog = {
val eventId = randomUUID().toString
val timestamp = Instant.now.getEpochSecond
val currentCountry: String = locationCountry(random.nextInt(locationCountry.length))
val currentEventType: String = eventType(random.nextInt(eventType.length))
val accountId = random.nextInt(10000)
ServerLog(eventId, accountId, currentEventType, currentCountry, timestamp)
}
def getProperties(): Properties = {
val props = new Properties()
props.put("bootstrap.servers", "kafka:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props
}
def main(args: Array[String]): Unit = {
val props = getProperties()
val topic: String = "server-logs"
val producer = new KafkaProducer[String, String](props)
var i = 0
while(i<100000) {
val log: ServerLog = getServerLog()
val record = new ProducerRecord[String, String](topic, log.eventId, log.toString)
producer.send(record)
i = i + 1
}
producer.close()
}
}
The code execution starts at the main method, where we
- open a Kafka producer
- create 100,000 random server log events
- convert them to strings
- send them as values to the server-logs topic, with the key being the eventId, which is a UUID (universally unique identifier)
- close the producer
We use Instant.now.getEpochSecond to get the timestamp; this returns a Long which represents the number of seconds that have passed since Jan 01 1970 ref. If we used the local machine's timestamp we might end up with multiple different time zones (since our users are in multiple countries). Epoch time provides an easy way to convert time to a number without having to worry about time zones.
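As a small illustration (the epoch value below is made up), the same Long maps back to a single UTC instant regardless of where it was produced:

```scala
import java.time.Instant

val epochSeconds: Long = 1601736282L      // hypothetical login time in epoch seconds
val asInstant = Instant.ofEpochSecond(epochSeconds)
println(asInstant)                        // 2020-10-03T14:44:42Z, always in UTC
```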
We serialize the key and value using StringSerializer. Serialization is the process of converting an object into bytes so it can be transferred efficiently over the network; on the consumer side we will use string deserialization.
Defining data flow in Apache Flink
We first create a FraudDetectionJob.scala file at src/main/scala/com/startdataengineering, where we will define how the data flows through the processing tasks.
package com.startdataengineering
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
object FraudDetectionJob {
@throws[Exception]
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka:9092")
val myConsumer = new FlinkKafkaConsumer[String](
"server-logs", new SimpleStringSchema(), properties)
myConsumer.setStartFromEarliest()
val events = env
.addSource(myConsumer)
.name("incoming-events")
val alerts: DataStream[String] = events
.keyBy(event => event.split(",")(1))
.process(new FraudDetection)
.name("fraud-detector")
val myProducer = new FlinkKafkaProducer[String](
"alerts", new SimpleStringSchema(), properties)
alerts
.addSink(myProducer)
.name("send-alerts")
events
.addSink(new ServerLogSink)
.name("event-log")
env.execute("Fraud Detection")
}
}
Let’s go over the code, one section at a time
Create a streaming environment
This is required to let Apache Flink know that this is a streaming data pipeline. There are other environments for batch and table-based execution.
import org.apache.flink.streaming.api.scala._
//
//
//
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
Creating a consumer to read events from Apache Kafka
We do this using FlinkKafkaConsumer, which is an Apache Flink connector that reads data from a Kafka topic. The properties value defines the configuration settings to connect to Apache Kafka.
When creating the FlinkKafkaConsumer, we specify the topic from which to read data; in our case it is the server-logs topic. We define the deserialization schema to deserialize the event data from Apache Kafka (remember that we used string serialization to produce the server log events). We configure our consumer to read from the very first event in the topic by using setStartFromEarliest. After we have set up our consumer we read from it (addSource) and create a variable called events representing a data stream of strings.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
//
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
//
//
//
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka:9092")
val myConsumer = new FlinkKafkaConsumer[String](
"server-logs", new SimpleStringSchema(), properties)
myConsumer.setStartFromEarliest()
val events = env
.addSource(myConsumer)
.name("incoming-events")
Detecting fraud and generating alert events
In this section we define how our incoming server log events are to be processed. For each serverLog event (remember it is being read in as a string of the format eventId,accountId,eventType,locationCountry,eventTimeStamp) we split the string on commas and use the accountId as the key for the keyBy function, which partitions our data stream by accountId and sends events with the same accountId to the same task. This way we can ensure that events generated by the same accountId always get processed by the same task. The task runs the FraudDetection logic, which we will see soon below. We also set up a FlinkKafkaProducer to push information about fraudulent logins into another Apache Kafka topic called alerts. We send the output of the alerts stream to the alerts topic using addSink.
//
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
//
//
//
val alerts: DataStream[String] = events
.keyBy(event => event.split(",")(1))
.process(new FraudDetection)
.name("fraud-detector")
val myProducer = new FlinkKafkaProducer[String](
"alerts", new SimpleStringSchema(), properties)
alerts
.addSink(myProducer)
.name("send-alerts")
Writing server logs to a PostgreSQL DB
In this section we add a custom data sink for our incoming events data stream. This data sink (as we will see later) will enrich the event data, chunk the incoming events into batches of 10,000 events and insert them into a PostgreSQL DB.
events
.addSink(new ServerLogSink)
.name("event-log")
And finally we can execute the job as shown below
env.execute("Fraud Detection")
which creates the data pipeline’s data flow graph and lets the JobManager know to start the job. Without this part the code will not execute.
Fraud detection logic
We first create a FraudDetection.scala file at src/main/scala/com/startdataengineering to code our fraud detection logic.
package com.startdataengineering
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import com.startdataengineering.model.ServerLog
class FraudDetection extends KeyedProcessFunction[String, String, String]{
private var loginState: ValueState[java.lang.Boolean] = _
private var prevLoginCountry: ValueState[java.lang.String] = _
private var timerState: ValueState[java.lang.Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val loginDescriptor = new ValueStateDescriptor("login-flag", Types.BOOLEAN)
loginState = getRuntimeContext.getState(loginDescriptor)
val prevCountryDescriptor = new ValueStateDescriptor("prev-country", Types.STRING)
prevLoginCountry = getRuntimeContext.getState(prevCountryDescriptor)
val timerStateDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
timerState = getRuntimeContext.getState(timerStateDescriptor)
}
@throws[Exception]
override def processElement(
value: String,
ctx: KeyedProcessFunction[String, String, String]#Context,
out: Collector[String]): Unit = {
val logEvent: ServerLog = ServerLog.fromString(value)
val isLoggedIn = loginState.value
val prevCountry = prevLoginCountry.value
if ((isLoggedIn != null) && (prevCountry != null)){
if ((isLoggedIn == true) && (logEvent.eventType == "login")) {
if (prevCountry != logEvent.locationCountry) {
val alert: String = f"Alert eventID: ${logEvent.eventId}%s, " +
f"violatingAccountId: ${logEvent.accountId}%d, prevCountry: ${prevCountry}%s, " +
f"currentCountry: ${logEvent.locationCountry}%s"
out.collect(alert)
}
}
}
else if (logEvent.eventType == "login"){
loginState.update(true)
prevLoginCountry.update(logEvent.locationCountry)
// 5 * 60 * 1000L -> 5 min
val timer = logEvent.eventTimeStamp + (5 * 60 * 1000L)
ctx.timerService.registerProcessingTimeTimer(timer)
timerState.update(timer)
}
if (logEvent.eventType == "log-out") {
loginState.clear()
prevLoginCountry.clear()
val timer = timerState.value()
if (timer != null){
ctx.timerService.deleteProcessingTimeTimer(timer)
}
timerState.clear()
}
}
@throws[Exception]
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
out: Collector[String]): Unit = {
timerState.clear()
loginState.clear()
prevLoginCountry.clear()
}
}
Let's first go over the overridden methods and understand what they do. You can see we extend the KeyedProcessFunction class with [String, String, String] types; these denote the type of the key, the input type and the output type. When an object of our FraudDetection class is created
- The open method is executed first.
- Then, for each input, the processElement method gets executed.
- The onTimer method is a callback that gets executed when a timer runs out; we will see how and where we set the timer in a following section.
In Apache Flink we can have local state, which means we can store some data local to a task. Note that the size of this data is bounded by the space available on disk and/or in memory (depending on what kind of state backend we configure). These states are periodically and asynchronously snapshotted into a storage system (e.g. HDFS).
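As a rough sketch (not part of this project's code), this is roughly how such snapshotting could be enabled in Flink 1.11; the checkpoint interval and the HDFS path are placeholder values:

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000)  // snapshot state every 60 seconds
// keep checkpoints on a durable file system; the path below is a placeholder
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"))
```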
Let’s go over our fraud detection logic one more time. We want to check if an account is logged into from a certain country and if we notice another login event for the same account from a different country within 5 min of the first login, we alert this as a fraudulent login. In order to detect this, we will need to remember the login country for each account for 5 min from the first login event.
Open
In the overridden open method we set up 3 states using ValueStateDescriptor, which defines the name and type of each state. The states should only be modified via the update() or clear() methods so that Apache Flink's fault tolerance guarantees hold.
private var loginState: ValueState[java.lang.Boolean] = _
private var prevLoginCountry: ValueState[java.lang.String] = _
private var timerState: ValueState[java.lang.Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val loginDescriptor = new ValueStateDescriptor("login-flag", Types.BOOLEAN)
loginState = getRuntimeContext.getState(loginDescriptor)
val prevCountryDescriptor = new ValueStateDescriptor("prev-country", Types.STRING)
prevLoginCountry = getRuntimeContext.getState(prevCountryDescriptor)
val timerStateDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
timerState = getRuntimeContext.getState(timerStateDescriptor)
}
The states are
- loginState: a boolean flag used to denote whether the account (denoted by accountId) is already logged in.
- prevLoginCountry: used to keep track of the originating country of the most recent login for an account.
- timerState: used to keep track of the timer. For example, if a user logged in 10 min ago we should clear the previous 2 states, since we are outside our 5 min time limit.
These states will only be created once when the task is instantiated.
processElement
This method defines how we process each individual element in the input data stream
@throws[Exception]
override def processElement(
value: String,
ctx: KeyedProcessFunction[String, String, String]#Context,
out: Collector[String]): Unit = {
val logEvent: ServerLog = ServerLog.fromString(value)
val isLoggedIn = loginState.value
val prevCountry = prevLoginCountry.value
if ((isLoggedIn != null) && (prevCountry != null)){
if ((isLoggedIn == true) && (logEvent.eventType == "login")) {
if (prevCountry != logEvent.locationCountry) {
val alert: String = f"Alert eventID: ${logEvent.eventId}%s, " +
f"violatingAccountId: ${logEvent.accountId}%d, prevCountry: ${prevCountry}%s, " +
f"currentCountry: ${logEvent.locationCountry}%s"
out.collect(alert)
}
}
}
else if (logEvent.eventType == "login"){
loginState.update(true)
prevLoginCountry.update(logEvent.locationCountry)
// 5 * 60 * 1000L -> 5 min, time is expected in Long format
val timer = logEvent.eventTimeStamp + (5 * 60 * 1000L)
ctx.timerService.registerProcessingTimeTimer(timer)
timerState.update(timer)
}
if (logEvent.eventType == "log-out") {
loginState.clear()
prevLoginCountry.clear()
val timer = timerState.value()
if (timer != null){
ctx.timerService.deleteProcessingTimeTimer(timer)
}
timerState.clear()
}
}
The above code does the following for each event.
1. Uses the input string to get a server log event.
2. Checks if the user is logged in, using the local state loginState.
3. If the current event is a login type and the current country is different from the previous login country, creates an output event that contains the event id and account id which triggered the alert, the previous login country and the current login country. Go to step 5.
4. Else, if the event is a login type, sets the login state to true and the previous login country to the country the login event originates from. These states will be used the next time an event for this account is checked. Also sets a timer for 5 min from the time this login event was created; after the 5 min the onTimer method will be triggered asynchronously for that accountId. Go to step 5.
5. If the event is a log-out type, clears all three states and stops the timer. The processing of this event is done.
Timer Callback
We can set timers and override the onTimer method to execute the required logic once the timer runs out for that specific account.
@throws[Exception]
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
out: Collector[String]): Unit = {
timerState.clear()
loginState.clear()
prevLoginCountry.clear()
}
In our project we use the timer to clear up the data of a logged-in accountId 5 min after a login event is detected (note that a different-country login within 5 min is our definition of a fraudulent event). We set the timer with registerProcessingTimeTimer in our processElement method. Note that events with the same accountId go to the same instance of the FraudDetection class.
There is a big issue with the FraudDetection logic. Take some time to think about the possible scenarios where it may fail or not work as expected. We will go over these cases in the Design review section. Hint: it has to do with how the input is consumed.
Writing server logs to PostgreSQL DB
Create a ServerLogSink.scala file at src/main/scala/com/startdataengineering. This file defines how we insert our incoming events into a PostgreSQL database.
package com.startdataengineering
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.TimeZone
import com.startdataengineering.model.ServerLog
class ServerLogSink extends RichSinkFunction[String] {
private val INSERT_CASE = "INSERT INTO server_log " +
"(eventId, userId, eventType, locationCountry, eventTimeStamp) " +
"VALUES (?, ?, ?, ?, ?)"
private val COUNTRY_MAP = Map(
"USA" -> "United States of America",
"IN" -> "India", "UK" -> "United Kingdom", "CA" -> "Canada",
"AU" -> "Australia", "DE" -> "Germany", "ES" -> "Spain",
"FR" -> "France", "NL" -> "New Zealand", "SG" -> "Singapore",
"RU" -> "Russia", "JP" -> "Japan", "BR" -> "Brazil", "CN" -> "China",
"O" -> "Other")
private val dtFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss")
dtFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
private var stmt: PreparedStatement = _
private var conn: Connection = _
private var batchSize: Int = 0
@throws[Exception]
override def invoke(entity: String, context: SinkFunction.Context): Unit = {
val sl = ServerLog.fromString(entity)
stmt.setString(1, sl.eventId)
stmt.setInt(2, sl.accountId)
stmt.setString(3, sl.eventType)
stmt.setString(4, COUNTRY_MAP.getOrElse(sl.locationCountry, "Other"))
stmt.setString(5, dtFormat.format(sl.eventTimeStamp * 1000L))
stmt.addBatch()
batchSize = batchSize + 1
// write to DB once we have 10k events accumulated
if(batchSize >= 10000) {
stmt.executeBatch()
batchSize = 0
}
}
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:postgresql://postgres:5432/events?user=startdataengineer&password=password")
stmt = conn.prepareStatement(INSERT_CASE)
}
@throws[Exception]
override def close(): Unit = {
conn.close()
}
}
Notice we are extending the RichSinkFunction class, through which we can override the open, invoke and close methods. As you may have guessed, the open method is executed only once when the object is created, the invoke method is executed for each serverLog event in the data stream, and the close method is invoked at the end.
The logic is as follows
- Open a PostgreSQL DB JDBC connection in the open method and create a prepared statement, which is used to insert data into the database.
- In the invoke method, add an insert statement to a batch for each event and execute the batch once 10,000 insert statements have accumulated. This is done to avoid making a database round trip for each event.
- In the close method, close the connection opened in step 1.
Note that we are hardcoding a lot of connection parameters in our code. This is bad practice; do not do this in a real project.
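One possible alternative, sketched below with made-up variable names, is to read the connection settings from environment variables (which docker-compose can inject) inside the open method instead of hardcoding them:

```scala
// A sketch only; POSTGRES_HOST, POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD
// are hypothetical environment variable names, not part of the project as written.
override def open(parameters: Configuration): Unit = {
  val pgHost = sys.env.getOrElse("POSTGRES_HOST", "postgres")
  val pgDb   = sys.env.getOrElse("POSTGRES_DB", "events")
  val pgUser = sys.env.getOrElse("POSTGRES_USER", "startdataengineer")
  val pgPass = sys.env.getOrElse("POSTGRES_PASSWORD", "password")
  conn = DriverManager.getConnection(
    s"jdbc:postgresql://$pgHost:5432/$pgDb?user=$pgUser&password=$pgPass")
  stmt = conn.prepareStatement(INSERT_CASE)
}
```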
You may be wondering how the server_log table gets created in our database. We will see this in the next section.
Spin up local infrastructure
If you look at the docker-compose.yaml file you will see
- jobmanager: Flink component responsible for coordinating the execution of the job.
- taskmanager: Flink component responsible for executing the code.
- zookeeper: Required to coordinate the Apache Kafka operation.
- kafka: Apache Kafka cluster for our topics.
- postgres: RDBMS database to store our server log events.
- serverloggenerator: Simple JVM application to generate fake server log events.
- frauddetection: Client service used to submit a job to the Apache Flink cluster. Note the JOB_MANAGER_RPC_ADDRESS environment variable for connecting to the jobmanager.
Out of the above 7, the postgres, serverloggenerator and frauddetection containers are custom ones. The rest are standard docker images for Apache Kafka and Apache Flink.
Postgres
Let's look at the postgres section in docker-compose.yml. Note the volumes section, which syncs our local dbsetup folder (which contains a create table server_log sql script) into the container's docker-entrypoint-initdb.d. This initialization script is executed once before starting the service ref.
postgres:
image: postgres:12.2
ports:
- "5432:5432"
environment:
POSTGRES_DB: "events"
POSTGRES_USER: "startdataengineer"
POSTGRES_PASSWORD: "password"
volumes:
- ./dbsetup:/docker-entrypoint-initdb.d
server log generator
Let's look at the serverloggenerator section in docker-compose.yml.
serverloggenerator:
build: ./
image: sde:1-FLINK-1.11-scala_2.11
command: "java -classpath /opt/frauddetection.jar:/opt/flink/lib/* com.startdataengineering.ServerLogGenerator"
depends_on:
- kafka
The dockerfile used to build the sde:1-FLINK-1.11-scala_2.11 image from above is in the project repository
FROM maven:3.6-jdk-8-slim AS builder
COPY ./ /opt/frauddetection
WORKDIR /opt/frauddetection
RUN mvn clean install
FROM flink:1.11.0-scala_2.11
WORKDIR /opt/flink/bin
COPY --from=builder /opt/frauddetection/target/frauddetection-*.jar /opt/frauddetection.jar
From the dockerfile above, you can see that
- We use maven:3.6-jdk-8-slim, which has Apache Maven and Java installed.
- We copy our local code base into the docker container (COPY ./ /opt/frauddetection).
- We run mvn clean install to get the jar (frauddetection-*.jar).
- We set the container's /opt/flink/bin directory as the working directory.
- We copy the jar file to /opt/frauddetection.jar, which we use in the serverloggenerator service within docker-compose to run ServerLogGenerator using the command java -classpath /opt/frauddetection.jar:/opt/flink/lib/* com.startdataengineering.ServerLogGenerator.
When we start this container, it will generate 100,000 fake server log events and push them into the server-logs Apache Kafka topic.
fraud detection
This is a simple service to submit our jar to the Apache Flink cluster and tell it to run with a parallelism of 2 (-p 2).
jobmanager and taskmanager
These are Apache Flink's services, required for job coordination and execution. They use the configuration settings in the ./conf directory to configure the memory required, the RPC address, log settings and a place to periodically checkpoint the state of the cluster.
Start all containers
We can start all the containers using
docker-compose up -d # -d means run in detached mode (in the background)
docker ps # display all running containers
Manual checks
Let's do some simple checks to ensure our fraud detection job is working as expected. In your project directory run the following commands to check for messages in the server-logs and alerts topics.
docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092 --topic server-logs --from-beginning --max-messages 10 # used to check the first 10 messages in the server-logs topic
docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092 --topic alerts --from-beginning --max-messages 10 # used to check the first 10 messages in the alerts topic
We can use the pgcli tool to check if our data was loaded into the server_log table, as shown below
pgcli -h localhost -p 5432 -U startdataengineer events
select * from server_log limit 5; -- should match the first 5 from the server-logs topic
select count(*) from server_log; -- 100000
\q -- to exit pgcli
Test cases
Let's look at a case where we should detect fraud. Open two terminals; in the first one (let's call this the alerts terminal), observe the alerts topic using
docker exec -t beginner_de_project_stream_kafka_1 kafka-console-consumer.sh --bootstrap-server :9092 --topic alerts --from-beginning
The above command will keep the consumer open and listen for any new events that get pushed into the alerts topic. In a second terminal (let's call this the producer terminal), we can input server-log events. The event time is in epoch format; use this converter for date-time-to-epoch conversion.
Note that if you enter an event with the wrong format, e.g. hello,world, this will break the Apache Flink job and you will have to restart the containers. We will talk more about this in the design review section.
docker exec -it beginner_de_project_stream_kafka_1 kafka-console-producer.sh --broker-list :9092 --topic server-logs # can be used to enter messages manually
395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,RU,1601736282
395805b6-7aee-409a-85cc-d2ffb7eb7ef2,9726,login,IN,1601736283
this should produce an alert event that looks like
Alert eventID: 395805b6-7aee-409a-85cc-d2ffb7eb7ef2, violatingAccountId: 9726, prevCountry: RU, currentCountry: IN
In the producer terminal input
395805b6-7aee-409a-85cc-111111111111,9727,login,RU,1601736282
395805b6-7aee-409a-85cc-222222222222,9728,login,IN,1601736283
this will not produce any event in the alerts terminal, as they have different account ids.
In the producer terminal input
395805b6-7aee-409a-85cc-333333333333,9729,login,RU,1601737420
395805b6-7aee-409a-85cc-444444444444,9729,login,IN,1601737840
this should not produce any event in the alerts terminal, but it does.
Alert eventID: 395805b6-7aee-409a-85cc-444444444444, violatingAccountId: 9729, prevCountry: RU, currentCountry: IN
We will see when this can happen in the design review section.
In the producer terminal input
395805b6-7aee-409a-85cc-555555555555,9730,login,RU,1601737420
395805b6-7aee-409a-85cc-666666666666,9730,log-out,IN,1601737840
395805b6-7aee-409a-85cc-777777777777,9730,login,IN,1601737840
this should not produce any output, since we are logging out. You can press control + c to exit the producer and alerts terminals. You can also use your browser to go to http://localhost:8081/, which will show the Apache Flink UI, where you can look at the job and the tasks that are being run.
and finally to take down all running containers
docker-compose down
Helpful commands
docker-compose up --build --force-recreate --no-deps -d # recreate docker containers when there is a change made
docker build --tag sde:1-FLINK-1.11-scala_2.11 . # build a docker image from a local Dockerfile
Design review
- Out of order or late arriving events: We assumed that the input events coming into FraudDetection will be in order. In reality the events may be out of order, i.e. a log-out event may arrive before a login event. Apache Kafka assures at least once delivery, meaning your event will be delivered at least once, but it is also possible that it is delivered more than once and out of order. Late arriving events are always an issue with data processing systems. We can define watermarks to establish a tradeoff between the completeness and correctness of the system (see the watermark sketch after this list). If two login events for the same account that are more than 5 min apart arrive within 5 minutes of each other due to late arrival, our code would flag the account as fraudulent, which is the wrong behavior. If you also consider that we are supposed to clear the state when a log-out happens, and that events may be out of order or late, the logic we have in the code will not hold.
- The PostgreSQL DB connection is opened and kept open until closed. What happens if the connection drops? How many such connections can be kept open simultaneously? The tradeoffs need to be carefully evaluated when using an OLTP-type database in a data streaming pipeline. Since we are saving the batch in task memory, it is not fault tolerant. How would we make it fault tolerant? (hint: think local state)
- In most event logging systems, it is recommended to use an OLAP database. Usually the data is streamed into a distributed file system such as S3 or HDFS, on top of which an external table is defined. This is to save on the expensive commit operation.
- The data pipeline will break if you enter a string that has fewer fields than the string representation of serverLog events. For example, try inputting the string "one,two" into the input topic; this will break the job. The code needs to be robust enough to handle such errors.
- Schema evolution: In real systems the serverLog event schema may change, and our system needs to be able to handle this. There are data serialization systems to deal with schema evolution, such as Apache Avro.
- When you are keeping local state (loginState, prevLoginCountry, timerState), be mindful of the total size it may require. You are bounded by the memory or disk size (depending on the local state storage type you chose).
- The fraud detection job always starts with the earliest message in the Apache Kafka server-logs topic. If the process restarts after having inserted some data into the PostgreSQL DB, the job will process duplicates because it reads from the earliest available message. How would you prevent this? One approach would be to keep track of the last message that was read from Apache Kafka.
- Job restarts are very common in distributed systems. If we want only non-duplicate rows inserted into the PostgreSQL database, we can use upsert logic. Upsert stands for update or insert; it needs a key to figure out whether a certain row is new (insert) or whether an existing row needs to be updated. PostgreSQL has insert .. on conflict to handle such cases.
- This is a simple example, but you can imagine having very complicated fraud detection logic. What happens if the data coming into the fraud detection task arrives very fast, but processing it is slow? How can the task handle all the incoming data? Will it get an out-of-memory error? In such cases Apache Flink handles this using BackPressure.
- Apache Flink has support for more complex pattern detection type applications using its Complex event processing library.
- Our project SLA requires that the fraudulent account be flagged within seconds. How can we ensure this if the size of the incoming data increases 10X, 100X, etc.? Will this involve just increasing the cluster size and the parallelism of the tasks?
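The sketch below (not part of the project code) shows roughly how bounded-out-of-orderness watermarks could be attached to the Kafka consumer in Flink 1.11; the 30-second bound is an arbitrary placeholder, and it assumes the event timestamp is the fifth field of the comma-separated event string (in epoch seconds):

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Tolerate events that arrive up to 30 seconds late (placeholder value).
val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[String](Duration.ofSeconds(30))
  .withTimestampAssigner(new SerializableTimestampAssigner[String] {
    override def extractTimestamp(element: String, recordTimestamp: Long): Long =
      element.split(",")(4).toLong * 1000L // epoch seconds -> milliseconds
  })

myConsumer.assignTimestampsAndWatermarks(watermarkStrategy)
```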
Next steps
Now that you have understood how to build a simple streaming data processing pipeline and the common issues associated with it, the next steps can be to have
- Infrastructure as code: in this project we are working locally with docker containers. To set up and deploy these services to a cloud provider (AWS, GCP, Azure, etc.), we can use tools like Terraform and Jenkins.
- Stricter type checking with a schema. In our example a simple change in the input string breaks our entire job.
- Handle schema evolution using Avro/protobuf, etc.
- An OLAP table to store events in an external table, by sinking to a distributed file store.
- Handling of late arriving / out of order events.
- Writing test cases for the code.
Conclusion
I hope this article gives you a good understanding of how to write a stream data processing pipeline. Please let me know if you have any questions or comments in the comment section below.