Flink

Overview

Apache Flink is a widely used stream processing engine for event-driven analytics and combined stream and batch processing. AutoMQ is a high-performance, scalable, cloud-native Kafka solution. By redesigning Kafka's storage layer for cloud environments, AutoMQ reduces costs by more than a factor of ten and improves scalability. Because AutoMQ is fully compatible with Kafka, it integrates with the existing Kafka ecosystem, so Flink can read from and write to AutoMQ through its standard Kafka connector. This article shows how Flink reads data from an AutoMQ topic, analyzes it with a WordCount example, and writes the results back to AutoMQ.

Environment Setup

This document uses Apache Flink v1.19.0. First, follow the official Flink First Steps documentation to deploy a Flink v1.19.0 service.

Installing and Starting AutoMQ

To install and start AutoMQ, follow the AutoMQ Quick Start guide to set up a local AutoMQ cluster. This example uses AutoMQ v1.0.4.

Preparing Test Topics and Data

Create a topic named to-flink to hold the data that Flink will read and process.


# The default port for a locally installed AutoMQ is 9094.
bin/kafka-topics.sh --create --topic to-flink --bootstrap-server localhost:9094

Use the console producer to write a batch of words for the word count.


bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094

Enter the following data, then press Ctrl+C to exit the producer:


apple
apple
banana
banana
banana
cherry
cherry
pear
pear
pear
lemon
lemon
mango
mango
mango

We expect Flink to compute the following results:


apple 2
banana 3
cherry 2
pear 3
lemon 2
mango 3
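
As a quick cross-check of these expected totals, the same counting can be sketched in plain Java, without Flink or AutoMQ. The class name ExpectedWordCount and the helper count are illustrative names, not part of the original example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedWordCount {

    // Counts occurrences of each word, keeping first-seen order.
    static Map<String, Integer> count(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The same 15 words that were typed into the console producer.
        List<String> input = Arrays.asList(
                "apple", "apple", "banana", "banana", "banana",
                "cherry", "cherry", "pear", "pear", "pear",
                "lemon", "lemon", "mango", "mango", "mango");
        // Prints one "word count" line per distinct word.
        count(input).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

This only verifies the arithmetic of the test data; it says nothing about how Flink emits results.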

Once the data has been written, consume it to verify that it was stored successfully.


bin/kafka-console-consumer.sh --topic to-flink --from-beginning --bootstrap-server localhost:9094

Create a topic to receive the results computed by Flink.


bin/kafka-topics.sh --create --topic from-flink --bootstrap-server localhost:9094

Because AutoMQ is fully compatible with Apache Kafka®, we can use the Kafka connector provided by Flink to write the source and sink code that reads from and writes to AutoMQ topics.

POM Dependencies


....
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.19.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.2</version>
</dependency>

....

<!-- Supported by the shade plugin for building the jar -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.5.2</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <finalName>automq-wordcount-flink-job</finalName>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.automq.example.flink.WordCount</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

The following Java code creates an AutoMQ source and sink using KafkaSource and KafkaSink, respectively. It first reads the prepared "fruit list" test data from the topic to-flink, then builds a DataStream that performs the WordCount computation and writes the results to the AutoMQ topic from-flink.


/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.example.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * This is a re-write of the Apache Flink WordCount example using Kafka connectors.
 * Find the reference example at https://github.com/redpanda-data/flink-kafka-examples/blob/main/src/main/java/io/redpanda/examples/WordCount.java
 */
public class WordCount {

    static final String TO_FLINK_TOPIC_NAME = "to-flink";
    static final String FROM_FLINK_TOPIC_NAME = "from-flink";
    static final String FLINK_JOB_NAME = "WordCount";

    public static void main(String[] args) throws Exception {
        // Use your AutoMQ cluster's bootstrap servers here
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9094";

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read string values from the to-flink topic, starting at the earliest offset
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(TO_FLINK_TOPIC_NAME)
                .setGroupId("automq-example-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Serializer: write each result string as the record value to the from-flink topic
        KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
                .setValueSerializationSchema(new SimpleStringSchema())
                .setTopic(FROM_FLINK_TOPIC_NAME)
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(serializer)
                .build();

        DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "AutoMQ Source");

        // Split up the lines in pairs (2-tuples) containing: (word,1)
        DataStream<String> counts = text.flatMap(new Tokenizer())
                // Group by the tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1)
                .flatMap(new Reducer());

        // Add the sink so that results
        // are written to the output topic
        counts.sinkTo(sink);

        // Execute program
        env.execute(FLINK_JOB_NAME);
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // Emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    // Implements a simple reducer using FlatMap to
    // reduce the Tuple2 into a single string for
    // writing to Kafka topics
    public static final class Reducer
            implements FlatMapFunction<Tuple2<String, Integer>, String> {

        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
            // Convert the pairs to a string
            // for easy writing to the Kafka topic
            String count = value.f0 + " " + value.f1;
            out.collect(count);
        }
    }
}
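
To see what the Tokenizer's normalization does to an input line, the following minimal sketch applies the same lower-casing and split("\\W+") logic in plain Java, with no Flink types involved. The class name TokenizerSketch and the sample inputs are illustrative assumptions; only the split expression and the empty-token check come from the code above.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizerSketch {

    // Mirrors the normalization in Tokenizer.flatMap, returning the words
    // for which the Flink job would emit a (word, 1) pair.
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // A line that starts with a non-word character yields a leading
            // empty token, which is why the length check is needed.
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Apple, banana apple!")); // [apple, banana, apple]
        System.out.println(tokenize("  pear"));               // [pear]
    }
}
```

For the single-word records used in this article, each line therefore produces exactly one pair.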

Building the project with mvn package produces automq-wordcount-flink-job.jar (in the target directory by default), which is the executable job we need to submit to Flink.

Run the following command to submit the job jar to Flink. The Flink console then shows that 15 records have been received and processed.


./bin/flink run automq-wordcount-flink-job.jar

Analyzing the Results

Use the Kafka command-line tools bundled with AutoMQ to consume data from the from-flink topic and review the output:


bin/kafka-console-consumer.sh --topic from-flink --from-beginning --bootstrap-server localhost:9094

The output is shown below. Because the job is implemented without watermarks or window functions, the keyed sum emits an updated running count for every input record, so every intermediate count appears in the output rather than only the final totals.


apple 1
apple 2
banana 1
banana 2
banana 3
cherry 1
cherry 2
pear 1
pear 2
pear 3
lemon 1
lemon 2
mango 1
mango 2
mango 3
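
The running counts above can be reproduced with a small plain-Java simulation of a per-key sum that emits one updated result for every incoming record. This is only a sketch of the observable behavior, not of Flink's internals; the class name RunningCountSketch and the method runningCounts are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {

    // For every input word, update that word's count and emit "word count",
    // imitating an unwindowed keyed sum followed by the Reducer's formatting.
    static List<String> runningCounts(List<String> records) {
        Map<String, Integer> state = new HashMap<>(); // per-key running sum
        List<String> emitted = new ArrayList<>();
        for (String word : records) {
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add(word + " " + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("apple", "apple", "banana", "banana", "banana");
        // Prints: apple 1, apple 2, banana 1, banana 2, banana 3 (one per line)
        runningCounts(input).forEach(System.out::println);
    }
}
```

Fifteen input records therefore yield fifteen output records, which matches the count reported on the Flink console.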

Next, write 5 more records to the to-flink topic to observe how the streaming job handles new data:


bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094

Enter the following data:


apple
banana
cherry
pear
lemon

We can then confirm that the from-flink topic has received the correct updated word counts:


apple 3
banana 4
cherry 3
pear 4
lemon 3
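
Since from-flink holds every intermediate count, a consumer that only needs the current totals can keep the most recent line per word. The sketch below does this over lines in the "word count" format written by the Reducer. It assumes the lines arrive in the order they were emitted for each word; the class name LatestCountSketch and the method latestCounts are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestCountSketch {

    // Keeps the last count seen for each word; later lines overwrite earlier ones.
    static Map<String, Integer> latestCounts(List<String> lines) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (String line : lines) {
            int sep = line.lastIndexOf(' ');
            if (sep < 0) {
                continue; // skip lines that are not in "word count" form
            }
            String word = line.substring(0, sep);
            int count = Integer.parseInt(line.substring(sep + 1).trim());
            latest.put(word, count);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String> consumed = Arrays.asList(
                "apple 1", "apple 2", "banana 1", "banana 2", "banana 3", "apple 3");
        // Prints: apple 3, banana 3 (one per line)
        latestCounts(consumed).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```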

The Flink console also shows that 20 records have now been received and processed.

Summary

This article showed how AutoMQ works with Flink to run a WordCount analysis. For more configuration options and usage instructions for the Kafka connector, see the Apache Kafka Connector page in the official Flink documentation.