Replicating MySQL to Snowflake with Kafka and Debezium—Part One: Data Extraction

Posted in: Technical Track

What, Snowflake?

Yes, Snowflake. While my core skills are based on the Oracle database, lately I’ve been working more on platforms like BigQuery and Snowflake—”lately” being the last one-and-a-half years or so.

While this is my very first post related to Snowflake, there are probably more to come … like part two on this topic :).

What is this about?

I’ve been preparing for the SnowPro Core Certification and in the process it’s become clear that one of the main pain points customers face is data ingestion.

Snowflake is very versatile in this regards and offers different options to suit a wide range of scenarios.

For this POC (proof of concept) I chose a somewhat complex option that will result in a close to real-time data replication from MySQL 8 to Snowflake.

This is a simplified diagram of the proposed architecture (click to enlarge).

Diagram of the replication showing MySQL to Debezium to Kafka Broker to Snowflake connector to Snowflake database.

In this first part of the series I’ll demonstrate the steps I followed to set up the CDC (change data capture) data extraction from a MySQL 8 database into Kafka. The second part will, of course, show the second piece of the architecture writing data into Snowflake.

The environment

  1. OS: Ubuntu 20.04.2 LTS
  2. MySQL: Ver 8.0.24 for Linux on x86_64 (MySQL Community Server – GPL)
  3. ZooKeeper: Apache ZooKeeper, version 3.7.0 2021-03-17 09:46 UTC
  4. Kafka: 2.8.0
  5. Scala (included with Kafka): 2.8.0
  6. Debezium: 2.13 final
  7. Snowflake Kafka connector (OSS version): 1.5.2 (Maven)
  8. Snowflake: Enterprise edition (AWS)

Sources

I based my research mainly on these three links:

Action!

The very first thing is to install the required software. This means installing ZooKeeper, Kafka and Debezium connector for MySQL.

ZooKeeper installation

You can find the instructions for the installation in the getting started link here: https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html.

ZooKeeper requires JDK 1.8 or higher, so first install OpenJDK 11. I chose version 11 for no particular reason.

sudo apt install openjdk-11-jre-headless

Download ZooKeeper binaries and the sha256 checksum file.

jose@localhost:~$ wget https://ftp.cixug.es/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz -O apache-zookeeper-3.7.0-bin.tar.gz

jose@localhost:~$ wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz.sha512

jose@localhost:~$ cat apache-zookeeper-3.7.0-bin.tar.gz.sha512

jose@localhost:~$ sha512sum apache-zookeeper-3.7.0-bin.tar.gz

#Compare that both values returned by the last two commands match exactly to verify that the binaries are fine and not tampered with before continuing.

Now proceed with the installation and configuration of ZooKeeper.

jose@localhost:~$ tar xzf apache-zookeeper-3.7.0-bin.tar.gz 

jose@localhost:~$ cd apache-zookeeper-3.7.0-bin/conf/

jose@localhost:~$ sudo mkdir /var/lib/zookeeper

jose@localhost:~$ grep -v ^# apache-zookeeper-3.7.0-bin/conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181     # This port is used later so keep track of it if you change it

Starting ZooKeeper

Now that I’ve installed ZooKeeper I can start it to verify that everything works as expected. This is very important because of the number of components I’ll be installing for this POC. Validating each one as soon as possible saves me from a long and complex troubleshooting of the whole deployment.

jose@localhost:~$ sudo apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: ../conf/zoo.cfg
Starting zookeeper ... STARTED

Here’s a quick check that it’s working:

jose@localhost:~$ apache-zookeeper-3.7.0-bin/bin/zkCli.sh -server 127.0.0.1:2181

