Storage Layer Design: S3Stream

The term "AutoMQ Kafka" mentioned in this article specifically refers to the source available project automq-for-kafka under the GitHub AutoMQ organization by AutoMQ CO., LTD.

Introduction

Apache Kafka exposes stream semantics, and its underlying storage, LocalLog, is an infinite-length stream abstraction built on files. Higher-level features such as Compact Topic and Transaction are also built on top of streams. Essentially, Apache Kafka is a system designed around streams. AutoMQ for Kafka (AutoMQ Kafka for short) is therefore a cloud-native reshaping of Apache Kafka around the Stream concept, building a low-latency, high-throughput, and low-cost S3 Stream abstraction on top of cloud disks and object storage.

Overview

S3 Stream consists of Metadata, Delta WAL, and Main Storage:

  • Metadata: Integrated into the Controller, it manages the mapping between S3 Streams and Objects. Metadata is saved in KRaft;
  • Delta WAL: A persistent, low-latency, high-performance write buffer implemented based on the cloud disk;
  • Main Storage: The primary storage with high throughput and low cost, implemented based on object storage;
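
To make this division of responsibilities concrete, here is a minimal, illustrative Java sketch of how the three components could fit together. The interface and type names (DeltaWal, MainStorage, StreamMetadata, S3Stream) are simplified assumptions for explanation, not the actual AutoMQ API.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: simplified shapes of the three components described above.
interface DeltaWal {
    // Durable once the cloud disk acknowledges the write.
    CompletableFuture<Void> append(long streamId, byte[] record);
}

interface MainStorage {
    // Invoked by the background upload path once enough WAL data has accumulated.
    CompletableFuture<Void> uploadObject(byte[] objectData);
}

interface StreamMetadata {
    // Stream-to-Object mapping, persisted through the Controller in KRaft.
    void recordObject(long streamId, long objectId, long startOffset, long endOffset);
}

final class S3Stream {
    private final long streamId;
    private final DeltaWal wal;

    S3Stream(long streamId, DeltaWal wal) {
        this.streamId = streamId;
        this.wal = wal;
    }

    // An append is acknowledged as soon as it is durable in the Delta WAL;
    // movement to Main Storage and metadata updates happen asynchronously.
    CompletableFuture<Void> append(byte[] record) {
        return wal.append(streamId, record);
    }
}
```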

Delta WAL

Delta WAL is S3 Stream's persistent, low-latency, high-performance write buffer, implemented on top of the cloud disk:

  • Persistence: Unlike the operating system's file write buffer, Delta WAL is a non-volatile buffer. Once S3 Stream data is written into Delta WAL, success can be returned to the upper layer.

  • Low latency and high performance: Delta WAL persists data through the cloud disk, so its storage performance matches that of the cloud disk. For instance, AWS EBS gp3 has a write latency of about 1ms, a free quota of 3000 IOPS and 125MB/s, and can be provisioned up to 16000 IOPS and 1000MB/s.

  • Write Buffer: Delta WAL is the write buffer for Main Storage. Similar to the operating system file write buffer, Delta WAL transforms high-frequency, small data block (KB level) writes into low-frequency, large data block (500MB) object writes, significantly reducing the API call cost of object storage.

    All Streams on an AutoMQ Kafka Broker are written sequentially into that Broker's Delta WAL. When the buffered data in Delta WAL exceeds 500MB, Delta WAL uploads the buffered data to object storage as a Stream Set Object and marks the uploaded range for deletion; the marked range can then be overwritten. Through this rotating write mode, Delta WAL only needs about 2GB of space in total (see the sketch after this list).

    Each AutoMQ Kafka Broker node has an independent Delta WAL, and the overall write buffer capacity of the cluster will scale horizontally with the size of the cluster.

    AutoMQ Kafka's serverless design uses small machine types to achieve fine-grained elasticity. The cloud disk's free quota usually covers the IOPS and throughput requirements of a single machine, so horizontally scaling AutoMQ Kafka adds only a small amount (2GB per node) of cloud disk cost. For instance, with AWS EBS gp3, expanding to 10 machines gives the cluster 30000 IOPS and 1250MB/s of Delta WAL throughput, while the EBS cost is only 2GB × 10 × 0.08 USD/GB/month = 1.6 USD/month. The cost of horizontally expanding Delta WAL is therefore very low.
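
Below is a minimal sketch of the rotating write pattern described above, assuming a fixed ~2GB WAL device and a ~500MB upload threshold. The class and the upload hook are hypothetical and omit batching, alignment, and crash-recovery details.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative rotating WAL: records are appended sequentially into a fixed-size
// region of the cloud disk; once roughly 500MB has accumulated, it is uploaded to
// object storage and the region is marked reusable, so the WAL never needs more
// than ~2GB of disk space.
final class RotatingDeltaWal {
    private static final long WAL_CAPACITY     = 2L * 1024 * 1024 * 1024; // ~2GB device
    private static final long UPLOAD_THRESHOLD = 500L * 1024 * 1024;      // ~500MB

    private final AtomicLong writeOffset = new AtomicLong(); // next logical write position
    private final AtomicLong trimOffset  = new AtomicLong(); // everything before this is uploaded

    // Appends a record; the physical position wraps around the fixed-size device.
    long append(byte[] record) {
        long logical = writeOffset.getAndAdd(record.length);
        long physical = logical % WAL_CAPACITY;               // rotating write position
        // write(record, physical): persisted to the cloud disk here
        if (buffered() >= UPLOAD_THRESHOLD) {
            uploadAndTrim();
        }
        return logical;
    }

    private long buffered() {
        return writeOffset.get() - trimOffset.get();
    }

    private void uploadAndTrim() {
        long uploadTo = writeOffset.get();
        // upload the [trimOffset, uploadTo) range to object storage ...
        trimOffset.set(uploadTo);                              // space can now be overwritten
    }
}
```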

Main Storage

S3 Stream uses object storage as its main storage, and the cost of using object storage includes storage fees and API call fees.

Almost all data of AutoMQ Kafka is stored in object storage, so the storage cost of AutoMQ Kafka is approximately equal to the storage cost of object storage.

Since storage fees are determined mainly by the volume of data retained, the key to further cost savings in object storage lies in reducing API call fees. Before introducing how AutoMQ Kafka saves on API call fees, let's first look at the two Object types AutoMQ Kafka defines in object storage:

  • Stream Set Object: When Delta WAL data is uploaded, the many Streams with small data volumes are combined and uploaded as a single Stream Set Object.
  • Stream Object: A Stream Object only contains data from a single Stream, which facilitates precise data deletion for Streams with different life cycles.
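
To make the distinction concrete, the following hypothetical metadata shapes (field names are assumptions, not AutoMQ's actual KRaft records) show why the two types behave differently: a Stream Set Object maps to many stream ranges, while a Stream Object maps to exactly one.

```java
import java.util.List;

// A contiguous range of a single stream: [startOffset, endOffset).
record StreamRange(long streamId, long startOffset, long endOffset) {}

// Stream Set Object: one physical object holding ranges of many small streams.
record StreamSetObjectMeta(long objectId, List<StreamRange> ranges) {}

// Stream Object: one physical object holding a range of exactly one stream,
// so it can be deleted precisely when that stream's retention expires.
record StreamObjectMeta(long objectId, StreamRange range) {}
```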

AutoMQ Kafka achieves API call cost savings in three areas: Delta WAL uploads, Stream Set Object Compaction, and Stream Object Compaction.

Delta WAL Uploads

The upload to object storage is triggered after Delta WAL buffers 500MB of data:

  • For high-volume streams, if the data exceeds 16MB, it is uploaded as a single Stream Object;

  • For low-volume streams, the data is sorted together and written into a single Stream Set Object.

    By splitting uploads based on stream size, Delta WAL not only saves a substantial amount of API call cost for small-volume streams, but also diverts large-volume streams to keep the Stream Set Object relatively small, reducing the cost of subsequent Stream Set Object Compaction. A sketch of this split decision follows below.
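
Here is a small sketch of that split decision, assuming a 16MB per-stream threshold evaluated at upload time; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative split of buffered WAL data at upload time: streams that have
// accumulated more than 16MB become standalone Stream Objects, everything else
// is sorted together into one Stream Set Object.
final class DeltaWalUploadPlanner {
    private static final long STREAM_OBJECT_THRESHOLD = 16L * 1024 * 1024; // 16MB

    // Returns the ids of streams to upload as standalone Stream Objects;
    // the remaining streams are combined into a single Stream Set Object.
    List<Long> planStreamObjects(Map<Long, Long> bufferedBytesByStream) {
        List<Long> standalone = new ArrayList<>();
        for (Map.Entry<Long, Long> e : bufferedBytesByStream.entrySet()) {
            if (e.getValue() >= STREAM_OBJECT_THRESHOLD) {
                standalone.add(e.getKey());
            }
        }
        return standalone;
    }
}
```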

Stream Set Object Compaction

Stream Set Object Compaction runs in the background on each Broker at 20-minute intervals. Similar to RocksDB SST compaction, it selects an appropriate list of the current Broker's Stream Set Objects according to its strategy and merges them with a merge sort:

  • Streams whose data will exceed 16MB after merging are likewise uploaded as standalone Stream Objects;

  • The remaining streams are sorted together and written into a single Stream Set Object.

    Stream Set Objects are merged with a merge sort: with about 500MB of memory and a 16MB read/write range, it can handle the merging of up to 15TB of Stream Set Objects (see the sketch after this list).

    Through Stream Set Object Compaction, small-volume stream data segments are merged into larger data segments. For the cold data Catchup Read scenario, this greatly reduces read API calls and improves read efficiency.
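
The following sketch illustrates the bounded-memory merge, assuming each source Stream Set Object exposes an iterator of record batches already sorted by (streamId, baseOffset) and read in roughly 16MB ranges; the Batch and Writer types are hypothetical.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// One batch of records belonging to a single stream, already sorted within an object.
record Batch(long streamId, long baseOffset, byte[] payload) {}

// Illustrative k-way merge: each source Stream Set Object is read through a bounded
// (~16MB) window, so merging many objects stays within a few hundred MB of memory.
final class StreamSetObjectCompactor {

    interface Writer { void write(Batch batch); }

    static void merge(List<Iterator<Batch>> sources, Writer writer) {
        record Head(Batch batch, Iterator<Batch> source) {}
        PriorityQueue<Head> heap = new PriorityQueue<>(
                Comparator.comparingLong((Head h) -> h.batch().streamId())
                          .thenComparingLong(h -> h.batch().baseOffset()));

        for (Iterator<Batch> s : sources) {
            if (s.hasNext()) heap.add(new Head(s.next(), s));
        }
        while (!heap.isEmpty()) {
            Head head = heap.poll();
            // Large streams are split out as Stream Objects by the writer; the rest
            // are appended to the new Stream Set Object.
            writer.write(head.batch());
            if (head.source().hasNext()) {
                heap.add(new Head(head.source().next(), head.source()));
            }
        }
    }
}
```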

Stream Object Compaction

Stream Object Compaction serves two purposes: it reduces the total amount of metadata the cluster must maintain for Object mappings, and it increases the aggregation of Stream Object data, which lowers API call costs for cold data Catchup Read.

Stream Objects participating in Compaction are usually already at least 16MB, which satisfies object storage's minimum Part size. Stream Object Compaction therefore uses the object storage multipart copy API (UploadPartCopy on S3) to perform Range Copy uploads directly on the server side, avoiding wasting network bandwidth on reading the data back and rewriting it to object storage.
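
As an illustration of this server-side range copy, here is a sketch using the AWS SDK for Java v2, where UploadPartCopy is S3's API for copying existing object data into a part of a multipart upload. The bucket and key names are placeholders, and AutoMQ's actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

// Illustrative Stream Object compaction: source objects are copied into the new
// object server-side with UploadPartCopy, so their bytes never pass through the
// broker's network. All parts except the last must be at least 5MB, which the
// 16MB source objects satisfy.
final class StreamObjectCompactor {

    static void compact(S3Client s3, String bucket, List<String> sourceKeys, String targetKey) {
        String uploadId = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(targetKey).build()).uploadId();

        List<CompletedPart> parts = new ArrayList<>();
        for (int i = 0; i < sourceKeys.size(); i++) {
            int partNumber = i + 1;                            // part numbers start at 1
            UploadPartCopyResponse resp = s3.uploadPartCopy(UploadPartCopyRequest.builder()
                    .sourceBucket(bucket).sourceKey(sourceKeys.get(i))
                    .destinationBucket(bucket).destinationKey(targetKey)
                    .uploadId(uploadId).partNumber(partNumber)
                    // .copySourceRange("bytes=...") could narrow the copy to a byte range
                    .build());
            parts.add(CompletedPart.builder()
                    .partNumber(partNumber)
                    .eTag(resp.copyPartResult().eTag())
                    .build());
        }

        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(targetKey).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
    }
}
```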