RisingWave
RisingWave 是一个分布式流数据库,提供标准的 SQL 接口,与 PostgreSQL 生态系统兼容,无需改动代码即可集成。RisingWave 将流视作表,允许用户以优雅方式在流数据和历史数据上编写复杂查询。借助 RisingWave,用户可以专注于查询分析逻辑,而无需学习 Java 或特定系统的底层 API。
本文将介绍如何通过 RisingWave Cloud 将数据从 AutoMQ 导入 RisingWave 数据库。
准备 AutoMQ 和测试数据
参考 本地部署▸ 部署 AutoMQ,确保 AutoMQ 与 RisingWave 之间保持网络连通。
在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。
创建 Topic
使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
创建完主题后,可以使用以下命令来验证主题是否已成功创建。
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092
生成测试数据
生成一条 JSON 格式的测试数据,和前文的表需要对应。
{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}
写入测试数据
通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:
echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
使用如下命令可以查看刚写入的 topic 数据:
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
在 RisingWave Cloud 上创建 AutoMQ 源
前往 RisingWave Cloud Clusters 创建集群。
前往 RisingWave Cloud Source 创建源。
指定集群和数据库,并登入数据库。
AutoMQ 100% 兼容 Apache Kafka, 因此只需要点击 Create source 并且选择 Kafka。
根据 RisingWave Cloud 的引导界面配置连接器,设置源信息和 schema 信息。
确认生成的 SQL 语句,点击 Confirm 完成源的创建。
AutoMQ 默认端口是 9092 并且没有开启 SSL。 如果需要启用 SSL,请参考文档 Apache Kafka Documentation。
本示例中可以通过设置启动模式为 earliest,并使用 JSON 格式来从头访问 topic 中的所有数据。
查询数据
前往 RisingWave Cloud Console ,登入集群。
运行下方 SQL 语句,访问已经导入的数据,其中替换 your_source_name 变量为创建源时指定的自定义名称。
SELECT * from {your_source_name} limit 1;