How to do Change Data Capture (CDC), using Singer

Introduction

Change data capture is a software design pattern used to track every change(update, insert, delete) to the data in a database. In most databases these types of changes are added to an append only log (Binlog in MySQL, Write Ahead Log in PostgreSQL). These logs can be used to track the data changes without having to query via the SQL interface.

Singer is an open source standard that formalizes the data schema for reading data from multiple types of input sources(aka tap) such as databases, csv files, SAAS apps and the data format for writing to different destinations(target) such as databases, text files, etc.

Singer has multiple taps and targets available and it’s relatively simple to write your own.

Why Change Data Capture

You may be wondering why someone would need to capture data changes in a specific database. The main reason is to move data between different types of databases. Some use cases are

  1. tracking data changes to feed into an elastic search index for text based querying.
  2. moving data changes from OLTP to OLAP without querying the OLTP tables directly (LOG_BASED load).
  3. creating audit logs, etc

Setup

MySQL to PG

Prerequisites

  1. docker
  2. pgcli
  3. Python3
  4. pip
  5. jq (optional)

In a directory of your choosing, create a folder singercdc. This will be the project folder for this post.

mkdir singercdc && cd singercdc
pip install tap-mysql
pip install singer-target-postgres

Virtual environment or docker is recommended, but for this simple example, direct installation should be sufficient.

Source setup

We will be using a MySQL database as our source database. Let’s spin up a MySQL docker container

docker run --name source-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=Password1234 -d mysql:8.0
# log into the docker machine
docker exec -it source-mysql bash
# give it 1-2 min for MySQL to start
mysql -h 127.0.0.1 -P 3306 -u root -p # password is Password1234

Now you are in your MySQL console. Let’s create a simple animals table and populate it with one row.

CREATE DATABASE source_db;
USE source_db;
DROP TABLE IF EXISTS animals;
CREATE TABLE IF NOT EXISTS animals (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(10),
    likes_getting_petted BOOLEAN
);
INSERT INTO animals (name, likes_getting_petted)
values ('porcupine', false);
\q -- to quit mysql client
exit -- to exit the docker container

In your project folder, create a src_config.json file to specify the MySQL connection parameters. The contents of this file should be as follows

{
  "host": "127.0.0.1",
  "port": "3306",
  "user": "root",
  "password": "Password1234"
}

Destination setup

We will use a PostgreSQL database as our destination data warehouse. Let’s start a PostgreSQL docker container and log into it as shown below.

docker run --name dest-pg -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=Password1234 -e POSTGRES_DB=destination_db -d postgres:12.2

pgcli -h localhost -p 5432 -U start_data_engineer destination_db # password is Password1234

Let’s create a destination schema.

create schema destination;
\q --to quit pgcli

In your project folder create a dest_config.json file to specify the connection parameters for PostgreSQL. It must contain the following content.

{
  "postgres_host": "localhost",
  "postgres_port": 5432,
  "postgres_database": "destination_db",
  "postgres_username": "start_data_engineer",
  "postgres_password": "Password1234",
  "postgres_schema": "destination",
  "disable_collection": true
}

Your project folder should contain src_config.json and dest_config.json files.

Source, MySQL

There are 3 main ways in which we can extract data from the MySQL source, they are

  1. FULL_TABLE: Reads the entire table data from SQL. Eg): SELECT C1, C2 FROM table1;.
  2. INCREMENTAL: Reads table data from SQL using an ordered key. Eg): SELECT C1, C2 FROM table1 WHERE C1 >= last_read_C1;. Loads incrementally using the last read key.
  3. LOG_BASED: Reads data directly from the binlog. Loads incrementally using the last read log number. Logs are numbered sequentially in order.

We will use LOG_BASED approach in this example. The other 2 approaches will affect the database performance as they require querying via the SQL interface.

When you make changes to a table or its data in MySQL they are logged in a Binary Log. These logs are located at /var/lib/mysql inside the MySQL docker container. The database creates new binary log files(named binlog.number) as an individual file hits the log file size threshold. We can see the list of binary log files in binlog.index. We can use the mysqlbinlog util to read the binary logs as shown below.

docker exec -it source-mysql bash
cd /var/lib/mysql
ls binlog* # see the binary log files and the index file
mysqlbinlog -d source_db --base64-output=decode-rows --verbose binlog.000002 # we use the 2nd binlog, since the first one may be full
exit

BinLog

The Singer MySQL tap uses this library to read data from the binary log file and create events that can be fed to a target. In order to use the tap, we need to specify which tables to track using a properties.json file. We can generate this file using the command shown below.

tap-mysql --config src_config.json --discover  > properties.json

Edit the properties.json and add these two key value pairs to "selected": true, and "replication-method": "LOG_BASED" to the dictionary in streams which has the table_name value as animals. By default the tables are not selected for replication. The changes are shown below.

Properties

Now that we have the properties setup, we can run the tap to extract the animals table records. The jq is optional, we use it here to format our json output.

tap-mysql -c src_config.json --properties properties.json | grep record | jq .

You will see this record in the text:

{
    "type": "RECORD",
    "stream": "animals",
    "record": {
        "id": 1,
        "name": "porcupine",
        "likes_getting_petted": false
    },
    "version": 1609685450096,
    "time_extracted": "2021-01-03T14:50:50.215366Z"
}

Let’s insert a few records in to our animals table. In the source database, do the following

