Skip to main content

Databend

Databend 是基于 Rust 开发的、面向云架构的新一代云原生数据仓库,采用对象存储构建,为企业提供了湖仓一体化、计算与存储分离的大数据分析平台。

本文将介绍如何通过 bend-ingest-kafka 将数据从 AutoMQ 导入 Databend。

环境准备

准备 Databend Cloud 和测试数据

首先,前往 Databend Cloud 开启 Warehouse ,并在 worksheet 中创建数据库库和测试表。


create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
ts timestamp,
status string
)

准备 AutoMQ 和测试数据

参考 本地部署▸ 部署 AutoMQ,确保 AutoMQ 与 Databend 之间保持网络连通。

在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:


./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建完主题后,可以使用以下命令来验证主题是否已成功创建。


./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。


{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:


echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

使用如下命令可以查看刚写入的 topic 数据:


sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

创建 bend-ingest-databend job

bend-ingest-kafka 能够监控 kafka 并将数据批量写入 Databend Table。部署 bend-ingest-kafka 之后,即可开启数据导入 job。


bend-ingest-kafka --kafka-bootstrap-servers="localhost:9094" --kafka-topic="example_topic" --kafka-consumer-group="Consumer Group" --databend-dsn="https://cloudapp:password@host:443" --databend-table="automq_db.users" --data-format="json" --batch-size=5 --batch-max-interval=30s

在执行命令时,需要将 kafka-bootstrap-servers 替换为实际使用的 Kafka 服务器地址。

参数说明

databend-dsn

Databend Cloud 提供的连接到 warehouse 的 DSN,可以参考该文档 获取。

batch-size

bend-ingest-kafka 会积攒到 batch-size 条数据再触发一次数据同步。

验证数据导入

前往 Databend Cloud worksheet 中查询 automq_db.users 表,可以看到数据已经从 AutoMQ 同步到 Databend Table。