Create Multi-Region Topics Programmatically With Python and confluent-kafka-python
Photo of Prague by Anthony Delanoix


Confluent Platform 5.4 introduced Multi-Region Clusters (MRC). MRC adds features such as follower fetching and asynchronous observers.

This article explains how to create topics with replicas and observers programmatically using Python.

More details on MRC clusters: https://docs.confluent.io/current/multi-dc-deployments/multi-region.html

To create a topic with replicas and (asynchronous) observers, you have to pass the replica and observer placement details as a JSON object.

Here is an example using the command line:

$ cat placement.json
{"version":1,"replicas":[{"count":2,"constraints":{"rack":"east"}}],"observers":[{"count":1,"constraints":{"rack":"west"}}]}
$
$ kafka-topics  --create --bootstrap-server <IP address>:9092 --topic test2 --partitions 5 --replica-placement placement.json
$
$ kafka-topics --zookeeper <IP address>:2181 --describe --topic test2

Topic: test2	PartitionCount: 5	ReplicationFactor: 3	Configs: confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"east"}}],"observers":[{"count":1,"constraints":{"rack":"west"}}]}
	Topic: test2	Partition: 0	Leader: 2	Replicas: 2,3,4	Isr: 2,3	Offline:
	Topic: test2	Partition: 1	Leader: 3	Replicas: 3,1,5	Isr: 3,1	Offline:
	Topic: test2	Partition: 2	Leader: 1	Replicas: 1,2,4	Isr: 1,2	Offline:
	Topic: test2	Partition: 3	Leader: 2	Replicas: 2,1,5	Isr: 2,1	Offline:
	Topic: test2	Partition: 4	Leader: 3	Replicas: 3,2,4	Isr: 3,2	Offline:
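
In the output above, every partition lists three replicas but only two of them in the ISR. Presumably the replica missing from the ISR is the asynchronous observer, since observers replicate asynchronously and do not join the ISR in this setup. As a rough sketch, this can be checked with a few lines of Python. The helper name `observers_by_partition` is made up for illustration, and the parsing assumes the layout printed above:

```python
import re

# Partition lines copied from the `kafka-topics --describe` output above.
DESCRIBE_OUTPUT = """\
Topic: test2\tPartition: 0\tLeader: 2\tReplicas: 2,3,4\tIsr: 2,3\tOffline:
Topic: test2\tPartition: 1\tLeader: 3\tReplicas: 3,1,5\tIsr: 3,1\tOffline:
Topic: test2\tPartition: 2\tLeader: 1\tReplicas: 1,2,4\tIsr: 1,2\tOffline:
Topic: test2\tPartition: 3\tLeader: 2\tReplicas: 2,1,5\tIsr: 2,1\tOffline:
Topic: test2\tPartition: 4\tLeader: 3\tReplicas: 3,2,4\tIsr: 3,2\tOffline:
"""

# Captures the partition number, the replica list and the ISR list of one line.
LINE_RE = re.compile(r"Partition:\s*(\d+).*?Replicas:\s*([\d,]+).*?Isr:\s*([\d,]+)")


def observers_by_partition(describe_output):
    """Hypothetical helper: map each partition to the replicas absent from its ISR."""
    result = {}
    for line in describe_output.splitlines():
        match = LINE_RE.search(line)
        if match is None:
            continue  # Not a partition line (e.g. the topic summary or a blank line).
        partition = int(match.group(1))
        replicas = [int(b) for b in match.group(2).split(",")]
        isr = {int(b) for b in match.group(3).split(",")}
        result[partition] = [b for b in replicas if b not in isr]
    return result


print(observers_by_partition(DESCRIBE_OUTPUT))
# {0: [4], 1: [5], 2: [4], 3: [5], 4: [4]}
```

A replica can also drop out of the ISR simply because it is lagging, so this is only a quick sanity check, not proof that a broker is an observer.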

To do the same programmatically in Python, use the confluent-kafka-python library.

Specific to MRC: instead of the replication_factor argument, you pass a config dict containing confluent.placement.constraints. See the NewTopic call (line 5) below.

from confluent_kafka.admin import AdminClient, NewTopic

placement = '{"version":1,"replicas":[{"count":2,"constraints":{"rack":"east"}}],"observers":[{"count":1,"constraints":{"rack":"west"}}]}'

new_topic = [NewTopic("test1", num_partitions=5, config={'confluent.placement.constraints': placement})]

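# Connect to the cluster through any reachable broker.
admin = AdminClient({'bootstrap.servers': '<IP address>:9092'})

# create_topics() is asynchronous: it returns a dict mapping each topic name to a future.
futures = admin.create_topics(new_topic)

for topic, f in futures.items():
    try:
        f.result()  # Returns None on success; raises on failure.
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))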
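
Writing the placement JSON by hand is easy to get wrong. One possible alternative, sketched here using only the standard json module, is to describe the placement as a Python dict and serialize it. The helper name `build_placement` and its parameters are illustrative and not part of confluent-kafka-python:

```python
import json


def build_placement(replica_rack, replica_count, observer_rack, observer_count):
    """Hypothetical helper: return the placement constraints as a JSON string."""
    placement = {
        "version": 1,
        "replicas": [
            {"count": replica_count, "constraints": {"rack": replica_rack}},
        ],
        "observers": [
            {"count": observer_count, "constraints": {"rack": observer_rack}},
        ],
    }
    # Compact separators reproduce the whitespace-free form used in this article.
    return json.dumps(placement, separators=(",", ":"))


placement = build_placement("east", 2, "west", 1)
print(placement)
```

The resulting string can then be passed as the value of 'confluent.placement.constraints' in the config dict given to NewTopic, as in the example above.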

That’s it!

Written by Ldom