Skip to main content

Timeplus

Timeplus 是一款专注于流处理的数据分析平台,采用开源流式数据库 Proton,提供端到端的强大功能,支持团队快速、直观地处理流数据和历史数据。适用于各种规模和行业的组织,让数据工程师和平台工程师能够通过 SQL 充分释放流数据的价值。

本文介绍如何利用 Timeplus 控制台将 AutoMQ 数据导入 Timeplus。由于 AutoMQ 完全兼容 Apache Kafka,因此也可以创建 Kafka 外部流来分析 AutoMQ 数据,无需移动数据。

准备 AutoMQ 和测试数据

参考 本地部署▸ 部署 AutoMQ,确保 AutoMQ 与 Timeplus 之间保持网络连通。

如果需要保持 IP 白名单,则需要将 Timeplus 服务的静态 IP 列入白名单:

52.83.159.13 对于 cloud.timeplus.com.cn

在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:


./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建完主题后,可以使用以下命令来验证主题是否已成功创建。


./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。


{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:


echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

使用如下命令可以查看刚写入的 topic 数据:


sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

AutoMQ 数据源

  1. 在左侧导航菜单中,点击“数据摄取”,再点击右上角的“添加数据”按钮。

  2. 在弹出窗口中,查看可连接的数据源和其他添加数据的方式。由于 AutoMQ 完全兼容 Apache Kafka,直接选择 Apache Kafka。

  3. 输入 broker URL,关闭 TLS 和身份验证。

  4. 输入 AutoMQ 主题名称,并选择“读取为”的数据格式,支持 JSON、AVRO 和文本格式。

    1. 建议选择 Text 以保存整个 JSON 文档为字符串,便于处理架构变化。

    2. 选择 AVRO 时,可开启“自动提取”选项,将顶级属性存储为不同列。需指定 schema 注册表地址、API 密钥和密钥。

  5. 在“预览”步骤中展示至少一个事件,新数据源默认在 Timeplus 中创建新流。

  6. 命名流并验证列信息,可设置事件时间列。若不设置,系统将使用摄取时间。也可选择现有流。

  7. 预览数据后,为源命名、添加描述并审查配置。点击“完成”,流数据将立即在指定流中可用。

AutoMQ 源说明

使用 AutoMQ 数据源,需要遵守如下约束:

  1. 当前仅支持 AutoMQ Kafka 主题中的消息采用 JSON 和 AVRO 格式。

  2. 主题级别 JSON 属性将被转换为流列。 对于嵌套属性,元素将被保存为 String 列,然后可以用 JSON functions 之一来查询它们。

  3. JSON 消息中的数值或布尔值类型将被转换为流中的对应类型。

  4. 日期时间或时间戳将被保存为字符串列。 可以通过 to_time function 将它们转换回 DateTime。