Kafka to SQL - easy & dirty, done


graph LR
  Kafka -->|events| SQL[SQL Database]

The Kafka messaging system is commonly used for communication between services.

Oftentimes, we need to get messages from Kafka into some materialized form, e.g. a SQL database, for analytical purposes. In this article, we discuss a way to achieve this very simply and quickly, without the need to deploy new frameworks.

When your use-case:

  • handles a low volume of data, let's say at most a hundred messages per second
  • does not require the analytical SQL database to reflect the data in near real time, a minute of delay is not an issue

you may consider our approach a valid solution, avoiding coding and deploying:

  • Kafka Consumer/Kafka Connect
  • Beam/Flink/Kafka Streams/Spark Structured Streaming

and saving a lot of effort on maintaining these deployments.

Curious how? Continue reading.

Introduction

When it comes to transferring data from Kafka to a relational database system, such as MariaDB/MySQL or PostgreSQL, we have a couple of options. But before we commit to a solution, let's make a feasibility assessment together.

Let's imagine we are tasked with transferring data from a Kafka topic with JSON messages, for example about purchases on our web page. Such a message could look like:

{
  "order_id": "123456789",
  "customer_id": "987654321",
  "product_id": "p001",
  "total": "39.98",
  "order_date": "2023-12-08T12:30:00Z"
}

Capacity Planning

Let's do capacity planning for a couple of scenarios, assuming roughly 100 bytes per message:

Messages per second    Bytes per second
10                     1 KB
100                    10 KB
1 000                  100 KB
10 000                 1 MB
100 000                10 MB
1 000 000              100 MB
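
A tiny sketch of the arithmetic behind the table (using the same ~100 bytes per message assumption):

# Rough throughput estimate, assuming ~100 bytes per JSON message (our sample order above)
MESSAGE_SIZE_BYTES = 100

for rate in (10, 100, 1_000, 10_000, 100_000, 1_000_000):
    kb_per_second = rate * MESSAGE_SIZE_BYTES / 1_000
    print(f"{rate:>9} msg/s ≈ {kb_per_second:,.0f} KB/s")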

We are getting to the limits of a single machine, so let's stop here. If our use-case fits on one machine, we might actually avoid deploying cluster-based solutions.

Also, if we do not need real-time and, for example, one minute of data lateness is fine, we may run a piece of code on a single machine to fill the last minute of data into the database. And run it every minute.

We will design the ETL job as per the diagram:

graph LR
  Kafka -->|events| job[ETL Job]
  job --> SQL[SQL Database]

Kafka Offsets

Let us examine how the ETL job could work. We would need just three basic steps.

  1. Figure out the offset to start reading from in Kafka; this is usually handled by the consumer group protocol.
  2. Obtain a batch of events from Kafka starting at the stored offset.
  3. Publish the batch into the SQL database.

And in the next run, just pick up where we left off. The basic code of the ETL job is:

# Get Kafka consumer
# (simplified pseudocode: `Consumer` stands for your Kafka client's consumer class,
#  e.g. kafka-python's KafkaConsumer; connection parameters are omitted throughout)
consumer = Consumer(group_id='orders_to_database')
consumer.subscribe(['orders'])

# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()

# Poll for messages in batches (max 10 messages per poll)
events = consumer.poll(max_records=10, timeout=1000) # Step 1

# Publish to database
# Use a prepared statement for INSERT
insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date)
                  VALUES (%s, %s, %s, %s, %s);"""

# Execute the batch of INSERT statements
# (each polled event has to be mapped to a tuple of column values first; omitted for brevity)
cursor.executemany(insert_query, events) # Step 2

# Commit the changes
connection.commit() # Step 3

# Disconnect nicely
connection.close() # Step 4
consumer.close()

Let's describe how this works.

  1. In the first step, we get a batch of at most 10 events from Kafka (the limit is chosen arbitrarily and not important at the moment).
  2. In the second step, we prepare and send the INSERT statements to the database.
  3. In the third step, we commit the database transaction.

When we start the job for the first time, the consumer group has no committed offsets, so we start reading from offset 0 (the beginning), assuming the consumer is configured to start from the earliest offset. But the algorithm has one problem.

Problems with auto-commit

The solution relies on auto-commit in the Kafka consumer. It can't be seen from the example, but the default value of the enable_auto_commit argument is True, so in fact we do:

    consumer = Consumer(group_id='orders_to_database', enable_auto_commit=True)

and, depending on the interval auto_commit_interval_ms, which defaults to 5000 milliseconds, offset commits are triggered in the background.
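
Spelled out explicitly, with the same simplified consumer as above, the default behaviour corresponds to:

# Equivalent explicit configuration; both values are the library defaults,
# so offsets are committed in the background roughly every 5 seconds
consumer = Consumer(
    group_id='orders_to_database',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,
)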

This is a rather risky choice when it comes to reliability. What happens if the script crashes before the database transaction is committed, while the offsets have already been committed to Kafka?

Let's model the example:

  1. In step 1, we take 10 events, offsets 0 to 9.
  2. In step 2, we create the INSERT statements; somewhere in the background, auto-commit tells Kafka that the new offset position of the consumer group orders_to_database is 10.
  3. Step 3 crashes.

The sequence of what happened is depicted in the figure:

sequenceDiagram
  Kafka->>ETL: 10 events, offset 0 to 9
  activate Kafka
  ETL->>Kafka: Commit offset 10
  deactivate Kafka
  ETL->>Database: Insert 10 rows
  activate Database
  Note over ETL: Crash ETL
  ETL-->>Database: Commit 10 rows
  Database-->>ETL: Transaction commit
  deactivate Database

When we restart the ETL job:

  1. In step 1, we take 10 events from Kafka, but since we use the orders_to_database consumer group, we get events starting from offset 10, i.e. events with offsets 10 to 19.
  2. In step 2, we create the INSERT statements.
  3. In step 3, we commit the transaction on the SQL database.

In the database we now have only events 10 to 19. Events 0 to 9 are lost. Our algorithm causes data loss.

Problems with manual offset commit

To fix this, we alter the algorithm to use manual offset commits, placing the offset commit right next to the commit to the SQL database:

# Get Kafka consumer
consumer = Consumer(group_id='orders_to_database', enable_auto_commit=False) # We change to **not use auto commit**
consumer.subscribe(['orders'])

# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()

# Poll for messages in batches (max 10 messages per poll)
events = consumer.poll(max_records=10, timeout=1000) # Step 1

# Publish to database
# Use a prepared statement for INSERT
insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date )
                  VALUES (%s, %s, %s, %s, %s);"""

# Execute the batch of INSERT statements
cursor.executemany(insert_query, events) # Step 2

# Commit the changes
consumer.commit() # Step 3a - commit Kafka
connection.commit() # Step 3b - commit Database

# Disconnect nicely
connection.close() # Step 4
consumer.close()

Repeating our simulated crash, let's put the crash just after Step 3a, the commit to Kafka:

  1. In step 1, we take 10 events, offsets 0 to 9.
  2. In step 2, we create the INSERT statements and send them to the database.
  3. Step 3a commits the offsets to Kafka.
  4. Crash, the commit to the Database never happens.

The situation is depicted in the figure:

sequenceDiagram
  Kafka->>ETL: 10 events, offset 0 to 9
  activate Kafka
  ETL->>Database: Insert 10 rows
  activate Database
  ETL->>Kafka: Commit offset 10
  deactivate Kafka
  Note over ETL: Crash ETL
  ETL-->>Database: Commit 10 rows
  Database-->>ETL: Transaction commit
  deactivate Database

Not exactly an improvement. And it does not even have to be the ETL job crashing; it is sufficient that the Database rolls back our transaction, see:

sequenceDiagram
  Kafka->>ETL: 10 events, offset 0 to 9
  activate Kafka
  ETL->>Database: Insert 10 rows
  activate Database
  ETL->>Kafka: Commit offset 10
  deactivate Kafka
  ETL->>Database: Commit 10 rows
  Note over Database: Rollback
  Database->>ETL: Transaction rollback
  deactivate Database
  Note over ETL: Now what?

Even if we put the whole action into a try-except-finally block, where we commit to both Kafka and the Database:

# Get Kafka consumer
consumer = Consumer(group_id='orders_to_database', enable_auto_commit=False) # We change to **not use auto commit**
consumer.subscribe(['orders'])

# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()

try:
  # Poll for messages in batches (max 10 messages per poll)
  events = consumer.poll(max_records=10, timeout=1000) # Step 1

  # Publish to database
  # Use a prepared statement for INSERT
  insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date )
                    VALUES (%s, %s, %s, %s, %s);"""

  # Execute the batch of INSERT statements
  cursor.executemany(insert_query, events) # Step 2

  # Commit the changes
  consumer.commit() # Step 3a - commit Kafka
  connection.commit() # Step 3b - commit Database
except Exception:
  connection.rollback()
finally:
  # Disconnect nicely
  connection.close() # Step 4
  consumer.close()

We can still crash between Step 3a and Step 3b, for example on an out-of-memory error.

OK, we could swap Step 3a and Step 3b:

# Commit the changes
connection.commit() # Step 3b - commit Database
consumer.commit() # Step 3a - commit Kafka

so that we first commit to the Database and then to Kafka. Let's model the crash again:

  1. In step 1, we take 10 events, offsets 0 to 9.
  2. In step 2, we create the INSERT statements and send them to the database.
  3. Step 3b commits to the Database.
  4. Crash, the commit of the offsets to Kafka never happens.

sequenceDiagram
  Kafka->>ETL: 10 events, offset 0 to 9
  activate Kafka
  ETL->>Database: Insert 10 rows
  activate Database
  ETL->>Database: Commit 10 rows
  Database->>ETL: Transaction commit
  deactivate Database
  Note over ETL: Crash ETL
  ETL-->>Kafka: Commit offset 10
  deactivate Kafka

Now we ended up having the data in the Database, but when we restart the ETL job, we consume the same 10 events again. Either we end up creating duplicates in the Database, or, if there are constraints on UNIQUE column combinations, we might not be able to proceed at all.

Offsets In Destination

So it all boils down to the problem that we can't maintain a distributed transaction across the Database and Kafka. If we had one, we could commit to both at the same time.

The fact that such a transaction system does not exist is probably a result of performance: it would be a huge performance penalty to implement two-phase commit on top of many disparate systems. We may choose any Database and messaging combination, and who would maintain such software? If you are interested in more thoughts on this problem, refer to the chapter Consistency and Consensus in the great book Designing Data-Intensive Applications by Martin Kleppmann.

But there is a solution for our ETL script. We may ignore the Kafka consumer group protocol and offset committing completely. We build the offset management ourselves, and to keep data and offset position consistent (either we have both the events and the updated offsets, or nothing), we use the Database. Yes, the database; it has transactions to rescue us.

Offsets With Data in One Table

To manage the script's progress, we can just glue the offset to the data itself. In Kafka, the address of a message, or its primary key, is the triplet:

(topic, partition, offset)

Having these 3 values, you can request any exact message from Kafka; it is the only primary key there is, if we look at Kafka as a database. When creating the destination table for our orders, we can include these 3 columns:

CREATE TABLE orders (
  order_id VARCHAR(20) PRIMARY KEY,
  customer_id VARCHAR(20),
  product_id VARCHAR(20),
  total DECIMAL(10, 2),
  order_date TIMESTAMP,
  topic VARCHAR(20),
  -- `partition` and `offset` are (or can be) reserved words in MySQL/MariaDB, hence the backticks
  `partition` INT,
  `offset` INT
)

When our script starts, we supply None as the consumer group name, which turns off the consumer group protocol in the Python library. Apparently, in KafkaJS, this can't be turned off, but you can create a new unique consumer group name every time you start the job.

We commit our offsets to the SQL database, but how do we retrieve them when the ETL job runs again according to cron? Fortunately, the offset is an ever-growing integer (per topic-partition), so we can just ask the SQL database for the maximum offset per partition:

SELECT topic, `partition`, MAX(`offset`) FROM orders GROUP BY topic, `partition`

Let's alter our script, putting the pieces together:

# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()

# Obtain the last stored offsets from the database
cursor.execute("""SELECT topic, `partition`, MAX(`offset`) FROM orders GROUP BY topic, `partition`""")
rows = cursor.fetchall()

# Transform the rows into TopicPartition plus offset for the Kafka library
offset_map = {
    TopicPartition(topic, partition): offset
    for topic, partition, offset in rows
}
# (on the very first run the table is empty and offset_map has no entries;
#  in that case assign all partitions of the topic and start from the beginning)

# Get Kafka consumer
consumer = Consumer(group_id=None, enable_auto_commit=False) # Do not use auto commit, do not use the consumer group protocol
consumer.assign(list(offset_map.keys()))
for topic_partition, offset in offset_map.items():
    consumer.seek(topic_partition, offset + 1) # resume right after the last offset already stored in the database

try:
  # Poll for messages in batches (max 10 messages per poll)
  events = consumer.poll(max_records=10, timeout=1000) # Step 1

  # Publish to database
  # Use a prepared statement for INSERT
  insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date, topic, `partition`, `offset`)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""

  # Execute the batch of INSERT statements
  # (each event has to be mapped to a tuple of column values, including its topic, partition and offset)
  cursor.executemany(insert_query, events) # Step 2

  # Commit the changes
  connection.commit() # Step 3 - commit Database
except Exception:
  connection.rollback()
finally:
  # Disconnect nicely
  connection.close() # Step 4
  consumer.close()

Now there is just one commit and one transaction, the Database transaction. We either get everything, both data and offsets, or nothing.

Separate Offsets Table

Another option for handling offsets in the destination database is to keep them in a separate table:

CREATE TABLE offsets (
  topic VARCHAR(20),
  `partition` INT,
  `offset` INT
)

and make sure we write the data inserts and the offset update in one and the same transaction, as sketched below.
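
A minimal sketch of that transaction, assuming MySQL/MariaDB and, additionally, that (topic, `partition`) is declared as a primary or unique key of the offsets table so the upsert has a key to act on; data_rows and latest_offsets are hypothetical lists prepared from the polled events:

# Sketch: write the order rows and the new offset positions in one database transaction.
# Assumes `data_rows` holds one tuple of column values per event and `latest_offsets`
# holds one (topic, partition, max offset) tuple per partition of the current batch.
insert_orders = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date)
                   VALUES (%s, %s, %s, %s, %s);"""
upsert_offsets = """INSERT INTO offsets (topic, `partition`, `offset`)
                    VALUES (%s, %s, %s)
                    ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`);"""

try:
    cursor.executemany(insert_orders, data_rows)
    for topic, partition, offset in latest_offsets:
        cursor.execute(upsert_offsets, (topic, partition, offset))
    connection.commit()    # both tables, or neither
except Exception:
    connection.rollback()

The guarantee stays the same as with the single-table approach: either both the order rows and the new offsets land in the database, or neither does.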

Scheduling the Job

Having the ETL job written, we just need to schedule it every, let's say, 1 minute. And we must ensure that at most one instance is running at a time, otherwise we would end up with duplicates anyway. As this can be achieved with modern cloud scheduling tools, we do not need to build any locking mechanism ourselves to run at most one instance every minute.
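
If you are not on a cloud scheduler, a plain crontab entry can do the same job; a minimal sketch, assuming the (hypothetical) script lives at /opt/etl/kafka_to_sql.py and using flock so that a run is skipped while the previous one still holds the lock:

# m h dom mon dow   command
* * * * *           flock -n /tmp/kafka_to_sql.lock python3 /opt/etl/kafka_to_sql.py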

Pros and cons

So far, we have only discussed use-cases that are small enough to fit on a single machine, so that our script can directly fetch the data. And that is the most obvious con: you can't use this approach for larger datasets. Also, the script is dirty; it is not a super robust solution. One can also become limited by the offset fetch query once the destination table grows large; a WHERE clause, or an index on the Kafka columns, can help, see below.
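
For the offset fetch query specifically, a composite index over the three Kafka columns lets the database answer the per-partition MAX(`offset`) from the index instead of scanning the whole orders table; a sketch (the index name is arbitrary):

-- speeds up: SELECT topic, `partition`, MAX(`offset`) FROM orders GROUP BY topic, `partition`
CREATE INDEX idx_orders_topic_partition_offset ON orders (topic, `partition`, `offset`);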

On the other hand, in about 100 lines we can solve the whole ETL task. And it does not need to be Kafka only; this works for anything that has replayability on the source and transactions on the destination. We do not necessarily need to deploy any 3rd party framework. More or less, we do not need to learn anything new, or maintain it.

Conclusion

Sometimes, avoiding the deployment of a new framework might just be the better option.