/usr/bin/java
Connecting to 127.0.0.1:2181
2021-04-22 16:21:38,148 [myid:] - INFO  [main:Environment@98] - Client environment:zookeeper.version=3.7.0-e3704b390a6697bfdf4b0bef79e3da7a4f6bac4b, built on 2021-03-17 09:46 UTC
2021-04-22 16:21:38,168 [myid:] - INFO  [main:Environment@98] - Client environment:host.name=flask
(...)
2021-04-22 16:21:38,358 [myid:127.0.0.1:2181] - INFO  [main-SendThread(127.0.0.1:2181):ClientCnxn$SendThread@1005] - Socket connection established, initiating session, client: /127.0.0.1:59528, server: localhost/127.0.0.1:2181
JLine support is enabled
2021-04-22 16:21:38,467 [myid:127.0.0.1:2181] - INFO  [main-SendThread(127.0.0.1:2181):ClientCnxn$SendThread@1438] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100008bb5170000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 2] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
2021-04-22 16:22:49,733 [myid:] - INFO  [main:ZooKeeper@1232] - Session: 0x100008bb5170000 closed
2021-04-22 16:22:49,733 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@570] - EventThread shut down for session: 0x100008bb5170000
2021-04-22 16:22:49,735 [myid:] - ERROR [main:ServiceUtils@42] - Exiting JVM with code 0

Installing and starting Kafka

I obtained the steps to install and start Kafka from the Kafka quick start guide: https://kafka.apache.org/quickstart.

jose@localhost:~$ wget https://ftp.cixug.es/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
jose@localhost:~$ tar xzf kafka_2.13-2.8.0.tgz

Review the default server configuration and note the zookeeper.connect parameter. It must match with the configuration used for ZooKeeper.

jose@localhost:~$ egrep -v '^[[:space:]]*$|^#' kafka_2.13-2.8.0/config/server.properties 
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Make sure ZooKeeper is up—it should be if you followed the steps so far—and start Kafka:

jose@localhost:~$ kafka_2.13-2.8.0/bin/kafka-server-start.sh config/server.properties
(...)
[2021-04-22 16:39:58,981] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-04-22 16:39:58,987] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-04-22 16:39:58,990] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-04-22 16:39:59,055] INFO Updated cache from existing  to latest FinalizedFeaturesAndEpoch(features=Features{}, epoch=0). (kafka.server.FinalizedFeatureCache)
    [2021-04-22 16:39:59,115] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
    [2021-04-22 16:39:59,174] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
    [2021-04-22 16:39:59,192] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Starting socket server acceptors and processors (kafka.network.SocketServer)
    [2021-04-22 16:39:59,207] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
    [2021-04-22 16:39:59,208] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
    [2021-04-22 16:39:59,232] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2021-04-22 16:39:59,233] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser)
    [2021-04-22 16:39:59,233] INFO Kafka startTimeMs: 1619102399208 (org.apache.kafka.common.utils.AppInfoParser)
    [2021-04-22 16:39:59,238] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2021-04-22 16:39:59,463] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker flask:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

At this point I have ZooKeeper and Kafka up and running.

I also installed Kafka connect as part of this process.

For this POC I’ll be using the Kafka standalone connector. This differs from the distributed connector in the format of the configuration files, among other things. The former uses properties files, while the latter uses JSON files and requires a call to the REST API in order to add them.

This is a deviation from the documentation I’m using for Debezium configuration (see below) where a JSON file is demonstrated as a configuration example for a distributed Kafka Connector.

For the sake of simplicity, I’ll stick to the standalone connector.

You can use the “-daemon” parameter to start Kafka in the background once the installation has been determined correct.

jose@localhost:~$ bin/kafka-server-start.sh -daemon config/server.properties

Installing Debezium connector

These steps come from the Debezium install documentation: https://debezium.io/documentation/reference/1.5/install.html.

Download the connector for MySQL and decompress it into a specifically created directory

jose@localhost:~$ mkdir kafka_plugins
jose@localhost:~$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.0.Final/debezium-connector-mysql-1.5.0.Final-plugin.tar.gz
jose@localhost:~$ tar xzf debezium-connector-mysql-1.5.0.Final-plugin.tar.gz --directory kafka_plugins

I’ll be using the Kafka standalone connector so I used a properties file instead of the provided JSON file for configuration. Continue reading for details.

Preparing the MySQL database for replication