docker exec -it source-mysql bash
mysql -h 127.0.0.1 -P 3306 -u root -p # password is Password1234
USE source_db;
INSERT INTO animals (name, likes_getting_petted)
values ('bear', false);
INSERT INTO animals (name, likes_getting_petted)
values ('cow', true);
\q
exit

Now lets run the tap again with

tap-mysql -c src_config.json --properties properties.json | grep record | jq .

This time you will notice 3 records: porcupine, bear and cow. But we have already seen porcupine. If you want to do an incremental load you need to let the tap know which log event in the binlog to start from.

We can do this by creating a state file, which will keep track of the latest log read from the binlog. In subsequent loads we will use this state file and only read logs that come after that specific log. This way we only read the incremental data changes. We can create a state file as shown below.

tap-mysql -c src_config.json --properties properties.json | tail -1 | jq .value > state.json

If you did not install jq, you can copy the dictionary of the "value" key from the last line of the output.

The state.json file will contain the last read log position as shown below.

{
    "currently_syncing": null,
    "bookmarks": {
        "source_db-animals": {
            "version": 1609687092788,
            "log_file": "binlog.000002",
            "log_pos": 1798
        }
    }
}

Let’s insert a few more records in to our animals table to test incremental loads. In the source database, do the following

docker exec -it source-mysql bash
mysql -h 127.0.0.1 -P 3306 -u root -p # password is Password1234
USE source_db;
INSERT INTO animals (name, likes_getting_petted)
values ('dog', true),
    ('elephant', true),
    ('frog', false);
\q
exit

Now if we tap the source while specifying the state the tap will only read the new records.

tap-mysql -c src_config.json --properties properties.json --state state.json | grep record | jq .

Now you will only see the incremental records of dog, elephant and frog show up. You can use state file to keep track of the latest record that was read. You can either version it and have a script to pick the right version or naively use unix to overwrite state file as shown below.

tap-mysql -c src_config.json --properties properties.json --state state.json | tail -1 | jq .value > statetemp.json && mv statetemp.json state.json

tap-mysql -c src_config.json --properties properties.json --state state.json | grep record | jq . 
# no new record

CDC, MySQL => PostgreSQL

Now that we have the tap(MySQL) set up to work incrementally we can look at how the tap events are converted into DML statements for the target. This is as simple as running a unix pipe.

tap-mysql -c src_config.json --properties properties.json | target-postgres --config dest_config.json | tail -1 > tap_target_state.json

Let’s log into our PostgreSQL instance and check the table.

pgcli -h localhost -p 5432 -U start_data_engineer destination_db # password in Password1234
select * from destination.animals;

The table animals has been created in the destination database and the 6 rows have been inserted.

If you run the tap ... | target ... pipe again you will notice that the records get overwritten in the target. Let’s use the state functionality to only do incremental loads. First let’s insert some new records into MySQL as shown below

docker exec -it source-mysql bash
mysql -h 127.0.0.1 -P 3306 -u root -p # password is Password1234
USE source_db;
INSERT INTO animals (name, likes_getting_petted)
values ('cat', true),
    ('zebra', true),
    ('tiger', false);
\q
exit

Let’s setup incremental load to target as shown below.

tap-mysql -c src_config.json --properties properties.json --state tap_target_state.json | target-postgres --config dest_config.json | tail -1 > tap_target_state_tmp.json && mv tap_target_state_tmp.json tap_target_state.json

If you look at the _sdc_sequence column in the target table, you will see that the last 3 records have a different number compared to the previously loaded records.

pgcli -h localhost -p 5432 -U start_data_engineer destination_db # password in Password1234
select * from destination.animals;
\q

You can insert new records in the source and rerun the incremental load to target to test.

Once you are done experimenting you can stop the running docker containers as shown below.

docker stop source-mysql 
docker rm source-mysql
docker stop dest-pg
docker rm dest-pg

Pros and Cons

Let’s look at some pros and cons of this approach.

Pros

  1. Open source.
  2. Enables in order processing of change data events, as opposed to something like debezium on kafka which introduces event ordering issues.
  3. Streaming pattern from tap to target, keeping memory usage low.
  4. Creating taps and targets are simple.
  5. Can be scheduled with cron, Airflow, etc.

Cons

  1. Managing state involves a lot of extra work.
  2. Not all data types are supported, Ref.
  3. DDL statements are not supported.
  4. The tap and target run as a simple scripts and are not distributed. We can pipe the output to a Kafka topic, but this involves extra code.
  5. The data types between source and destination may vary.

In addition to this there are Tap and Target specific concerns.

Conclusion

To recap, we saw

  1. What CDC is.
  2. What Binlog in MySQL is.
  3. How Singer reads data change events from the binlog.
  4. How to manage a Singer CDC pipeline with state to do incremental loads.

Hope this article gives you a good idea of how you can use Singer to automate CDC. Although Singer is a bit rough around the edges it provides a good framework to build on top of. There are some open source projects like Pipelinewise and Meltano which uses Singer specs for EL. You can easily write an Airflow operator using Singer specs which keeps its state in a cloud storage this would be a good option as well.

Next time you are required to build a EL pipeline try out Singer and see if it can fit your use case, before writing your own logic. Let me know if you have any questions or comments in the comment section below.

References

  1. Binary Log
  2. Read Binlog
  3. Singer
  4. MySQL Tap
  5. PostgreSQL Target
  6. Python MySQL binlog replication