AutoMQ is fully compatible with the Apache Kafka protocol, allowing you to connect and exchange messages using standard Kafka client SDKs. This guide outlines the recommended clients for various programming languages and highlights important considerations for their use with AutoMQ.
Compatibility Notes
AutoMQ is compatible with the Apache Kafka protocol, supporting clients from Kafka versions 0.10 to 3.9. This compatibility ensures that any standard Kafka client for these versions can connect to AutoMQ without modification.
We strongly recommend using the latest stable version of your chosen client SDK to leverage recent performance enhancements, features, and critical bug fixes.
Recommended Client SDK List
The Apache Kafka community maintains a list of client SDKs for various programming languages, which you can find in the official documentation. Based on extensive testing and production use, we recommend the following clients for optimal performance and stability with AutoMQ. To help you get started, we provide code samples for each recommended client in our AutoMQ Labs repository.
| Programming Language | Client SDK | Recommended Version | Sample |
|---|---|---|---|
| Java | Apache Kafka Java Client | >= 3.2.0 | Demo |
| C/C++ | librdkafka | >= 2.8.0 | Demo |
| Go | franz-go | >= 1.17.1 | Demo |
| Python | kafka-python | >= 2.2.3 | Demo |
| NodeJS | KafkaJS | >= 2.2.4 | Demo |
If the SDK you are using is not among the recommended ones listed above, you can still connect to AutoMQ as long as it is compatible with the Apache Kafka protocol. We suggest reviewing the known issues list below and adjusting parameters accordingly.
Client Parameter Tuning
Besides using a recommended SDK version, you should tune the key Producer parameters to achieve optimal send and receive performance. Below are the recommended Producer configurations for each SDK and for the Filebeat Kafka output.
Apache Kafka Java Client > 2.1

```properties
# https://kafka.apache.org/documentation/#producerconfigs
# Batch size: up to 1 MiB of data can be accumulated per batch.
batch.size=1048576
# Batch linger time. If the batch size limit is not reached within this time, sending is triggered anyway. This represents the maximum added send delay.
linger.ms=1000
```
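The two settings above can be expressed as a plain config map before handing them to a client. A minimal sketch: the `localhost:9092` endpoint is a placeholder, and the dict itself is illustrative rather than any client's native API.

```python
# Sketch of the tuned producer settings as a plain dict.
# "localhost:9092" is a placeholder; point it at your AutoMQ endpoint.
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "batch.size": 1048576,  # accumulate up to 1 MiB per batch
    "linger.ms": 1000,      # send a partial batch after at most 1 s
}

print(producer_config["batch.size"])  # → 1048576
```

With kafka-python (the recommended Python client), these names become snake_case constructor keyword arguments, e.g. `KafkaProducer(bootstrap_servers=..., batch_size=1048576, linger_ms=1000)`.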
Apache Kafka Java Client <= 2.1

```properties
# https://kafka.apache.org/21/documentation.html#producerconfigs
# Maximum retry count.
retries=2147483647
# Initial retry backoff delay.
retry.backoff.ms=100
# Maximum retry backoff delay.
retry.backoff.max.ms=1000
# RPC timeout for each send request.
request.timeout.ms=30000
# Total timeout for the entire send call. No further retries after this time; an exception is returned to the caller.
delivery.timeout.ms=120000
# Batch size: up to 1 MiB of data can be accumulated per batch.
batch.size=1048576
# Batch linger time. If the batch size limit is not reached within this time, sending is triggered anyway. This represents the maximum added send delay.
linger.ms=1000
```
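To see how the retry backoff parameters above interact, here is a sketch of a backoff schedule that starts at `retry.backoff.ms` and is capped at `retry.backoff.max.ms`. The doubling growth is an assumption for illustration only; the client's actual backoff and jitter algorithm may differ.

```python
# Illustrative backoff schedule: starts at retry.backoff.ms (100 ms) and is
# capped at retry.backoff.max.ms (1000 ms). Doubling is assumed here for
# illustration; the real client may use a different growth/jitter strategy.
def backoff_schedule(initial_ms=100, max_ms=1000, attempts=6):
    delays, delay = [], initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_ms)
    return delays

print(backoff_schedule())  # → [100, 200, 400, 800, 1000, 1000]
```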
franz-go

```go
package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// Equivalent to the linger.ms configuration in the Java client, set to 1 second.
		kgo.ProducerLinger(1*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```
kafka-go

```go
// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
// Configure kafka.Writer fields; values align with Apache Kafka Java producer defaults.
// Addr and Topic must also be set before use (omitted here).
writer := &kafka.Writer{
	// Maximum number of attempts to deliver a message.
	// Aligned with the Apache Kafka Java Client retries parameter (Integer.MAX_VALUE).
	MaxAttempts: 2147483647,
	// Maximum number of messages buffered before a flush is triggered.
	BatchSize: 4096,
	// Number of acknowledgments from partition replicas required before responding.
	RequiredAcks: kafka.RequireAll,
}
```
Sarama

```go
package main

import (
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// https://pkg.go.dev/github.com/IBM/sarama#Config
	config := sarama.NewConfig()
	// Byte threshold to trigger a flush.
	config.Producer.Flush.Bytes = 1048576
	// Message count threshold to trigger a flush.
	config.Producer.Flush.Messages = 4096
	// Maximum flush wait time, equivalent to the Kafka Java producer's linger.ms.
	config.Producer.Flush.Frequency = 1 * time.Second
	// Maximum retry count (default 3), equivalent to the Kafka Java producer's retries.
	config.Producer.Retry.Max = 2147483647
	// Backoff wait time between retries (default 100ms), equivalent to the Kafka Java producer's retry.backoff.ms.
	config.Producer.Retry.Backoff = 1 * time.Second
	// Use config to create a producer, e.g. sarama.NewSyncProducer(brokers, config).
	_ = config
}
```
Filebeat Kafka Output

```yaml
# https://www.elastic.co/docs/reference/beats/filebeat/kafka-output
# Retry backoff wait time during leader elections.
retry.backoff: 3s
# Wait time before sending bulk Kafka requests.
bulk_flush_frequency: 1s
```
Appendix: Other Known Issues

Known Defects (Sarama)

| Defect Information | Resolution Methods |
|---|---|
| **Async Producer has no memory usage limit during send retries**<br>- Defect Version: V1.43.3<br>- Community Issue: https://github.com/IBM/sarama/issues/1358<br>- Community PR: https://github.com/IBM/sarama/pull/3026<br>- Phenomenon: When using async_producer to send messages and the corresponding Topic undergoes partition reassignment, the Producer's memory usage can increase abnormally, potentially leading to OOM.<br>- Cause: When async_producer receives server-side retryable errors (such as NOT_LEADER_OR_FOLLOWER during partition reassignment), it temporarily stores the messages in memory and retries according to the policy. This retry cache has no size limit, so substantial and continuous retries can cause unexpected memory growth. | Upgrade to version >= V1.44.0 |
Known Defects (kafka-go)

| Defect Information | Resolution Methods |
|---|---|
| **Consumers unable to identify Group Coordinator changes**<br>- Defect Version: V0.4.47<br>- Issue: After partition reassignment of the __consumer_offsets topic, some groups experience consumption interruptions, with errors logged.<br>- Cause: When the __consumer_offsets partitions are reassigned, the coordinator for some groups changes. kafka-go does not handle this correctly: it keeps sending requests to the old coordinator after the change, resulting in NOT_COORDINATOR errors. | - Temporarily restart the Consumer to recover.<br>- Switch SDKs; franz-go, at the version recommended in this document, is suggested. |
Known Defects (KafkaJS)

| Defect Information | Resolution Methods |
|---|---|
| **Inaccurate Consumer configuration of heartbeatInterval**<br>- Defect Version: V2.2.4<br>- Community Notes: https://github.com/tulios/kafkajs/issues/130#issuecomment-422024849<br>- Issue: When the Consumer's configured heartbeatInterval is too close to sessionTimeout, the Consumer may periodically leave and rejoin the group (generally without affecting consumption), logging errors.<br>- Cause: In kafkajs, heartbeatInterval only guarantees the minimum interval between two heartbeats. In certain scenarios (e.g., the Consumer reaches the end of the topic with no new messages to consume), the interval between heartbeats sent to the Coordinator may exceed heartbeatInterval; if it is set too large, the interval may also exceed sessionTimeout, causing the Coordinator to evict the Consumer. | Lower the heartbeatInterval or increase the sessionTimeout (heartbeat.interval.ms should not exceed 1/3 of session.timeout.ms). |
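The rule of thumb in the resolution column can be checked mechanically. A minimal sketch; the 3000 ms / 30000 ms values are KafkaJS's default heartbeatInterval and sessionTimeout, used here only as an illustration.

```python
# Check the rule of thumb: heartbeat interval should not exceed 1/3 of the
# session timeout. The values below (3000 ms / 30000 ms) are KafkaJS's
# default heartbeatInterval and sessionTimeout, used as an illustration.
def heartbeat_is_safe(heartbeat_interval_ms, session_timeout_ms):
    return heartbeat_interval_ms * 3 <= session_timeout_ms

print(heartbeat_is_safe(3000, 30000))   # → True
print(heartbeat_is_safe(15000, 30000))  # → False
```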