I’ll assume that a MySQL8 database is up and running in your system and you have a user with enough grants to execute these steps.

Here, I’m following the steps included in the official Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/mysql.html#setting-up-mysql.

Creation of a test table
jose@localhost:[]> create database snowflake_source;
jose@localhost:[]> use database snowflake_source;
jose@localhost:[snowflake_source]> CREATE TABLE animals (
    ->      id MEDIUMINT NOT NULL AUTO_INCREMENT,
    ->      name CHAR(30) NOT NULL,
    ->      PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.03 sec)


jose@localhost:[snowflake_source]> INSERT INTO animals (name) VALUES
    ->     ('dog'),('cat'),('penguin'),
    ->     ('lax'),('whale'),('ostrich');
Query OK, 6 rows affected (0.01 sec)
Records: 6  Duplicates: 0  Warnings: 0


jose@localhost:[snowflake_source]> SELECT * FROM animals;
+----+---------+
| id | name    |
+----+---------+
|  1 | dog     |
|  2 | cat     |
|  3 | penguin |
|  4 | lax     |
|  5 | whale   |
|  6 | ostrich |
+----+---------+
6 rows in set (0.00 sec)
Creation of a replication user

These commands change slightly from the ones in the documentation as I’m using MySQL 8.

CREATE USER 'snowflakerep'@'localhost' IDENTIFIED BY 'SafePassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'snowflakerep'@'localhost';

Binary logging must be enabled for Debezium to work as it relies on it to capture the data changes. Given that I’m running this POC on MySQL 8, binary logging is enabled by default so there’s nothing to be done here:

mysql> show global variables where Variable_name='log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0,01 sec)

Other options mentioned in the documentation that I’m going to skip are:

  • Enabling Global Transaction Identifiers (GTDs). This would be necessary in a complex cluster MySQL setup which I don’t have in place.
  • Configuring session timeouts. This may be necessary if the initial snapshot times out due to the amount of data to replicate.
  • Enabling query log events. This is useful for troubleshooting and we won’t need it for now.
Changing MySQL time_zone to UTC to avoid time zone related issues

The first time I started the connector, once the setup was complete, I hit the following error message:

[2021-04-23 11:45:57,623] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:117)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 1 error(s):
Unable to connect: The server time zone value 'CEST' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the 'serverTimezone' configuration property) to use a more specifc time zone value if you want to utilize time zone support.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
	at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:114)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 1 error(s):
Unable to connect: The server time zone value 'CEST' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the 'serverTimezone' configuration property) to use a more specifc time zone value if you want to utilize time zone support.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

After reviewing the documentation I decided that the fastest and easiest way to work around this issue was to change the time_zone of my MySQL database.
Of course, in a live production system this approach may not be valid but that’s out of scope of this blog post.

The following command executed in MySQL fixed the problem until the next restart of the MySQL daemon.

SET time_zone = '+0:00'; --Named TZ are not available because the time zone tables are not loaded in this testing DB.

I also added the configuration to my my.sql file for the change to be persistent over reboots.

[mysqld]
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/
default-time-zone = "+00:00"

Debezium connector configuration

The installation provides a sample configuration file for the connector which I adapted to match my needs with the following content:

jose@localhost:~$ egrep -v '^[[:space:]]*$|^#' kafka_2.13-2.8.0/config/connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=~/kafka-plugins
topic.creation.enable=true

Note the two highlighted lines in the code block.

The first one tells the connector where to find our plugins, in this case Debezium.

The second one is an addition of mine which was not included in the sample file. The parameter topic.creation.enable allows the connector to create Kafka topics if they don’t exist in the broker. For simplicity I chose to enable this parameter rather than creating the topics in Kafka myself, but this is probably not a good idea in a real production system.

A second configuration file is required for the standalone connector. Now, in the documentation they provide a JSON configuration sample file. This file is meant to be used with Kafka connector in distributed mode, which is a best practice for Kafka connectors.

In my case, again for simplicity, I’m using the standalone connector so I’ll turn the JSON file into a properties file with the following details:

