Preliminary notes:
(1) The data stored in Kafka is wrapped as a JSONArray (nested under the "data" key), not a plain JSONObject, for example:
{
    "title": "LineNameData",
    "data": [
        {
            "LINE_NAME": "A1",
            "TYPE": "R"
        },
        {
            "LINE_NAME": "A2",
            "TYPE": "SR"
        },
        {
            "LINE_NAME": "A3",
            "TYPE": "SSR"
        }
    ]
}
(2) The versions used are Java 1.8, Flink 1.17.0, and Kafka 3.6.2.
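For reference, a message in the shape shown in (1) can be assembled with fastjson roughly as follows; the MessageShapeExample class is only an illustration and not part of the project:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class MessageShapeExample {
    public static String buildLineNameMessage() {
        JSONObject message = new JSONObject();
        message.put("title", "LineNameData");

        JSONArray data = new JSONArray();
        JSONObject line = new JSONObject();
        line.put("LINE_NAME", "A1");
        line.put("TYPE", "R");
        data.add(line);
        message.put("data", data);

        // Yields a JSON string with "title" and the "data" array in the shape
        // shown above (key order may vary); it can then be sent with a plain
        // KafkaProducer<String, String>.
        return message.toJSONString();
    }
}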
1、pom configuration
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.1</version>
    </dependency>
    <!-- Add this if you also need to query MySQL -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.2.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.compal.App</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2、App.java
package com.compal;

public class App {
    public static void main(String[] args) throws Exception {
        System.out.println("is running");
        // The Kafka logic is encapsulated in KafkaGetUtil, so we simply call it here
        KafkaGetUtil.getData();
    }
}
3、KafkaGetUtil.java
package com.compal;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaGetUtil {
    private final static String kafkaServer = "yourServer";
    private final static String getTopic = "yourTopic";
    private final static String groupId = "yourConsumer";

    public static void getData() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the Flink job
        env.setParallelism(1);
        // Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(getTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> fromSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // fromSource contains the value of every message in the topic. Since the producer sets the
        // title to "LineNameData", this filter keeps only messages mentioning that title
        // (a stricter, JSON-based filter is sketched right after this class).
        SingleOutputStreamOperator<String> filter = fromSource
                .filter(value -> value.contains("LineNameData"));
        // Hand the filtered data to the DataProcess sink for processing
        filter.addSink(new DataProcess());
        env.execute();
    }
}
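The substring match above is simple but loose: it would also match a message whose data values happen to contain the string "LineNameData". If that matters, the filter can parse the JSON and compare the title field exactly; a minimal sketch, where the TitleFilter class name is ours and not part of the original code:
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;

public class TitleFilter implements FilterFunction<String> {
    private final String expectedTitle;

    public TitleFilter(String expectedTitle) {
        this.expectedTitle = expectedTitle;
    }

    @Override
    public boolean filter(String value) {
        try {
            // Parse the raw message and compare the "title" field exactly
            JSONObject message = JSONObject.parseObject(value);
            return expectedTitle.equals(message.getString("title"));
        } catch (Exception e) {
            // Drop records that are not valid JSON
            return false;
        }
    }
}
It would be used as .filter(new TitleFilter("LineNameData")) in place of the lambda in KafkaGetUtil.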
4、ChangeLine.java
Create a class to hold the final result; it makes assignments, comparisons and similar operations easier during development. A minimal sketch is shown below.
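The original post does not list the class body; assuming only the two fields that appear in the sample message (LINE_NAME and TYPE), it could look roughly like this:
package com.compal;

public class ChangeLine {
    // Field names match the keys in the Kafka message so fastjson can map them directly
    private String LINE_NAME;
    private String TYPE;

    public String getLINE_NAME() {
        return LINE_NAME;
    }

    public void setLINE_NAME(String LINE_NAME) {
        this.LINE_NAME = LINE_NAME;
    }

    public String getTYPE() {
        return TYPE;
    }

    public void setTYPE(String TYPE) {
        this.TYPE = TYPE;
    }
}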
5、DataProcess.java
For how to use the Java Stream API and the functions it supports, see this article: Java Stream API中常用的函数及其示例 – YW. A short standalone illustration comes first, followed by the full class.
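As a quick illustration (separate from DataProcess.java itself), this is the kind of filter/sort/collect chain used below; the StreamExample class, the TYPE value "R", and the ChangeLine accessors follow the sketch in section 4 and are only assumptions:
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class StreamExample {
    public static void main(String[] args) {
        ChangeLine a1 = new ChangeLine();
        a1.setLINE_NAME("A1");
        a1.setTYPE("R");
        ChangeLine a2 = new ChangeLine();
        a2.setLINE_NAME("A2");
        a2.setTYPE("SR");

        List<ChangeLine> lines = Arrays.asList(a2, a1);
        // Keep only lines of TYPE "R" and sort them by LINE_NAME
        List<ChangeLine> result = lines.stream()
                .filter(line -> "R".equals(line.getTYPE()))
                .sorted(Comparator.comparing(ChangeLine::getLINE_NAME))
                .collect(Collectors.toList());
        System.out.println(result.size()); // 1
    }
}
The full DataProcess.java: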
package com.compal;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/*
 * Processes the captured data; the class extends RichSinkFunction.
 */
public class DataProcess extends RichSinkFunction<String> {
    // Override invoke() to receive each record passed to the sink, i.e. the message value
    @Override
    public void invoke(String value) throws Exception {
        JSONObject kafkaData = JSONObject.parseObject(value);
        JSONArray lineData = kafkaData.getJSONArray("data"); // get the value of the "data" key
        // Convert the array into ChangeLine objects to simplify the processing that follows
        List<ChangeLine> lineDataList = lineData.toJavaList(ChangeLine.class);
        // Java streams handle the basic aggregation, filtering and sorting
        List<ChangeLine> nowLineList = lineDataList.stream().filter(obj -> {
            // your filter logic; return true to keep the record
            return true;
        }).collect(Collectors.toList());
        // Holds the processed records so they can be sent back to Kafka
        JSONArray lineJsonArray = new JSONArray();
        for (ChangeLine line : nowLineList) {
            // Iterate over the records and do the actual processing here ...
            // Convert each result to a JSONObject so it can be sent back to Kafka
            JSONObject lineJsonObject = new JSONObject();
            lineJsonObject.put("LINE_NAME", line.getLINE_NAME());
            lineJsonArray.add(lineJsonObject);
        }
        // Build the final JSONObject and send it to Kafka
        JSONObject finalJsonObject = new JSONObject();
        finalJsonObject.put("LineData", lineJsonArray);
        finalJsonObject.put("title", "ChangeLine");
        String changeLineJsonString = finalJsonObject.toString();
        Properties props = new Properties();
        props.put("bootstrap.servers", "yourKafkaServer");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Create the Kafka producer (note: one producer per record is expensive;
        // see the open()/close() variant sketched after this class)
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send the message to Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>("yourTopicName", "ChangeLine", changeLineJsonString);
        producer.send(record, (RecordMetadata metadata, Exception e) -> {
            if (e != null) {
                System.out.println("Failed to send message: " + e.getMessage());
            } else {
                System.out.println("Message sent successfully: " + metadata.toString());
            }
        });
        // Close the producer (this also flushes the pending record)
        producer.close();
    }
}
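Because DataProcess extends RichSinkFunction, the producer does not have to be recreated for every record. A hedged sketch of the same sink with the producer created once in open() and released in close(); the class name DataProcessReusingProducer and the placeholder broker/topic strings are ours:
package com.compal;

import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DataProcessReusingProducer extends RichSinkFunction<String> {
    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        // Create the producer once per sink instance instead of once per record
        Properties props = new Properties();
        props.put("bootstrap.servers", "yourKafkaServer");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value) {
        // ... same JSON processing as in DataProcess ...
        producer.send(new ProducerRecord<>("yourTopicName", "ChangeLine", value));
    }

    @Override
    public void close() {
        // Flush and release the producer when the job stops
        if (producer != null) {
            producer.close();
        }
    }
}
For production use, Flink's own KafkaSink from flink-connector-kafka is usually preferred over a hand-rolled producer, but the sketch above stays closest to the original structure.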
Official documentation: Kafka | Apache Flink