跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://docs.automq.com/llms.txt

Use this file to discover all available pages before exploring further.

AutoMQ 完全兼容 Apache Kafka,支持使用 Apache Kafka 的客户端 SDK 进行集成和收发消息。本文档介绍 AutoMQ 推荐的客户端 SDK 以及使用注意事项。

兼容性说明

AutoMQ 完全兼容 Apache Kafka (0.10 ~ 3.9 版本),推荐使用 Apache kafka 的客户端 SDK 访问 AutoMQ 进行收发消息。根据 Apache Kafka 的协议协商原则,在上述范围内的客户端版本均可以正常接入。
建议应用侧尽可能升级到更新的 SDK 版本,以修复历史版本的隐藏缺陷。

推荐的客户端 SDK 列表

Apache Kafka 社区针对不同的开发语言提供了多种 SDK 选择,用户可查看社区的文档信息。AutoMQ 技术团队基于大量的生产实践,针对不同的开发语言推荐客户使用如下 SDK 接入 AutoMQ。 在 AutoMQ Labs 准备了对应客户端的快速开始样例,作为接入参考。
开发语言
客户端 SDK
推荐版本
样例
Java
Apache Kafka Java Client
>= 3.2.0
Demo
C/C++
librdkafka
>= 2.8.0
Demo
Go
franz-go
>= 1.17.1
Demo
Python
kafka-python
>= 2.2.3
Demo
NodeJS
KafkaJS
>= 2.2.4
Demo
如果目前使用的 SDK 不在上述推荐列表范围内,只要兼容 Apache Kafka 协议,则仍然可以继续访问 AutoMQ,建议查看下方的已知问题列表,进行适当的参数调整。

客户端参数调优

使用 SDK 收发消息时,除了选择推荐的 SDK 版本,还需要针对 Producer 的关键参数进行调整,以达到最佳的性能。以下是各 SDK 和 Kafka Output 的 Producer 推荐参数配置。

Apache Kafka Java Client > 2.1

# https://kafka.apache.org/documentation/#producerconfigs
# 批量大小,一次性最多积累 1MiB 的数据。
batch.size=1048576
# 攒批时间,对于 Producer,如果在指定的时间内没有达到攒批的上限,也会触发发送。这个时间代表了发送的最大延迟。
linger.ms=1000

Apache Kafka Java Client <= 2.1

# https://kafka.apache.org/21/documentation.html#producerconfigs
# 最大重试次数
retries=2147483647

# 初始退避延迟时间
retry.backoff.ms=100
# 最大退避延迟时间
retry.backoff.max.ms=1000
# 每次发送请求的 RPC 超时时间
request.timeout.ms=30000
# 整个发送调用整体超时时间,超过这个时间后即不再重试,返回调用侧失败异常。
delivery.timeout.ms=120000
# 批量大小,一次性最多积累 1MiB 的数据。
batch.size=1048576
# 攒批时间,对于 Producer,如果在指定的时间内没有达到攒批的上限,也会触发发送。这个时间代表了发送的最大延迟。
linger.ms=1000

franz-go

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// 对应 Kafka 配置中的 linger.ms,例如 1000 毫秒。
		kgo.ProducerLinger(1*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}

kafka-go

// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
// 推荐配置示例:
w := &kafka.Writer{
	// 消息发送最大重试次数。
	// 参考 Apache Kafka Java Client 的 retries 参数,设置为 Integer.MAX_VALUE。
	MaxAttempts: 2147483647,
	// 触发一次发送的最大消息条数。
	BatchSize: 4096,
	// 需要的分区副本确认数。
	RequiredAcks: kafka.RequireAll,
}

Sarama

package main

import (
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// https://pkg.go.dev/github.com/IBM/sarama#NewConfig
	config := sarama.NewConfig()

	// 触发一次 flush 的字节数阈值。
	config.Producer.Flush.Bytes = 1048576
	// 触发一次 flush 的消息条数阈值。
	config.Producer.Flush.Messages = 4096
	// flush 的最大等待时间,等同于 Kafka Java Producer 的 linger.ms。
	config.Producer.Flush.Frequency = 1 * time.Second

	// 消息发送最大重试次数(默认 3),等同于 Kafka Java Producer 的 retries。
	config.Producer.Retry.Max = 2147483647
	// 重试之间的退避等待时间(默认 100ms),等同于 Kafka Java Producer 的 retry.backoff.ms。
	config.Producer.Retry.Backoff = 1 * time.Second

	// TODO: 使用 config 创建 producer,例如 sarama.NewSyncProducer(brokers, config)
	_ = config
}

Logstash Kafka Output