jose@localhost:~$ cat kafka_2.13-2.8.0/config/mysql-debezium.properties
name=mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=snowflakerep
database.password=SafePassword
database.server.name=snowflake_source
database.include.list=snowflake_source
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.mysql_source
include.schema.changes=true

I can now start the connector but one more caveat. In this POC I opted for running everything in a single VM with a single Kafka deployment. This means, at the time of this writing, whatever the number of connectors I bring up, they’ll all write to the same log file. This, of course, makes troubleshooting a bit messy so I opted for an approach that allows me to keep the log files separated for each connector.

This is how I bring up the Debezium MySQL connector:

jose@localhost:~$ nohup ./kafka_2.13-2.8.0/bin/connect-standalone.sh ./kafka_2.13-2.8.0/config/connect-standalone.properties ./kafka_2.13-2.8.0/config/mysql-debezium.properties > debezium_connector_`date "+%F_%H-%M"`.log 2>&1 &

Well done!

If you made it all the way down here, thank you! Also, congratulations for having your first part of the replication in place. If you found any issues or problems, please add a comment so I can improve the blog post and help others.

P.S.

I can’t close this without showing off a bit so, here it is.

With all the components I’ve discussed so far up and running, what we have is a system that captures data in the MySQL database and writes it into a Kafka topic, no matter whether we have a consumer for the topic or not.

So, to validate the installation so far, let’s see what it looks like. Once all the components are up and running we can use the following command to list the existing topics:

jose@localhost:~$ kafka_2.13-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
dbhistory.mysql_source
snowflake_source
snowflake_source.snowflake_source.animals

We have quite a few topics automatically created for us.

The dbhistory.mysql_source is internally used by the Debezium connector itself to track schema changes in the source system.

The snowflake_source topic is created to track DDL changes in the source. I’m not using it in this POC.

Finally, the snowflake_source.snowflake_source.animals is the topic that actually stores the CDC data for our table. Should we have multiple tables, a topic will be created for each one matching the table name.

With the topic name identified we can see its contents without consuming the messages with the following command:

jose@localhost:~$ ./kafka_2.13-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic snowflake_source.snowflake_source.animals

Note the use of –from-beginning. The information shown on the screen includes all the messages in the topic, which includes the snapshot of the table Debezium creates the very first time it’s started for a given table.

And this is what one of the messages looks like:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    }
                ],
                "optional": true,
                "name": "snowflake_source.snowflake_source.animals.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    }
                ],
                "optional": true,
                "name": "snowflake_source.snowflake_source.animals.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "server_id"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "gtid"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "file"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "pos"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "row"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "query"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.mysql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "snowflake_source.snowflake_source.animals.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "name": "puppy"
        },
        "source": {
            "version": "1.5.0.Final",
            "connector": "mysql",
            "name": "snowflake_source",
            "ts_ms": 1619618211737,
            "snapshot": "true",
            "db": "snowflake_source",
            "sequence": null,
            "table": "animals",
            "server_id": 0,
            "gtid": null,
            "file": "binlog.000057",
            "pos": 156,
            "row": 0,
            "thread": null,
            "query": null
        },
        "op": "r",
        "ts_ms": 1619618211755,
        "transaction": null
    }
}

If you made it all the way down here, well, thank you again and good luck with your replication. Should you face any issues, leave a comment or reach out to our sales people.

Please watch for part two of this series, which will be published soon.

email
Want to talk with an expert? Schedule a call with our team to get the conversation started.

About the Author

Data Project Engineer
First of all father, then husband and finally Oracle database consultant. I love technology in general and managing data in particular. Trying to learn one new thing every day.

2 Comments. Leave new

what is the meaning of “op” : “r” in the final message displayed at the end? and what are the other values and their meaning

Reply
Jose Rodriguez
June 16, 2021 5:36 am

It means “operation” = “read”, which indicates this particular message is part of the initial data snapshot.
For the meaning of the rest of the payload tags I suggest you review Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-schema-change-topic

Reply

Leave a Reply

Your email address will not be published. Required fields are marked *