Example: Continuous Data Self-Balancing

To see self-balancing in action, first create an unbalanced partition distribution: create a Topic with multiple partitions and manually reassign all of them to a single node. Then send an even load to all partitions and observe whether the partitions are automatically reassigned across Brokers. This automatic reassignment is a built-in feature of AutoMQ, designed to keep data evenly distributed across the cluster. By monitoring the partition distribution and the load on each Broker, you can confirm that self-balancing works as expected.

After installing AutoMQ by following Cluster Deployment on Linux, you will have a list of Bootstrap Server addresses similar to the following:

192.168.0.1:9092,192.168.0.2:9092

If you installed via Deploy Locally, the Bootstrap Server address will be:

broker1:9092,broker2:9092

Throughout all steps, ensure you replace the Bootstrap Server address with the one you actually obtained.
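One way to avoid editing the address in every command is to keep it in a single shell variable. The sketch below is only an illustration: the variable name BOOTSTRAP_SERVERS and the helper kafka_cli are hypothetical and not part of AutoMQ. The helper prints the in-container command line, which can then be placed inside the docker run ... /bin/bash -c "..." wrapper used in the steps below.

```shell
# Bootstrap address used by the later commands; override it in the
# environment if yours differs (the default is the local-deployment value).
BOOTSTRAP_SERVERS="${BOOTSTRAP_SERVERS:-broker1:9092,broker2:9092}"

# Hypothetical helper: print the in-container command line for a Kafka CLI
# tool, with the bootstrap address appended as --bootstrap-server.
kafka_cli() {
  tool=$1
  shift
  printf '/opt/kafka/kafka/bin/%s %s --bootstrap-server %s\n' \
    "$tool" "$*" "$BOOTSTRAP_SERVERS"
}

# Example: build the describe command used later in this walkthrough.
kafka_cli kafka-topics.sh --topic continuous-self-balancing-topic --describe
```

Note that the producer performance test takes the address through --producer-props bootstrap.servers=... instead, so it needs the variable substituted by hand.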

Prerequisites

  • Linux/Mac/Windows Subsystem for Linux

  • Docker

If container images download slowly, refer to Docker Hub Mirror Configuration.

  • Ensure your AutoMQ cluster has the self-balancing feature enabled. If deploying locally, this feature is automatically activated. For other deployment methods, add the parameter --override autobalancer.controller.enable=true when starting the Controller.

Create Topic


CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --partitions 8 --create --topic continuous-self-balancing-topic --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

View the Distribution of Partitions


CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-topics.sh --topic continuous-self-balancing-topic --describe --bootstrap-server broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD


Topic: continuous-self-balancing-topic TopicId: DNZe6gBQTrCOEAruQ_y2tg PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: continuous-self-balancing-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 7 Leader: 2 Replicas: 2 Isr: 2
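Reading the leader column by eye gets tedious as the partition count grows. As a minimal sketch, the describe output can be summarized per broker with awk. The function name count_leaders is an assumption made for this example, and it relies only on the "Leader: <id>" field layout shown in the output above.

```shell
# Count how many partitions each broker leads.
# Reads `kafka-topics.sh --describe` output on stdin.
count_leaders() {
  awk '{
    # The value following the "Leader:" token is the leading broker ID.
    for (i = 1; i < NF; i++) {
      if ($i == "Leader:") {
        count[$(i + 1)]++
      }
    }
  }
  END {
    for (b in count) {
      printf "broker %s: %d partitions\n", b, count[b]
    }
  }' | sort
}
```

Pipe the output of the describe command into count_leaders. For the listing above it reports four partitions on broker 1 and four on broker 2. A broker that leads no partitions does not appear in the summary at all.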

Manually Reassign Partitions

To observe continuous self-balancing, first create an imbalance by manually reassigning all eight partitions to node2 (broker ID 2).


echo '{
"partitions": [
{"topic": "continuous-self-balancing-topic", "partition": 0, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 1, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 2, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 3, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 4, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 5, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 6, "replicas": [2]},
{"topic": "continuous-self-balancing-topic", "partition": 7, "replicas": [2]}
],
"version": 1
}' > move.json && (CMD='docker run --network automq_net -v $(pwd)/move.json:/move.json automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092,broker2:9092 --reassignment-json-file /move.json --execute"' ; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD) && rm move.json
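Writing one JSON entry per partition by hand does not scale to larger topics. The following sketch generates the same reassignment plan with a loop. The function name make_reassignment and its three arguments (topic, partition count, target broker ID) are assumptions made for illustration; the JSON layout is copied from the file above.

```shell
# Print a reassignment plan that moves partitions 0..count-1 of a topic
# to a single broker, in the format expected by kafka-reassign-partitions.sh.
make_reassignment() {
  topic=$1
  count=$2
  broker=$3
  printf '{\n"partitions": [\n'
  i=0
  while [ "$i" -lt "$count" ]; do
    # Every entry except the last is followed by a comma.
    if [ "$i" -eq $((count - 1)) ]; then
      sep=''
    else
      sep=','
    fi
    printf '{"topic": "%s", "partition": %d, "replicas": [%d]}%s\n' \
      "$topic" "$i" "$broker" "$sep"
    i=$((i + 1))
  done
  printf '],\n"version": 1\n}\n'
}
```

For example, make_reassignment continuous-self-balancing-topic 8 2 > move.json produces a file equivalent to the one written by the echo command above.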

The distribution of partitions after manual reassignment is as follows:


Topic: continuous-self-balancing-topic TopicId: HtVB3bM7TYaNKKKmm7khQw PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: continuous-self-balancing-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 4 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 7 Leader: 2 Replicas: 2 Isr: 2

Start the Producer


CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-producer-perf-test.sh --topic continuous-self-balancing-topic --num-records=1024000 --throughput 5120 --record-size 1024 --producer-props bootstrap.servers=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD
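The producer flags fix both how long the test runs and how much traffic it generates, which helps when deciding how long to wait for rebalancing. A quick worked calculation, using only the values from the command above:

```shell
# Values taken from the producer command above.
num_records=1024000   # --num-records
throughput=5120       # --throughput, in records per second
record_size=1024      # --record-size, in bytes

# Run length in seconds and sustained bandwidth in MiB per second.
duration_s=$((num_records / throughput))
bytes_per_s=$((throughput * record_size))
mib_per_s=$((bytes_per_s / 1024 / 1024))

echo "expected duration: ${duration_s} s"
echo "expected bandwidth: ${mib_per_s} MiB/s"
```

This gives a run of 200 seconds at about 5 MiB/s, consistent with the "5.00 MB/sec" that the producer reports in its log.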

Start the Consumer


CMD='docker run --network automq_net automqinc/automq:latest /bin/bash -c "/opt/kafka/kafka/bin/kafka-consumer-perf-test.sh --topic continuous-self-balancing-topic --show-detailed-stats --timeout 300000 --messages=1024000 --reporting-interval 1000 --bootstrap-server=broker1:9092,broker2:9092"'; [ "$(uname)" = "Linux" ] && eval "sudo $CMD" || eval $CMD

View the Distribution of Partitions Again

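After some time, the producer prints logs like the following. The NOT_LEADER_OR_FOLLOWER warnings appear when a partition's leader changes; the producer refreshes its metadata and retries.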


[2024-05-16 10:29:50,448] 25622 records sent, 5123.4 records/sec (5.00 MB/sec), 15.7 ms avg latency, 41.0 ms max latency.
[2024-05-16 10:30:00,372] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10354 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10354 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,373] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10356 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,384] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,385] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10357 on topic-partition continuous-self-balancing-topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,385] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-7, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-7 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10358 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,397] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10360 on topic-partition continuous-self-balancing-topic-6, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-6 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10360 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,398] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,411] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10361 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Got error produce response with correlation id 10362 on topic-partition continuous-self-balancing-topic-4, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:30:00,412] WARN [Producer clientId=perf-producer-client] Received invalid metadata error in produce request on partition continuous-self-balancing-topic-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
[2024-05-16 10:29:55,450] 25327 records sent, 5064.4 records/sec (4.95 MB/sec), 15.3 ms avg latency, 80.0 ms max latency.
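The warnings are repetitive, so a per-partition tally is easier to read than the raw log. The sketch below assumes the producer output (stdout and stderr together) was saved to a file; the file name producer.log and the function name count_leader_errors are hypothetical.

```shell
# Tally NOT_LEADER_OR_FOLLOWER produce errors per topic-partition.
# Reads producer log lines on stdin.
count_leader_errors() {
  grep 'Error: NOT_LEADER_OR_FOLLOWER' |
    # Keep only the topic-partition name from each matching line.
    sed -n 's/.*on topic-partition \([^,]*\), retrying.*/\1/p' |
    sort | uniq -c | awk '{ print $2 ": " $1 }'
}

# Usage, assuming the log was captured to a hypothetical file:
#   count_leader_errors < producer.log
```

In the excerpt above, the warnings involve partitions 0, 4, 6, and 7, which are exactly the partitions led by broker 1 in the final distribution shown in this section.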

After a brief pause, production continues as usual. Then check the partition distribution again.


Topic: continuous-self-balancing-topic TopicId: HtVB3bM7TYaNKKKmm7khQw PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: continuous-self-balancing-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: continuous-self-balancing-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: continuous-self-balancing-topic Partition: 7 Leader: 1 Replicas: 1 Isr: 1

Because all partitions had been moved to node2, all incoming messages were routed to node2, creating a localized hotspot that triggered AutoMQ's self-balancing feature. AutoMQ then moved partitions 0, 4, 6, and 7 to node1, spreading the load evenly across both nodes.
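To check the outcome without reading the listing line by line, the describe output can be reduced to a single number: the difference between the highest and lowest leader count across brokers. This is a rough sketch, and leader count is only a proxy, because the text above describes balancing in terms of load, not partition count. The function name leader_spread is an assumption. Broker IDs are passed explicitly because a broker that leads no partitions does not appear in the describe output.

```shell
# Print the difference between the highest and lowest number of partitions
# led by the given brokers. Reads `kafka-topics.sh --describe` output on stdin.
# $1: comma-separated broker IDs to consider, for example "1,2".
leader_spread() {
  awk -v brokers="$1" '
    {
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") {
          count[$(i + 1)]++
        }
      }
    }
    END {
      n = split(brokers, ids, ",")
      for (k = 1; k <= n; k++) {
        c = count[ids[k]] + 0   # brokers with no partitions count as 0
        if (k == 1 || c > max) max = c
        if (k == 1 || c < min) min = c
      }
      print max - min
    }'
}
```

Piping the describe output into leader_spread 1,2 yields 8 for the distribution right after the manual reassignment and 0 for the final distribution above.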