# https://www.elastic.co/docs/reference/logstash/plugins/plugins-outputs-kafka
output {
  kafka {
    # 其他连接配置 ...
    # 批量大小(字节),一次最多可累积 1MiB 的数据。
    batch_size => 1048576
    # 批量等待时间(毫秒)。如果在此时间内未达到批量限制,仍会触发发送。
    linger_ms => 1000
    # 生产者要求 Leader 在确认请求完成之前已收到的确认数。
    acks => "all"
    # 生产者生成的所有数据的压缩类型。
    compression_type => "lz4"
    # 重试失败的生产请求之前的等待时间。
    retry_backoff_ms => 1000
  }
}

Filebeat Kafka Output

# https://www.elastic.co/docs/reference/beats/filebeat/kafka-output
# Leader 选举期间的重试退避等待时间。
retry.backoff: 3s
# 批量发送 Kafka 请求的等待时间。
bulk_flush_frequency: 1s

Fluentd Kafka Output

# https://github.com/fluent/fluent-plugin-kafka
<match **>
  @type kafka2
  # 其他连接配置 ...

  <buffer>
    # 使用默认配置
  </buffer>
</match>

Fluent Bit Kafka Output

# https://docs.fluentbit.io/manual/pipeline/outputs/kafka
pipeline:
  outputs:
    - name: kafka
      match: '*'
      # 其他连接配置 ...
      # 批量等待时间(毫秒)。
      rdkafka.queue.buffering.max.ms: 1000
      # 压缩类型。
      rdkafka.compression.type: lz4

Vector Kafka Sink

# https://vector.dev/docs/reference/configuration/sinks/kafka/
[sinks.my_kafka]
type = "kafka"
inputs = ["my-source-or-transform-id"]
# 其他连接配置 ...
# 生产者生成的所有数据的压缩类型。
compression = "lz4"

# 批量等待时间通过 librdkafka 选项控制。
[sinks.my_kafka.librdkafka_options]
"queue.buffering.max.ms" = "1000"

附录:其他已知问题

Sarama Go SDK

已知缺陷
缺陷信息
修复方法
Async Producer 在发送重试时没有限制内存占用
  • 缺陷版本:V1.43.3
  • 社区 Issue:https://github.com/IBM/sarama/issues/1358
  • 社区 PR:https://github.com/IBM/sarama/pull/3026
  • 现象:在使用 async_producer 发送消息,且对应 Topic 发生分区迁移时,Producer 内存占用会异常升高,甚至导致 OOM。
  • 原因:当 async_producer 接收到服务端的可重试错误(如分区迁移导致的 NOT_LEADER_OR_FOLLOWER)时,会将消息暂存于内存中,并根据策略进行重试。然而,该重试缓存没有大小限制,若大量且持续的重试发生,内存使用可能意外增加。
  • 升级版本到 >= V1.44.0

Kafka-Go SDK

已知缺陷
缺陷信息
修复方法
Consumer 无法识别 Group Coordinator 变更
  • 缺陷版本:V0.4.47
  • 现象:__consumer_offsets Topic 发生分区迁移后,部分 Group 消费中断,日志报错
  • 原因:当 __consumer_offsets 分区迁移后,会导致部分 Group 的 Coordinator 发生变更。而 kafka-go 没有正确地处理该情况——在 Coordinator 变更后,仍然向老的 Coordinator 发送请求,导致了 NOT_COORDINATOR 报错。
  • 临时重启 Consumer 恢复。
  • 更换 SDK ,推荐使用 Franz-Go SDK,参考本文档推荐版本。

Kafka JS SDK

已知缺陷
缺陷信息
修复方法
Consumer 配置 heartbeatInterval 不准确
  • 缺陷版本:V2.2.4
  • 社区说明:https://github.com/tulios/kafkajs/issues/130#issuecomment-422024849
  • 现象:当 Consumer 配置的 heartbeatIntervalsessionTimeout 过于接近时,Consumer 会周期性 Leave & Rejoin Group(但基本不影响消费),日志报错。
  • 原因:kafkajs 中的配置 heartbeatInterval 仅保证了“两次心跳间的最小间隔时间”,即在某些场景下(例如,Consumer 消费到 Topic 末尾,没有新的消息可供消费),Consumer 向 Coordinator 发送心跳的间隔可能超过 heartbeatInterval ;而如果该值配置地过大,就可能超过 sessionTimeout 进而导致 Coordinator 将 Consumer 驱逐。
  • 调低 heartbeatInterval 或调高 sessionTimeout (建议heartbeat.interval.ms 不应超过 session.timeout.ms 的 1/3)。