CloudCanal
Introduction
With the rapid development of big data technology, Apache Kafka®, as a high-throughput, low-latency distributed messaging system, has become a core component of enterprise real-time data processing. However, as businesses expand and technology evolves, enterprises face rising storage costs and growing operational complexity. To optimize system performance and reduce operational expenses, many are looking for more cost-effective messaging solutions. Among them, AutoMQ [1], a messaging system redesigned for the cloud, has emerged as an ideal choice thanks to its significant cost and elasticity advantages.
Introduction to AutoMQ
AutoMQ redesigns Kafka for the cloud by offloading storage to object storage. While maintaining 100% compatibility with Apache Kafka®, it offers up to 10x cost savings and 100x better elasticity. By building its S3Stream log storage layer on object storage, AutoMQ offloads data to shared cloud storage services such as EBS and S3, providing low-cost, low-latency, highly available, highly durable, and virtually unlimited stream storage. Compared with the traditional Shared Nothing architecture, AutoMQ adopts a Shared Storage architecture, which significantly reduces storage and operational complexity while improving elasticity and durability.
The design philosophy and technical advantages of AutoMQ make it an ideal choice for replacing existing Kafka clusters in enterprises. By adopting AutoMQ, enterprises can significantly reduce storage costs, simplify operations, and achieve automated scaling and self-balancing of clusters, thereby more efficiently responding to changing business demands.
Overview of CloudCanal
CloudCanal [2] is a data synchronization and migration tool that helps enterprises build high-quality data pipelines. It features real-time, efficient synchronization, precise interconnectivity, stability and scalability, one-stop operation, hybrid deployment, and complex data transformation. CloudCanal supports data migration, data synchronization, structure migration and synchronization, and data verification and correction, meeting the high standards of data quality and stability that enterprises require in their data management. By consuming incremental operation logs from the source data source, CloudCanal replays the operations on the target data source in near real time to keep the two in sync.
The Necessity of Data Migration
In the daily operations of enterprises, upgrading and migrating data systems is inevitable. For instance, when an enterprise's e-commerce platform faces traffic surges and explosive data growth, the existing Kafka cluster may become inadequate, leading to performance bottlenecks and sharply rising storage costs. To address these challenges, the enterprise might decide to migrate to AutoMQ, a more cost-effective and elastic system.
In this migration process, both full synchronization and incremental synchronization are crucial steps. Full synchronization transfers all existing data from Kafka to AutoMQ, ensuring the integrity of the foundational data. Incremental synchronization then captures and synchronizes new and changed data in Kafka in real time after the full synchronization completes, keeping the two systems consistent throughout the migration. Next, I will use incremental synchronization as an example to show in detail how to use CloudCanal to migrate data from Kafka to AutoMQ while ensuring data consistency and integrity throughout the process.
Prerequisites
Before starting the data migration, ensure the following prerequisites are met. This article demonstrates the incremental synchronization process using one Kafka node and one AutoMQ node as an example.
Kafka Node: A deployed and running Kafka node that can receive and process messages normally, with a network configuration that allows communication with the CloudCanal service.
AutoMQ Node: A deployed and running AutoMQ node that can receive and process messages normally, with a network configuration that allows communication with the CloudCanal service.
CloudCanal Service: A deployed and configured CloudCanal service.
Deploying AutoMQ, Kafka, and CloudCanal
Deploy AutoMQ
Refer to the AutoMQ official documentation: QuickStart | AutoMQ [3]
Deploy Kafka
Refer to the Apache Kafka official documentation: QuickStart | Kafka [4]
Deploy CloudCanal
Installation and Startup
- Install basic tools
## Ubuntu
sudo apt update
sudo apt install apt-transport-https ca-certificates curl gnupg-agent software-properties-common
sudo apt-get install -y lsof
sudo apt-get install -y bc
sudo apt-get install -y p7zip-full
- Download and install the package
Log in to the CloudCanal official website [5], click the button to download the Private Deployment Edition, and obtain the software package download link. Download and extract it to the /opt/ folder.
cd /opt
# Download
wget -cO cloudcanal.7z "${software package download link}"
# Extract
7z x cloudcanal.7z -o./cloudcanal_home
cd cloudcanal_home/install_on_docker
The install_on_docker directory includes the following contents:
- Images: four tar-compressed image files under the images directory
- Docker container orchestration file: docker-compose.yml
- Scripts: scripts for managing and maintaining CloudCanal containers
- Prepare Docker Environment
Ensure the ports required by CloudCanal (e.g., 8111 for the console) are not occupied.
If you do not have a Docker and Docker Compose environment, please refer to the Docker Official Documentation [6] (Docker version 17.x.x or above). You can also install it directly using the script provided in the directory:
## On Ubuntu, navigate to the install_on_docker directory
bash ./support/install_ubuntu_docker.sh
- Start CloudCanal by executing the installation script:
## Ubuntu
bash install.sh
The following indication means the installation was successful
Activate CloudCanal
After a successful installation, you can access the CloudCanal console in your browser via http://{ip}:8111.
Note: If you are unable to access the page, you can try updating the current version of CloudCanal via script using the following commands:
# Navigate to the Installation Directory
cd /opt/cloudcanal_home/install_on_docker
# Stop the Current CloudCanal
sudo bash stop.sh
# Update and Start the New CloudCanal
sudo bash upgrade.sh
After entering the login interface, log in using the trial account
Account: test@clougence.com
Password: clougence2021
Default verification code: 777777
Upon successful login, you need to activate your CloudCanal account before you can use it. Apply for a free license and activate it: License Acquisition | CloudCanal [7]. Once activated, the main interface will show the following status:
Data Migration Process
Preparing Source Kafka Data
You can choose from the following methods:
- Use the MySQL-to-Kafka data synchronization process provided by CloudCanal; refer to MySQL to Kafka Synchronization | CloudCanal [8]
- Prepare data through the Kafka SDK
- Manually produce messages using the scripts provided by Kafka
Here, I will prepare the data through the Kafka SDK method. Below is the reference code:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTest {

    private static final String BOOTSTRAP_SERVERS = "${kafka_broker_ip:port}"; // Modify this to your own Kafka broker address
    private static final int NUM_TOPICS = 50;
    private static final int NUM_MESSAGES = 1000;

    public static void main(String[] args) throws Exception {
        KafkaTest test = new KafkaTest();
        test.createTopics();
        test.sendMessages();
    }

    // Create 50 Topics, named Topic-n
    public void createTopics() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        try (AdminClient adminClient = AdminClient.create(props)) {
            List<NewTopic> topics = new ArrayList<>();
            for (int i = 1; i <= NUM_TOPICS; i++) {
                topics.add(new NewTopic("Topic-" + i, 1, (short) 1));
            }
            adminClient.createTopics(topics).all().get();
            System.out.println("Topics created successfully");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    // Send 1,000 JSON messages to each of the 50 Topic-n topics, with sequence numbers from 1 to 1000
    public void sendMessages() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= NUM_TOPICS; i++) {
                String topic = "Topic-" + i;
                for (int j = 1; j <= NUM_MESSAGES; j++) {
                    String key = "key-" + j;
                    String value = "{\"userId\": " + j + ", \"action\": \"visit\", \"timestamp\": " + System.currentTimeMillis() + "}";
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
                    producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                        if (exception == null) {
                            System.out.printf("Sent message to topic %s partition %d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    });
                }
            }
            System.out.println("Messages sent successfully");
        }
    }
}
After creation, you can check the Kafka node status through various GUI-based management tools such as Redpanda Console [9], Kafdrop [10], etc. Here I choose Redpanda Console, where you can see that there are now 50 Topics, each with 1,000 initial messages.
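If you prefer a programmatic check over a GUI, the same verification can be done with the Kafka AdminClient together with a consumer's end offsets. The sketch below is a minimal, illustrative example that assumes the single-partition Topic-n layout created above and the same broker address placeholder:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class SourceTopicCheck {
    public static void main(String[] args) throws Exception {
        String bootstrap = "${kafka_broker_ip:port}"; // same placeholder as in the producer example above

        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (AdminClient admin = AdminClient.create(adminProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Only look at the test topics created by KafkaTest above.
            List<String> topics = admin.listTopics().names().get().stream()
                    .filter(t -> t.startsWith("Topic-"))
                    .sorted()
                    .collect(Collectors.toList());
            System.out.println("Test topic count: " + topics.size());

            for (String topic : topics) {
                // Each test topic has a single partition, so the end offset of
                // partition 0 equals the number of messages in the topic.
                TopicPartition tp = new TopicPartition(topic, 0);
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
                System.out.println(topic + " message count: " + end.get(tp));
            }
        }
    }
}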
The messages are in JSON format:
{
  "action": "INSERT/UPDATE/DELETE",
  "bid": 1,
  "before": [],
  "data": [{
    "id": "string data",
    "username": "string data",
    "user_id": "string data",
    "ip": "string data",
    "request_time": "1608782968300",
    "request_type": "string data"
  }],
  "db": "access_log_db",
  "schema": "",
  "table": "access_log",
  "dbValType": {
    "id": "INT",
    "username": "VARCHAR",
    "user_id": "INT",
    "ip": "VARCHAR",
    "request_time": "TIMESTAMP",
    "request_type": "VARCHAR"
  },
  "jdbcType": {
    "id": "0",
    "username": "0",
    "user_id": "0",
    "ip": "0",
    "request_time": "0",
    "request_type": "0"
  },
  "entryType": "ROWDATA",
  "isDdl": false,
  "pks": ["id"],
  "execTs": 0,
  "sendTs": 0,
  "sql": ""
}
Meanwhile, the AutoMQ nodes currently have no data:
Add CloudCanal Data Source
In the CloudCanal interface, navigate to Data Source Management -> Add Data Source
Add the Kafka data source in the same way, then run connection tests on both data sources; the results should look like the following:
Creating a Data Migration Task
- On the CloudCanal interface, navigate to Sync Tasks -> Create Task
- Choose the task specifications based on the amount of data you need to migrate:
- Select the Topics that need data migration:
- Confirm the task:
- After the task is created, it starts automatically by default and you are redirected to the task list. To keep the task status updated in a timely manner, enable the heartbeat option in the source data source configuration. The steps are Task Details -> Source Data Source Configuration -> Modify Configuration -> Apply Configuration:
- Then wait for the task to restart; you will see the following status:
Note: If you encounter connection issues or high task latency, please refer to the CloudCanal official documentation: FAQ Index | CloudCanal [11]
Verify whether the Topic structure has been correctly created in AutoMQ
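Since AutoMQ is 100% compatible with the Kafka protocol, the topic structure on the AutoMQ side can also be checked with the standard Kafka AdminClient. The sketch below is a minimal, illustrative example that assumes a placeholder AutoMQ bootstrap address and the Topic-n naming used in this article:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class AutoMQTopicStructureCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder for the AutoMQ bootstrap address; replace with your own endpoint.
        String bootstrap = "${automq_broker_ip:port}";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

        try (AdminClient admin = AdminClient.create(props)) {
            // List the migrated test topics and describe their partition layout.
            Set<String> topics = admin.listTopics().names().get().stream()
                    .filter(t -> t.startsWith("Topic-"))
                    .collect(Collectors.toSet());
            System.out.println("Migrated topic count: " + topics.size());

            Map<String, TopicDescription> descriptions = admin.describeTopics(topics).all().get();
            descriptions.forEach((name, desc) ->
                    System.out.println(name + " partitions: " + desc.partitions().size()));
        }
    }
}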
Prepare Incremental Data
With the task running normally, we now need to prepare incremental data so that the migration task can synchronize it to AutoMQ. We will continue to use the Kafka SDK to add new data, as shown in the sketch below. After adding the data, you can check the task execution status via Task Details -> Incremental Sync -> View Logs -> Task Run Logs; sample log output follows the sketch:
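Here is a minimal sketch for producing the incremental data, reusing the Kafka SDK setup from the earlier example. The broker address placeholder, the Topic-n names, and the batch of 100 extra messages per topic are illustrative assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IncrementalDataTest {
    private static final String BOOTSTRAP_SERVERS = "${kafka_broker_ip:port}"; // same placeholder as before

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send 100 extra messages to each existing Topic-n so the running
            // CloudCanal task has new data to pick up; the "action" field marks
            // these records as incremental.
            for (int i = 1; i <= 50; i++) {
                String topic = "Topic-" + i;
                for (int j = 1; j <= 100; j++) {
                    String value = "{\"userId\": " + j + ", \"action\": \"incremental\", \"timestamp\": " + System.currentTimeMillis() + "}";
                    producer.send(new ProducerRecord<>(topic, "incr-key-" + j, value));
                }
            }
            producer.flush(); // make sure everything reaches the broker before the process exits
        }
        System.out.println("Incremental messages sent");
    }
}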
2024-07-11 17:16:45.995 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.995 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.999 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
Verify the Migration Result
Verify whether AutoMQ has correctly synchronized the messages:
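One straightforward way to spot-check the result is to consume a few records directly from AutoMQ with a standard Kafka consumer. The sketch below is illustrative and assumes a placeholder AutoMQ bootstrap address; it reads the first records of Topic-1:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AutoMQMessageCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder for the AutoMQ bootstrap address; replace with your own endpoint.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "${automq_broker_ip:port}");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "migration-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read from the beginning of the topic
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Topic-1")); // spot-check one migrated topic
            int printed = 0;
            // Poll a few times and print the first 10 records as a sanity check.
            for (int attempt = 0; attempt < 10 && printed < 10; attempt++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
                    if (++printed >= 10) {
                        break;
                    }
                }
            }
        }
    }
}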
After adding data multiple times, the migration still completes successfully:
You can see that the new data added to Kafka while the incremental sync task was running has been synchronized to AutoMQ. At this point, our migration process is fully complete.
Summary
As enterprises experience data growth and diversified business needs, data migration and synchronization become increasingly critical. This article delves into how to leverage CloudCanal for incremental data migration from Apache Kafka® to AutoMQ, addressing storage costs and operational complexities. The incremental migration technique ensures data consistency and business continuity during the process, offering an efficient and reliable solution for enterprises.
We hope this article provides valuable insights and guidance on data migration and synchronization, aiding in the seamless transition and performance optimization of your systems!
References
[1] AutoMQ: https://docs.automq.com/automq/what-is-automq/overview
[2] CloudCanal: https://www.clougence.com/?src=cc-doc
[3] QuickStart | AutoMQ: https://docs.automq.com/docs/automq-opensource/EvqhwAkpriAomHklOUzcUtybn7g
[4] QuickStart | Kafka: https://kafka.apache.org/quickstart
[5] CloudCanal Official Website: https://www.clougence.com/?src=cc-doc-install-linux
[6] Docker Official Documentation: https://docs.docker.com/engine/install/
[7] License Acquisition | CloudCanal: https://www.clougence.com/cc-doc/license/license_use
[8] MySQL to Kafka Synchronization | CloudCanal: https://www.clougence.com/cc-doc/bestPractice/mysql_kafka_sync
[9] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui
[10] Kafdrop: https://github.com/obsidiandynamics/kafdrop
[11] FAQ Index | CloudCanal: https://www.clougence.com/cc-doc/faq/cloudcanal_faq_list