AutoMQ 完全兼容 Apache Kafka,支持使用 Apache Kafka 的客户端 SDK 进行集成和收发消息。本文档介绍 AutoMQ 推荐的客户端 SDK 以及使用注意事项。
兼容性说明
AutoMQ 完全兼容 Apache Kafka (0.10 ~ 3.9 版本),推荐使用 Apache kafka 的客户端 SDK 访问 AutoMQ 进行收发消息。根据 Apache Kafka 的协议协商原则,在上述范围内的客户端版本均可以正常接入。
建议应用侧尽可能升级到更新的 SDK 版本,以修复历史版本的隐藏缺陷。
推荐的客户端 SDK 列表
Apache Kafka 社区针对不同的开发语言提供了多种 SDK 选择,用户可查看社区的文档信息。AutoMQ 技术团队基于大量的生产实践,针对不同的开发语言推荐客户使用如下 SDK 接入 AutoMQ。
在 AutoMQ Labs 准备了对应客户端的快速开始样例,作为接入参考。
开发语言
| 客户端 SDK
| 推荐版本
| 样例
|
|---|
Java
| Apache Kafka Java Client
| >= 3.2.0
| Demo
|
C/C++
| librdkafka
| >= 2.8.0
| Demo
|
Go
| franz-go
| >= 1.17.1
| Demo
|
Python
| kafka-python
| >= 2.2.3
| Demo
|
NodeJS
| KafkaJS
| >= 2.2.4
| Demo
|
如果目前使用的 SDK 不在上述推荐列表范围内,只要兼容 Apache Kafka 协议,则仍然可以继续访问 AutoMQ,建议查看下方的已知问题列表,进行适当的参数调整。
客户端参数调优
使用 SDK 收发消息时,除了选择推荐的 SDK 版本,还需要针对 Producer 的关键参数进行调整,以达到最佳的性能。以下是各 SDK 和 Kafka Output 的 Producer 推荐参数配置。
Apache Kafka Java Client > 2.1
# https://kafka.apache.org/documentation/#producerconfigs
# 批量大小,一次性最多积累 1MiB 的数据。
batch.size=1048576
# 攒批时间,对于 Producer,如果在指定的时间内没有达到攒批的上限,也会触发发送。这个时间代表了发送的最大延迟。
linger.ms=1000
Apache Kafka Java Client <= 2.1
# https://kafka.apache.org/21/documentation.html#producerconfigs
# 最大重试次数
retries=2147483647
# 初始退避延迟时间
retry.backoff.ms=100
# 最大退避延迟时间
retry.backoff.max.ms=1000
# 每次发送请求的 RPC 超时时间
request.timeout.ms=30000
# 整个发送调用整体超时时间,超过这个时间后即不再重试,返回调用侧失败异常。
delivery.timeout.ms=120000
# 批量大小,一次性最多积累 1MiB 的数据。
batch.size=1048576
# 攒批时间,对于 Producer,如果在指定的时间内没有达到攒批的上限,也会触发发送。这个时间代表了发送的最大延迟。
linger.ms=1000
franz-go
package main
import (
"time"
"github.com/twmb/franz-go/pkg/kgo"
)
func main() {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
// 对应 Kafka 配置中的 linger.ms,例如 1000 毫秒。
kgo.ProducerLinger(1*time.Second),
)
if err != nil {
panic(err)
}
defer client.Close()
}
kafka-go
// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
// 推荐配置示例:
w := &kafka.Writer{
// 消息发送最大重试次数。
// 参考 Apache Kafka Java Client 的 retries 参数,设置为 Integer.MAX_VALUE。
MaxAttempts: 2147483647,
// 触发一次发送的最大消息条数。
BatchSize: 4096,
// 需要的分区副本确认数。
RequiredAcks: kafka.RequireAll,
}
Sarama
package main
import (
"time"
"github.com/IBM/sarama"
)
func main() {
// https://pkg.go.dev/github.com/IBM/sarama#NewConfig
config := sarama.NewConfig()
// 触发一次 flush 的字节数阈值。
config.Producer.Flush.Bytes = 1048576
// 触发一次 flush 的消息条数阈值。
config.Producer.Flush.Messages = 4096
// flush 的最大等待时间,等同于 Kafka Java Producer 的 linger.ms。
config.Producer.Flush.Frequency = 1 * time.Second
// 消息发送最大重试次数(默认 3),等同于 Kafka Java Producer 的 retries。
config.Producer.Retry.Max = 2147483647
// 重试之间的退避等待时间(默认 100ms),等同于 Kafka Java Producer 的 retry.backoff.ms。
config.Producer.Retry.Backoff = 1 * time.Second
// TODO: 使用 config 创建 producer,例如 sarama.NewSyncProducer(brokers, config)
_ = config
}
Filebeat Kafka Output
# https://www.elastic.co/docs/reference/beats/filebeat/kafka-output
# Leader 选举期间的重试退避等待时间。
retry.backoff: 3s
# 批量发送 Kafka 请求的等待时间。
bulk_flush_frequency: 1s
附录:其他已知问题
已知缺陷
| 缺陷信息
| 修复方法
|
|---|
Async Producer 在发送重试时没有限制内存占用
| | |
已知缺陷
| 缺陷信息
| 修复方法
|
|---|
Consumer 无法识别 Group Coordinator 变更
| - 缺陷版本:V0.4.47
- 现象:__consumer_offsets Topic 发生分区迁移后,部分 Group 消费中断,日志报错
- 原因:当 __consumer_offsets 分区迁移后,会导致部分 Group 的 Coordinator 发生变更。而 kafka-go 没有正确地处理该情况——在 Coordinator 变更后,仍然向老的 Coordinator 发送请求,导致了 NOT_COORDINATOR 报错。
| - 临时重启 Consumer 恢复。
- 更换 SDK ,推荐使用 Franz-Go SDK,参考本文档推荐版本。
|
已知缺陷
| 缺陷信息
| 修复方法
|
|---|
Consumer 配置 heartbeatInterval 不准确
| - 缺陷版本:V2.2.4
- 社区说明:https://github.com/tulios/kafkajs/issues/130#issuecomment-422024849
- 现象:当 Consumer 配置的
heartbeatInterval 和 sessionTimeout 过于接近时,Consumer 会周期性 Leave & Rejoin Group(但基本不影响消费),日志报错。 - 原因:kafkajs 中的配置
heartbeatInterval 仅保证了“两次心跳间的最小间隔时间”,即在某些场景下(例如,Consumer 消费到 Topic 末尾,没有新的消息可供消费),Consumer 向 Coordinator 发送心跳的间隔可能超过 heartbeatInterval ;而如果该值配置地过大,就可能超过 sessionTimeout 进而导致 Coordinator 将 Consumer 驱逐。
| - 调低
heartbeatInterval 或调高 sessionTimeout (建议heartbeat.interval.ms 不应超过 session.timeout.ms 的 1/3)。
|