Flink
Overview
Apache Flink is a widely used stream processing engine for event-driven analytics and unified stream and batch processing. AutoMQ is a cloud-native, Kafka-compatible streaming system. By redesigning Kafka's storage layer for cloud environments, AutoMQ reports cost reductions of more than ten times along with improved scalability. Because it is fully compatible with Kafka, AutoMQ works with the existing Kafka ecosystem, including Flink's Kafka connector. This article shows how Flink can read data from an AutoMQ topic, analyze it with a WordCount example, and write the results back to AutoMQ.
Environment Setup
Installing and Starting Flink
This document uses Apache Flink v1.19.0. First, follow the official Flink First Steps documentation to deploy a Flink v1.19.0 service.
Installing and Starting AutoMQ
To install and start AutoMQ, follow the AutoMQ Quick Start guide to set up a local AutoMQ cluster. This example uses AutoMQ v1.0.4.
Preparing Test Topics and Data
Create a topic named to-flink to store the data that Flink will read and process.
# The default port for a locally installed AutoMQ is 9094.
bin/kafka-topics.sh --create --topic to-flink --bootstrap-server localhost:9094
Use the console producer to write a batch of words for the word count.
bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094
Enter the following lines, then press Ctrl+C to exit the producer:
apple
apple
banana
banana
banana
cherry
cherry
pear
pear
pear
lemon
lemon
mango
mango
mango
We expect Flink to compute the following final counts:
apple 2
banana 3
cherry 2
pear 3
lemon 2
mango 3
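The expected totals above can be checked by hand with a few lines of plain Java. This is only a minimal sketch that needs neither Flink nor AutoMQ; the class name ExpectedWordCount and the count method are hypothetical names chosen for illustration, and each input line is assumed to hold exactly one word.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedWordCount {
    // Counts how often each word occurs; LinkedHashMap keeps first-seen order.
    public static Map<String, Integer> count(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The same 15 lines that are typed into the console producer.
        List<String> input = List.of(
                "apple", "apple", "banana", "banana", "banana",
                "cherry", "cherry", "pear", "pear", "pear",
                "lemon", "lemon", "mango", "mango", "mango");
        count(input).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

Running main prints one line per distinct word in the order the words first appear, which makes it easy to compare against the list above.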
Once the data has been written, consume it to verify that it was stored:
bin/kafka-console-consumer.sh --topic to-flink --from-beginning --bootstrap-server localhost:9094
Create a topic to receive the results computed by Flink.
bin/kafka-topics.sh --create --topic from-flink --bootstrap-server localhost:9094
Importing Data from AutoMQ into Flink for Analysis
Because AutoMQ is fully compatible with Apache Kafka®, we can use the Kafka connector provided by Flink to write the source and sink code that reads from and writes to AutoMQ topics.
POM Dependencies
....
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.2</version>
</dependency>
....
<!-- Supported by the shade plugin for building the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>automq-wordcount-flink-job</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.automq.example.flink.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
Writing Flink Job Code
The following Java code defines an AutoMQ source and sink using KafkaSource and KafkaSink, respectively. It first reads the prepared "fruit list" test data from the topic to-flink, then builds a DataStream that performs the WordCount computation and writes the results to the AutoMQ topic from-flink.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.automq.example.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * This is a re-write of the Apache Flink WordCount example using Kafka connectors.
 * Find the reference example at https://github.com/redpanda-data/flink-kafka-examples/blob/main/src/main/java/io/redpanda/examples/WordCount.java
 */
public class WordCount {
final static String TO_FLINK_TOPIC_NAME = "to-flink";
final static String FROM_FLINK_TOPIC_NAME = "from-flink";
final static String FLINK_JOB_NAME = "WordCount";
public static void main(String[] args) throws Exception {
// Use your AutoMQ cluster's bootstrap servers here
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9094";
// Set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(TO_FLINK_TOPIC_NAME)
.setGroupId("automq-example-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
.setTopic(FROM_FLINK_TOPIC_NAME)
.build();
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(bootstrapServers)
.setRecordSerializer(serializer)
.build();
DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "AutoMQ Source");
// Split up the lines in pairs (2-tuples) containing: (word,1)
DataStream<String> counts = text.flatMap(new Tokenizer())
// Group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1)
.flatMap(new Reducer());
// Add the sink so that results
// are written to the output topic
counts.sinkTo(sink);
// Execute program
env.execute(FLINK_JOB_NAME);
}
/**
 * Implements the string tokenizer that splits sentences into words as a user-defined
 * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
 * form of "(word,1)" ({@code Tuple2<String, Integer>}).
 */
public static final class Tokenizer
implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// Normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// Emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
// Implements a simple reducer using FlatMap to
// reduce the Tuple2 into a single string for
// writing to kafka topics
public static final class Reducer
implements
FlatMapFunction<Tuple2<String, Integer>, String> {
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
// Convert the pairs to a string
// for easy writing to Kafka Topic
String count = value.f0 + " " + value.f1;
out.collect(count);
}
}
}
Building the project with mvn clean package produces automq-wordcount-flink-job.jar, the executable job that we submit to Flink.
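The tokenization inside Tokenizer.flatMap can be tried in isolation before the job is submitted. The sketch below copies the same normalize-and-split logic into plain Java without the Flink Collector; the class name TokenizerSketch and the tokenize method are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizerSketch {
    // Same steps as Tokenizer.flatMap: lowercase, split on runs of
    // non-word characters, and drop empty tokens.
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Apple, banana!")); // punctuation acts as a separator
        System.out.println(tokenize("  pear"));         // leading separators yield an empty first token
        System.out.println(tokenize(""));               // an empty line yields no words
    }
}
```

The length check matters because String.split returns an empty first element when the line starts with a separator, and returns the original string when nothing matches, so an empty line would otherwise be counted as a word. Note also that \W in Java's default regex mode treats every character outside [a-zA-Z_0-9] as a separator, so words with non-ASCII letters would be split.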
Submitting the Job to Flink
Run the following command to submit the job jar to Flink. The console shows that 15 records have been received and processed.
./bin/flink run automq-wordcount-flink-job.jar
![](/assets/images/1-8177739e16b81795af7936be406a934f.png)
Analyzing the Results
Use the Kafka command line tools bundled with AutoMQ to consume data from the from-flink topic and review the output:
bin/kafka-console-consumer.sh --topic from-flink --from-beginning --bootstrap-server localhost:9094
The output is shown below. Because the job configures no watermarks or window functions, the keyed sum emits an updated running count for every incoming record instead of a single final total per word:
apple 1
apple 2
banana 1
banana 2
banana 3
cherry 1
cherry 2
pear 1
pear 2
pear 3
lemon 1
lemon 2
mango 1
mango 2
mango 3
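The per-record output above follows from how a keyed running sum behaves. The plain Java sketch below imitates that behavior with an in-memory map standing in for Flink's keyed state. It is an approximation for illustration only, not how Flink stores state internally, and the class name RunningCountSketch and its process method are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {
    // Stands in for the per-key state behind keyBy(...).sum(1).
    private final Map<String, Integer> state = new HashMap<>();

    // Handles records one at a time and emits the updated total after each one.
    public List<String> process(List<String> words) {
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add(word + " " + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningCountSketch job = new RunningCountSketch();
        job.process(List.of("apple", "apple", "banana", "banana", "banana"))
                .forEach(System.out::println);
    }
}
```

Every input record produces one output line, so the topic receives intermediate counts as well as the latest total. Because the map persists between calls to process, records sent later continue from the stored totals rather than restarting at 1.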
Next, write 5 additional records to the to-flink topic to observe how the stream processing responds:
bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094
Enter the following data:
apple
banana
cherry
pear
lemon
We can then confirm that the from-flink topic receives the updated word counts:
apple 3
banana 4
cherry 3
pear 4
lemon 3
The console also shows that 20 records have now been received and processed:
![](/assets/images/2-b059a5c3e3fb2d63476f8972239c97a7.png)
Summary
This article showed how AutoMQ works with Flink to run a WordCount analysis. For more configuration options and usage details for the Kafka connector, see the Apache Kafka Connector page in the official Flink documentation.