Skip to main content

StarRocks

StarRocks 是一款高性能的分析型数据仓库,采用向量化、MPP 架构、CBO、智能物化视图以及可实时更新的列式存储引擎等先进技术,能够支持多维、实时和高并发的数据分析。

本文将介绍如何使用 StarRocks Routine Load 将 AutoMQ 中的数据导入 StarRocks。详细了解 Routine Load 的基本原理,请参考 Routine Load 基本原理文档。

环境准备

准备 StarRocks 和测试数据

确保当前已准备好可用的 StarRocks 集群。为了便于演示,我们参考 使用 Docker 部署 StarRocks 在一台 Linux 机器上安装用于演示的集群。

创建库和主键模型的测试表:



create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"replication_num" = "1",
"enable_persistent_index" = "true"
);

准备 AutoMQ 和测试数据

参考 本地部署▸ 部署 AutoMQ,确保 AutoMQ 与 StarRocks 之间保持网络连通。

在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:


./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建完主题后,可以使用以下命令来验证主题是否已成功创建。


./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。


{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:


echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

使用如下命令可以查看刚写入的 topic 数据:


sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建 Routine Load 导入作业

在 StarRocks 的命令行中创建一个 Routine Load 作业,用来持续导入 AutoMQ Kafka topic 中的数据。


CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

在执行命令时,需要将 kafka_broker_list 替换为实际使用的 Kafka 服务器地址。

参数说明

数据格式

需要PROPERTIES子句的"format" = "json"中指定数据格式为 JSON。

数据提取和转换

如果需要指定源数据和目标表之间列的映射和转换关系,可以配置COLUMNS 和 jsonpaths参数。在 COLUMNS 中,列名对应目标表 的列名,列的顺序对应源数据 中的列的顺序。而 jsonpaths 参数则用于提取 JSON 数据中所需的字段数据,类似于新生成的 CSV 数据。接着,COLUMNS 参数会按照 jsonpaths 中字段的顺序进行临时命名。想了解更多关于数据转换的内容,请查看导入时实现数据转换文档

如果每行一个 JSON 对象中 key 的名称和数量(顺序不需要对应)都能对应目标表中列,则无需配置 COLUMNS 。

验证数据导入

首先,检查 Routine Load 导入作业的状态,确保任务正在运行中。


show routine load\G;

然后查询 StarRocks 数据库中的相关表,可以看到数据已经被成功导入。


StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | 测试用户 | 2023-11-10T12:00:00 | active |
| 2 | 测试用户 | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)