
Create Multi-Region Topics Programmatically With Python and confluent-kafka-python
Confluent Platform 5.4 introduced Multi-Region Clusters (MRC). MRC adds numerous cool features such as Follower-Fetching or asynchronous Observers.
This article explains how to create topics with replicas and observers programmatically using python.
More details on MRC clusters: https://docs.confluent.io/current/multi-dc-deployments/multi-region.html
To create a topic with replicas and (asynchronous) observers, you have to pass replica and observers placement details in the form of a JSON object.
Here is an example using the command-line:
$ cat placement.json
{"version":1,"replicas":[{"count":2,"constraints":{"rack":"east"}}],"observers":[{"count":1,"constraints":{"rack":"west"}}]}
$
$ kafka-topics --create --bootstrap-server <IP address>:9092 --topic test2 --partitions 5 --replica-placement placement.json
$
$ kafka-topics --zookeeper <IP address>:2181 --describe --topic test2
Topic: test2 PartitionCount: 5 ReplicationFactor: 3 Configs: confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"east"}}],"observers":[{"count":1,"constraints":{"rack":"west"}}]}
Topic: test2 Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2,3 Offline:
Topic: test2 Partition: 1 Leader: 3 Replicas: 3,1,5 Isr: 3,1 Offline:
Topic: test2 Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2 Offline:
Topic: test2 Partition: 3 Leader: 2 Replicas: 2,1,5 Isr: 2,1 Offline:
Topic: test2 Partition: 4 Leader: 3 Replicas: 3,2,4 Isr: 3,2 Offline:
To do the same programmatically with python, you can use the confluent-kafka-python python library.
Specific to MRC is that you have to pass a config dict with confluent.placement.constraints
instead of the replication_factor
argument. See line 5 below.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from confluent_kafka.admin import AdminClient, NewTopic
placement = "{\"version\":1,\"replicas\":[{\"count\":2,\"constraints\":{\"rack\":\"east\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"west\"}}]}"
new_topic = [NewTopic("test1", num_partitions=5, config={'confluent.placement.constraints': placement})]
a = AdminClient({'bootstrap.servers': '<IP address>:9092'})
res = a.create_topics(new_topic)
for topic, f in res.items():
try:
f.result()
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
That’s it!