Skip to Main Content

Flink

Introduction

Apache Flink is a renowned stream processing engine widely used in event-driven, stream-batch analysis scenarios. AutoMQ is a highly elastic cloud-native Kafka that brings over 10x cost reduction and elasticity benefits through cloud-native modifications to the Kafka storage layer. Thanks to AutoMQ's 100% compatibility with Kafka, it can easily leverage existing Kafka ecosystem tools to read and write with Flink. This article will demonstrate how Flink can read data from an AutoMQ Topic, perform data analysis, and then write the results back to AutoMQ using a WordCount example.

Environment Setup

This document uses Flink version v1.19.0. Follow the official documentation Flink First Step to deploy a v1.19.0 Flink service.

Install and Launch AutoMQ

Refer to the AutoMQ Quick Start documentation to deploy an AutoMQ cluster locally. This example uses AutoMQ version v1.0.4.

Prepare Test Topic and Data

Create a topic to-flink to store data that needs to be imported into Flink for analysis and computation.


### The Default Port for Locally Installed AutoMQ Is 9094.
bin/kafka-topics.sh --create --topic to-flink --bootstrap-server localhost:9094

Use the command-line tool to write a batch of data for word count computation.


bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094

The data to be written is as follows, and you can exit the producer by pressing Ctrl+C after finishing the input.


apple
apple
banana
banana
banana
cherry
cherry
pear
pear
pear
lemon
lemon
mango
mango
mango

Finally, we hope that the result obtained through Flink computation will be.


apple 2
banana 3
cherry 2
pear 3
lemon 2
mango 3

After writing is complete, we can try to consume the data to confirm it was successfully written.


bin/kafka-console-consumer.sh --topic to-flink --from-beginning --bootstrap-server localhost:9094

Create a topic to receive the results of Flink computations.


bin/kafka-topics.sh --create --topic from-flink --bootstrap-server localhost:9094

Thanks to AutoMQ's full compatibility with Kafka, we can directly use the Kafka Connector provided by Flink to write the source and sink code to load data from AutoMQ's Topic.

POM Dependencies


....
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.19.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.2</version>
</dependency>

....

<!-- shade plugin support for building jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>automq-wordcount-flink-job</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.automq.example.flink.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

The following Java code defines an AutoMQ source and sink using KafkaSource and KafkaSink, respectively. It first reads the pre-prepared "fruit list" test data from the topic to-flink. Then, it creates a DataStream to perform the WordCount computation and sinks the result into the AutoMQ topic from-flink.


/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.example.flink.WordCount;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* This is a re-write of the Apache Flink WordCount example using Kafka connectors.
* Find the reference example at https://github.com/redpanda-data/flink-kafka-examples/blob/main/src/main/java/io/redpanda/examples/WordCount.java
*/
public class WordCount {

final static String TO_FLINK_TOPIC_NAME = "to-flink";
final static String FROM_FLINK_TOPIC_NAME = "from-flink";
final static String FLINK_JOB_NAME = "WordCount";

public static void main(String[] args) throws Exception {
// Use your AutoMQ cluster's bootstrap servers here
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9094";

// Set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(TO_FLINK_TOPIC_NAME)
.setGroupId("automq-example-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
.setTopic(FROM_FLINK_TOPIC_NAME)
.build();

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(bootstrapServers)
.setRecordSerializer(serializer)
.build();

DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "AutoMQ Source");

// Split up the lines in pairs (2-tuples) containing: (word,1)
DataStream<String> counts = text.flatMap(new Tokenizer())
// Group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1)
.flatMap(new Reducer());

// Add the sink to so results
// are written to the outputTopic
counts.sinkTo(sink);

// Execute program
env.execute(FLINK_JOB_NAME);
}

/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
* form of "(word,1)" ({@code Tuple2<String, Integer>}).
*/
public static final class Tokenizer
implements
FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// Normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");

// Emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}

// Implements a simple reducer using FlatMap to
// reduce the Tuple2 into a single string for
// writing to kafka topics
public static final class Reducer
implements
FlatMapFunction<Tuple2<String, Integer>, String> {

@Override
public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
// Convert the pairs to a string
// for easy writing to Kafka Topic
String count = value.f0 + " " + value.f1;
out.collect(count);
}
}
}

The following code, after being built using mvn, will generate an automq-wordcount-flink-job.jar, which is the job we need to submit to Flink.

Execute the following command to submit the task jar to Flink. Through the console, we can see that 15 pieces of data have been received and processed.


./bin/flink run automq-wordcount-flink-job.jar

Verify the Analysis Results

Use the Kafka bin tools extracted from AutoMQ to consume data from from-flink and check the results:


bin/kafka-console-consumer.sh --topic from-flink --from-beginning --bootstrap-server localhost:9094

You can see the output results below. Since it's processed in a stream and there is no watermark or window calculation set, the word count result is printed out every time a calculation is performed.


apple 1
apple 2
banana 1
banana 2
banana 3
cherry 1
cherry 2
pear 1
pear 2
pear 3
lemon 1
lemon 2
mango 1
mango 2
mango 3

Next, we write 5 more data entries to the to-flink Topic and observe the stream processing results:


bin/kafka-console-producer.sh --topic to-flink --bootstrap-server localhost:9094

The data written is


apple
banana
cherry
pear
lemon

Then we can see that the from-flink Topic correctly outputs the following word count results


apple 3
banana 4
cherry 3
pear 4
lemon 3

We can also see on the console that 20 data entries were correctly received and processed:

Summary

This article demonstrates how AutoMQ integrates with Flink to complete a Word Count analysis workflow. For more configurations and usage of the Kafka Connector, refer to the official Flink documentation Apache Kafka Connector.