
说明:此处使用的是Talend 8.0.1的版本,此版本自带的Kafka组件最高支持2.2.1的版本和此项目要求的Kafka3.6.2版本不同,因此采用的是tJava的方式进行传输
要求:键固定,值是JSONObject格式
1、tLibraryLoad1和tLibraryLoad2
用于引入使用的Jar包,分别为:kafka-clients-3.6.2.jar,json-20240303.jar;此处使用的是Install a new module引入新的包,包可以在maven下载,放到本地后选这个选项进行install
2、tFileInputDelimited
用于读取CSV文件的内容,在Edit schema中设置好读取的字段,然后将分隔符设置为逗号,如果有标题行的话,header为1
3、tMap和tLogRow
用于配置映射关系,以及在log中打印每条数据的结果
4、tJavaRow
该组件中可以通过input_row获取到当前遍历行的数据,然后通过Java语句对当前数据进行操作。
// 获取jsonArray参数用于存储每一行的数据(Talend全局参数)
JSONArray jsonArray = (JSONArray) globalMap.get("jsonArray");
// jsonArray不存在则创建(第一次执行jsonArray不存在时创建,后续正常使用)
if (jsonArray == null) {
jsonArray = new JSONArray();
globalMap.put("jsonArray", jsonArray);
}
// 创建一个JSONObject存放各行的数据
JSONObject rowObject = new JSONObject();
// input_row这个参数可以拿到当前遍历的数据
rowObject.put("DEVICE_ID", input_row.DEVICE_ID);
rowObject.put("PARAM_VALUE", input_row.PARAM_VALUE);
rowObject.put("DESCRIPTION", input_row.DESCRIPTION);
// 将存放当前数据的JSONObject添加到JSONArray当中
jsonArray.put(rowObject);
5、tJava
在此组件中,配置Kafka的相关连接以及传输消息
// 获取tJavaRow处理过的jsonArray参数(其中已经包含了所有的数据)
JSONArray jsonArray = (JSONArray) globalMap.get("jsonArray");
JSONObject finalObject = new JSONObject();
finalObject.put("title", "TalendAllDataTest");
finalObject.put("data", jsonArray);
// 将JSONObject转换为字符串
String csvJsonString = finalObject.toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka Broker 地址
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// 创建Kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("Talend_To_Kafka", "AllDataTalend", csvJsonString);
producer.send(record, (RecordMetadata metadata, Exception e) -> {
if (e != null) {
System.out.println("发送消息失败: " + e.getMessage());
} else {
System.out.println("消息发送成功: " + metadata.toString());
}
});
// 关闭生产者
producer.close();
最后效果:

如需对此类格式的Kafka数据进行操作,可以考虑使用Flink:
Java使用Flink 从Kafka中获取并处理数据,再传回Kafka(案例) – YW
Flink官网文档:Kafka | Apache Flink
I was recommended this blog by my cousin. I’m not sure whether this
post is written by him as nobody else know such
detailed about my difficulty. You are incredible! Thanks!