CloudCanal

Introduction

With the rapid development of big data technology, Apache Kafka® has become a core component of real-time data processing in enterprises, thanks to its high throughput and low latency as a distributed messaging system. However, as businesses expand and technology evolves, storage costs and operational complexity keep rising. To improve system performance and reduce operating costs, enterprises are looking for more cost-effective messaging systems. Among these, AutoMQ [1], a messaging system redesigned for the cloud, stands out for its significant cost benefits and elasticity.

Introduction to AutoMQ

AutoMQ redesigns Kafka for the cloud by offloading storage to object storage, offering users up to a 10x cost advantage and a 100x elasticity advantage while maintaining 100% compatibility with Apache Kafka®. By building S3Stream, a streaming storage repository on S3, AutoMQ offloads storage to shared cloud storage services such as EBS and S3, providing low-cost, low-latency, highly available, highly durable streaming storage with unlimited capacity. Whereas traditional Kafka uses a Shared Nothing architecture, AutoMQ adopts a Shared Storage architecture, which significantly reduces storage and operational complexity while improving elasticity and reliability.

The design philosophy and technical advantages of AutoMQ make it an ideal choice for replacing existing Kafka clusters in enterprises. By adopting AutoMQ, enterprises can significantly reduce storage costs, simplify operations, and achieve automatic scaling and traffic self-balancing of clusters, thus more efficiently responding to changes in business demands.

CloudCanal Overview

CloudCanal [2] is a data synchronization and migration tool that helps enterprises build high-quality data pipelines. Its strengths include real-time performance, precise interconnectivity, stability, scalability, one-stop operation, hybrid deployment, and complex data transformation. CloudCanal supports data migration, data synchronization, schema migration and synchronization, and data verification and correction, meeting the high requirements for data quality and stability in enterprise data management. By consuming incremental operation logs from the source, CloudCanal replays the operations on the target in near real time to achieve data synchronization.

The Necessity of Data Migration

In the daily operations of a business, system upgrades and migrations are inevitable. For instance, when an enterprise's e-commerce platform faces a surge in traffic and an explosive growth in data volume, the existing Kafka cluster may not suffice, leading to performance bottlenecks and significantly increased storage costs. To address these challenges, the business might decide to migrate to a more cost-effective and flexible AutoMQ system.

During such migrations, full synchronization and incremental synchronization are crucial steps. Full synchronization can migrate all existing data from Kafka to AutoMQ, ensuring the integrity of the foundational data. Incremental synchronization captures and synchronizes new and changed data in Kafka in real-time after the full synchronization is completed, ensuring data consistency between the two systems during the migration process. This article will take incremental synchronization as an example to detail how to use CloudCanal to achieve data migration from Kafka to AutoMQ, ensuring data consistency and completeness during the migration.
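To make the distinction concrete, here is a minimal in-memory sketch of the two phases. It is only an illustration of the idea, not how CloudCanal or Kafka implement it: a topic is modeled as an append-only list, an offset is a list index, and the class name SyncSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: a "topic" is an append-only list and an offset is an index into it. */
final class SyncSketch {
    private final List<String> source;
    private final List<String> target = new ArrayList<>();
    private int checkpoint = 0; // next source offset that has not been copied yet

    SyncSketch(List<String> source) {
        this.source = source;
    }

    /** Full synchronization: copy a snapshot of everything the source holds right now. */
    int fullSync() {
        target.clear();
        target.addAll(source);
        checkpoint = source.size();
        return checkpoint;
    }

    /** Incremental synchronization: copy only records appended after the checkpoint. */
    int incrementalSync() {
        int copied = 0;
        while (checkpoint < source.size()) {
            target.add(source.get(checkpoint++));
            copied++;
        }
        return copied;
    }

    List<String> target() {
        return target;
    }

    public static void main(String[] args) {
        List<String> kafkaTopic = new ArrayList<>(List.of("m1", "m2", "m3"));
        SyncSketch sync = new SyncSketch(kafkaTopic);
        System.out.println("full sync copied: " + sync.fullSync());

        // New records arrive at the source after the full copy has finished.
        kafkaTopic.add("m4");
        kafkaTopic.add("m5");
        System.out.println("incremental sync copied: " + sync.incrementalSync());
        System.out.println("target matches source: " + sync.target().equals(kafkaTopic));
    }
}
```

The checkpoint is what keeps the two phases consistent: the incremental phase resumes exactly where the full copy stopped, so no record is skipped or copied twice.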

Prerequisites

Before performing the data migration, ensure the following prerequisites are met. This example will demonstrate the incremental synchronization process using one Kafka node and one AutoMQ node.

  1. Kafka Node: A deployed and running Kafka node, ensuring the Kafka node can normally receive and process messages, and its network configuration allows communication with the CloudCanal service.

  2. AutoMQ Node: A deployed and running AutoMQ node, ensuring the AutoMQ node can normally receive and process messages, and its network configuration allows communication with the CloudCanal service.

  3. CloudCanal Service: A deployed and configured CloudCanal service.
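Before creating any task, it can help to confirm from the CloudCanal host that both brokers accept TCP connections. The following is a minimal sketch using only the Java standard library; the class name EndpointCheck, the 3-second timeout, and the host:port arguments are illustrative assumptions, and a successful TCP connection does not by itself prove that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class EndpointCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or the host could not be resolved
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java EndpointCheck.java kafka-host:9092 automq-host:9092
        for (String endpoint : args) {
            int colon = endpoint.lastIndexOf(':');
            if (colon < 0) {
                System.out.println(endpoint + " -> expected host:port");
                continue;
            }
            String host = endpoint.substring(0, colon);
            int port = Integer.parseInt(endpoint.substring(colon + 1));
            String status = isReachable(host, port, 3000) ? "reachable" : "NOT reachable";
            System.out.printf("%s -> %s%n", endpoint, status);
        }
    }
}
```

Run it on the machine that hosts CloudCanal, since that is the network path the migration task will use.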

Deploy AutoMQ, Kafka, and CloudCanal

Deploying AutoMQ

Refer to the official AutoMQ documentation: QuickStart | AutoMQ [3]

Deploying Kafka

Refer to the official Apache Kafka® documentation: QuickStart | Kafka [4]

Deploying CloudCanal

Installation and Startup

  1. Install basic tools

## Ubuntu
sudo apt update
sudo apt install apt-transport-https ca-certificates curl gnupg-agent software-properties-common
sudo apt-get install -y lsof
sudo apt-get install -y bc
sudo apt-get install -y p7zip-full

  2. Download and install the package

Log in to the CloudCanal official website [5], and click the download button for the private deployment version to get the software package download link. Download and extract it to the /opt/ directory.


cd /opt
# Download
wget -cO cloudcanal.7z "${software_package_download_link}"
# Extract
7z x cloudcanal.7z -o./cloudcanal_home
cd cloudcanal_home/install_on_docker

The install_on_docker directory contains the following:

  • Images: Four compressed files ending with tar in the images directory

  • Docker container orchestration file: docker-compose.yml file

  • Scripts: Several scripts for managing and maintaining CloudCanal containers

  3. Prepare the Docker environment

Ensure the following ports are not occupied.

If you do not have Docker and Docker Compose environments, refer to the Docker Official Documentation [6] (version 17.x.x and above). You can also use the provided script in the directory for installation:


## Ubuntu: run from the install_on_docker directory
bash ./support/install_ubuntu_docker.sh
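One way to check whether a port is already occupied on the host is to try binding it. The sketch below assumes only the Java standard library; the class name PortCheck is hypothetical, and 8111 is used as the default only because it is the console port mentioned in this article. Pass any other ports you need to check as arguments.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class PortCheck {
    /**
     * Returns true if this process can bind the port, which means nothing else
     * is listening on it. A false result can also mean the bind was not
     * permitted (for example, ports below 1024 without privileges).
     */
    static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java PortCheck.java 8111 <other ports...>
        String[] ports = args.length > 0 ? args : new String[] {"8111"};
        for (String p : ports) {
            int port = Integer.parseInt(p);
            System.out.printf("port %d: %s%n", port, isFree(port) ? "free" : "in use");
        }
    }
}
```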

  4. Launch CloudCanal by executing the installation script:

## Ubuntu
bash install.sh

The following indicator will confirm a successful installation.

Activate CloudCanal.

After a successful installation, you can access the CloudCanal console in your browser via http://{ip}:8111.

Note: If you cannot access the page properly, try updating the current version of CloudCanal via script with the following commands:


# Navigate to the Installation Directory
cd /opt/cloudcanal_home/install_on_docker
# Stop the Current CloudCanal
sudo bash stop.sh
# Update and Start the New CloudCanal
sudo bash upgrade.sh

  1. After entering the login interface, log in with the trial account

    • Account: test@clougence.com

    • Password: clougence2021

    • Default verification code: 777777

  2. After successfully logging in, you need to activate your CloudCanal account to use it normally. Apply for a free license and activate: License Acquisition | CloudCanal [7]. Once activation is successful, the main interface status will be:

Data Migration Process

Prepare Source Kafka Data

You can choose the following methods:

  • The MySQL-to-Kafka data synchronization process provided by CloudCanal, refer to: MySQL to Kafka Sync | CloudCanal [8]

  • Prepare data via Kafka SDK

  • Manually produce messages using scripts provided by Kafka

This example prepares data with the Kafka SDK. Reference code:


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTest {

    private static final String BOOTSTRAP_SERVERS = "${kafka_broker_ip:port}"; // Modify to your own Kafka node address
    private static final int NUM_TOPICS = 50;
    private static final int NUM_MESSAGES = 500;

    public static void main(String[] args) throws Exception {
        KafkaTest test = new KafkaTest();
        test.createTopics();
        test.sendMessages();
    }

    // Create 50 Topics named Topic-n, each with 1 partition and replication factor 1
    public void createTopics() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        try (AdminClient adminClient = AdminClient.create(props)) {
            List<NewTopic> topics = new ArrayList<>();
            for (int i = 1; i <= NUM_TOPICS; i++) {
                topics.add(new NewTopic("Topic-" + i, 1, (short) 1));
            }
            // Block until the broker has confirmed every topic
            adminClient.createTopics(topics).all().get();
            System.out.println("Topics created successfully");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    // Send 500 messages with serial numbers from 1 to 500 to each of the 50 Topic-n, with messages in JSON format
    public void sendMessages() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= NUM_TOPICS; i++) {
                String topic = "Topic-" + i;
                for (int j = 1; j <= NUM_MESSAGES; j++) {
                    String key = "key-" + j;
                    String value = "{\"userId\": " + j + ", \"action\": \"visit\", \"timestamp\": " + System.currentTimeMillis() + "}";
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
                    // send() is asynchronous; the callback reports the result of each record
                    producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                        if (exception == null) {
                            System.out.printf("Sent message to topic %s partition %d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    });
                }
            }
            // Wait for all buffered records to be sent before reporting completion
            producer.flush();
            System.out.println("Messages sent successfully");
        }
    }
}

After creation, you can use various visualization tools to check the Kafka node status, such as Redpanda Console [9] and Kafdrop [10]. Here, using Redpanda Console as an example, you can see there are already 50 Topics, and each Topic has 500 initial messages.

The message format is JSON. The sample below is a row-change message of the kind produced by the MySQL-to-Kafka synchronization method; messages generated by the SDK code above carry only the userId, action, and timestamp fields:

{
  "action": "INSERT/UPDATE/DELETE",
  "bid": 1,
  "before": [],
  "data": [{
    "id": "string data",
    "username": "string data",
    "user_id": "string data",
    "ip": "string data",
    "request_time": "1608782968300",
    "request_type": "string data"
  }],
  "db": "access_log_db",
  "schema": "",
  "table": "access_log",
  "dbValType": {
    "id": "INT",
    "username": "VARCHAR",
    "user_id": "INT",
    "ip": "VARCHAR",
    "request_time": "TIMESTAMP",
    "request_type": "VARCHAR"
  },
  "jdbcType": {
    "id": "0",
    "username": "0",
    "user_id": "0",
    "ip": "0",
    "request_time": "0",
    "request_type": "0"
  },
  "entryType": "ROWDATA",
  "isDdl": false,
  "pks": ["id"],
  "execTs": 0,
  "sendTs": 0,
  "sql": ""
}

Additionally, the AutoMQ node currently has no data:

Add CloudCanal Data Source

On the CloudCanal interface, go to Data Source Management -> Add Data Source

Add the Kafka data source in the same way, then test the connection to both nodes. The results are as follows:

Create Data Migration Task

  1. On the CloudCanal interface, navigate to Sync Tasks -> Create Task
  2. Select task specifications based on the volume of data you need to migrate:
  3. Choose the Topics to migrate:
  4. Confirm the task:
  5. Once the task is created, it starts automatically by default and you are redirected to the task list. Modify the source data source configuration to enable heartbeat settings so that the task status updates promptly. Steps: Task Details -> Source Data Source Configuration -> Modify Configuration -> Apply Configuration:
  6. After the task restarts, you will see the following:

Note: If you encounter connection issues or high task latency, please refer to the CloudCanal official documentation: FAQ Index | CloudCanal [11]

  7. Verify whether the Topic structure has been correctly created in AutoMQ

Prepare Incremental Data

The task is now running normally. The next step is to prepare incremental data so that the migration task can synchronize it to AutoMQ. Here, we again use the Kafka SDK to add new data. After adding new data, you can check the task execution status through Task Details -> Incremental Synchronization -> View Logs -> Task Running Log:


2024-07-11 17:16:45.995 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.995 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.996 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.997 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.998 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
2024-07-11 17:16:45.999 [incre-fetch-from-buffer-14-thd-0] INFO c.c.c.mq.worker.reader.kafka.KafkaIncreEventBroker - getWithoutAck successfully, batch:64, real:64
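For longer logs, a short script can tally these lines instead of reading them by eye. The sketch below sums the real value of every getWithoutAck line; it assumes that real is the number of records actually fetched in that call, which is an interpretation of the log text rather than documented behavior. The class name FetchLogTally and the log file argument are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FetchLogTally {
    // Matches the fetch lines shown above and captures the batch and real values.
    private static final Pattern FETCH =
            Pattern.compile("getWithoutAck successfully, batch:(\\d+), real:(\\d+)");

    /** Sums the "real" counts over all matching lines; other lines are ignored. */
    static long totalFetched(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            Matcher m = FETCH.matcher(line);
            if (m.find()) {
                total += Long.parseLong(m.group(2));
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: java FetchLogTally.java task.log
        List<String> lines = Files.readAllLines(Path.of(args[0]));
        System.out.println("records fetched: " + totalFetched(lines));
    }
}
```

Under that assumption, the twelve lines shown above add up to 12 x 64 = 768 records.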

Verify the Migration Results

Verify whether AutoMQ has correctly synchronized the messages:

Adding new data repeatedly still results in a successful migration:

While the incremental synchronization task is running, new data added to Kafka is synchronized to AutoMQ. At this point, the data migration from Kafka to AutoMQ is complete.
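With 50 Topics, comparing counts in a console by eye is error-prone, so the comparison can be scripted. The sketch below takes per-Topic message counts from both clusters and reports any Topic where they differ. How the counts are obtained is left out: one option is the end offsets returned by KafkaConsumer.endOffsets, which equal the message count for the single-partition Topics created in this example, assuming no messages have been deleted by retention. The class name MigrationCheck and the sample numbers are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class MigrationCheck {
    /** Returns topic -> (source count - target count) for every topic whose counts differ. */
    static Map<String, Long> lagByTopic(Map<String, Long> sourceCounts, Map<String, Long> targetCounts) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> entry : sourceCounts.entrySet()) {
            // A topic missing from the target counts as zero messages.
            long target = targetCounts.getOrDefault(entry.getKey(), 0L);
            long diff = entry.getValue() - target;
            if (diff != 0) {
                lag.put(entry.getKey(), diff);
            }
        }
        return lag;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 50 topics with 500 messages each on the source.
        Map<String, Long> kafka = new HashMap<>();
        Map<String, Long> automq = new HashMap<>();
        for (int i = 1; i <= 50; i++) {
            kafka.put("Topic-" + i, 500L);
            automq.put("Topic-" + i, 500L);
        }
        automq.put("Topic-7", 480L); // pretend one topic is still catching up

        Map<String, Long> lag = lagByTopic(kafka, automq);
        System.out.println(lag.isEmpty() ? "all topics in sync" : "lagging topics: " + lag);
    }
}
```

An empty result means every Topic on the source has the same count on the target. A positive value means the target is still behind by that many messages.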

Summary

As enterprise data scales and business needs diversify, data migration and synchronization become increasingly important. This article has shown in detail how to use CloudCanal to migrate data from Kafka to AutoMQ through incremental synchronization, addressing storage costs and operational complexity. During the migration, incremental synchronization keeps the data consistent and the business running, providing enterprises with an efficient and reliable solution.

We hope this article offers a useful reference for your data migration and synchronization efforts, helping you achieve a smooth system transition and better performance!

References

[1] AutoMQ: https://docs.automq.com/automq/what-is-automq/overview

[2] CloudCanal: https://www.clougence.com/?src=cc-doc

[3] QuickStart | AutoMQ: https://docs.automq.com/automq/getting-started

[4] QuickStart | Kafka: https://kafka.apache.org/quickstart

[5] CloudCanal Official Website: https://www.clougence.com/?src=cc-doc-install-linux

[6] Docker Official Documentation: https://docs.docker.com/engine/install/

[7] License Acquisition | CloudCanal: https://www.clougence.com/cc-doc/license/license_use

[8] MySQL to Kafka Synchronization | CloudCanal: https://www.clougence.com/cc-doc/bestPractice/mysql_kafka_sync

[9] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui

[10] Kafdrop: https://github.com/obsidiandynamics/kafdrop

[11] FAQ Index | CloudCanal: https://www.clougence.com/cc-doc/faq/cloudcanal_faq_list