Write your first Kafka producer and Kafka consumer in Python

After spending time researching why you should use Apache Kafka, it would be cool if you could actually produce and consume data with it, right?

In this lesson, I will help you produce and consume some very basic sample messages. If you need more in-depth help, I can recommend my Apache Kafka for Beginners courses.

Before you start, you are expected to have access to a running Apache Kafka cluster, either locally or remotely, and to know the basics of Python.

You can install the required dependencies using my Poetry pyproject.toml file below.

[tool.poetry]
name = "lesson-02"
version = "0.1.0"
description = ""
authors = ["Paris Nakita Kejser <contact@datacraftbackbone.com>"]
readme = "README.md"
package-mode = false

[tool.poetry.dependencies]
python = "^3.12"
pydantic-settings = "^2.5.2"
six = "^1.16.0"
kafka-python-ng = "^2.2.2"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
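
Once this is saved as pyproject.toml, running poetry install should pull in everything you need. Note that kafka-python-ng is a maintained fork of kafka-python, and it still installs under the kafka module name used in the imports below.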

Producing Messages to a Kafka Topic

Before you start producing data, you need to know that all data is stored as bytes in Kafka Topics. That means that whatever data you want to send to Kafka, you must make sure it is serialized to bytes before sending it.
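
To make that concrete, here is a minimal sketch of the round trip a JSON payload takes; the dict contents are just placeholder sample data:

import json

# A plain Python dict: this is what your application works with.
payload = {"name": "Burns Rosa", "company": "Myopium"}

# Serialize to UTF-8 bytes before sending; Kafka only ever stores bytes.
raw_bytes = json.dumps(payload).encode('utf-8')

# Deserialize on the consuming side to get the dict back.
restored = json.loads(raw_bytes.decode('utf-8'))
assert restored == payload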

If you have worked with event or queue systems before, such as SQS on AWS or RabbitMQ, you will appreciate the freedom an event streaming platform like Kafka gives you.

I will stop talking now; let's write our producer so you can see how simple Apache Kafka is to use and how fast you can get up and running and send messages into a Kafka Topic.

Remember that the default size limit per Kafka message is about 1 MB, so if you have a message bigger than that, it's recommended to send a pointer to a second location where the data can be read instead, but this is just a side note to you 🙂
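
As a sketch of that side note (often called the claim-check pattern): instead of producing the oversized payload itself, you upload it somewhere durable and produce a small reference message. The upload_to_object_store helper and the URL it returns are hypothetical placeholders, not a real API:

def upload_to_object_store(data: bytes) -> str:
    # Hypothetical helper: store the blob in S3, GCS, or similar and
    # return a location that consumers can fetch it from.
    return 's3://my-bucket/blobs/d0d637e7'  # placeholder location

big_payload = b'...'  # imagine several megabytes of data here

# Produce only a small "claim check" that points at the real data.
reference_message = {
    'blob_url': upload_to_object_store(big_payload),
    'content_type': 'application/octet-stream',
}
# producer.send(kafka_topic, value=reference_message)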

from kafka import KafkaProducer
import json

kafka_bootstrap_servers = 'localhost:29092'
kafka_security_protocol = 'PLAINTEXT'
kafka_topic = 'sample-topic'

# The value_serializer turns every Python value we send into UTF-8 JSON bytes.
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    security_protocol=kafka_security_protocol,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send(
    kafka_topic,
    value={
        "guid": "d0d637e7-9357-475b-9625-5b9075b82f2b",
        "name": "Burns Rosa",
        "address": "94 Ralph Avenue, Hemlock, Vermont",
        "company": "Myopium"
    }
)

# send() is asynchronous; flush() blocks until buffered messages are delivered.
producer.flush()
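
Because send() is asynchronous, it returns a future you can block on if you want confirmation that the message actually reached the broker. A minimal sketch, reusing the producer from above with a placeholder value:

# get() blocks until the broker acknowledges the write (or the timeout hits).
future = producer.send(kafka_topic, value={"ping": "pong"})  # placeholder value
record_metadata = future.get(timeout=10)
print(f"delivered to {record_metadata.topic}"
      f"[{record_metadata.partition}] @ offset {record_metadata.offset}")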

Consuming Messages from a Kafka Topic

Now that you have learned how to produce messages into a Kafka Topic, it would be nice if you could pull them out again, right? So let's learn how to consume from a Kafka Topic and do something with the messages.

from kafka import KafkaConsumer
import json

kafka_bootstrap_servers = 'localhost:29092'
kafka_security_protocol = 'PLAINTEXT'
kafka_topic = 'sample-topic'

# A consumer needs a value_deserializer (not a serializer) to turn the raw
# bytes from the topic back into Python objects.
consumer = KafkaConsumer(
    bootstrap_servers=kafka_bootstrap_servers,
    security_protocol=kafka_security_protocol,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# subscribe() expects a list of topic names.
consumer.subscribe(topics=[kafka_topic])

# Iterating the consumer blocks and yields messages as they arrive. Without a
# group_id, it starts from the latest offset, so start it before producing.
for message in consumer:
    print(f"{message.partition}:{message.offset} v={message.value}")

When you run this code, the output will show you the partition, the offset, and the message value. The message value is the JSON from our producer, deserialized back into a Python dict, so you can do whatever you want with it in your consumer.
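
If you want the consumer to remember its position between runs, give it a group id so Kafka tracks its offsets; auto_offset_reset='earliest' makes a brand-new group start from the beginning of the topic. A minimal sketch, with a hypothetical group name:

consumer = KafkaConsumer(
    bootstrap_servers=kafka_bootstrap_servers,
    security_protocol=kafka_security_protocol,
    group_id='sample-consumer-group',  # hypothetical group name
    auto_offset_reset='earliest',      # new groups start at the oldest offset
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)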


Related Courses

Apache Kafka for Beginners

BETA COURSES – EARLY BIRD
  • Apache Kafka Fundamentals
  • Setting Up a Kafka Cluster
  • Topics and Partitioning Strategies
