说明:此处使用的是Talend 8.0.1的版本,此版本自带的Kafka组件最高支持2.2.1的版本和此项目要求的Kafka3.6.2版本不同,因此采用的是tJava的方式进行传输
要求:键固定,值是JSONObject格式

1、tLibraryLoad1和tLibraryLoad2
用于引入使用的Jar包,分别为:kafka-clients-3.6.2.jar,json-20240303.jar;此处使用的是Install a new module引入新的包,包可以在maven下载,放到本地后选这个选项进行install

2、tFileInputDelimited
用于读取CSV文件的内容,在Edit schema中设置好读取的字段,然后将分隔符设置为逗号,如果有标题行的话,header为1

3、tMap和tLogRow
用于配置映射关系,以及在log中打印每条数据的结果

4、tJavaRow
该组件中可以通过input_row获取到当前遍历行的数据,然后通过Java语句对当前数据进行操作。

// 获取jsonArray参数用于存储每一行的数据(Talend全局参数)
JSONArray jsonArray = (JSONArray) globalMap.get("jsonArray");

// jsonArray不存在则创建(第一次执行jsonArray不存在时创建,后续正常使用)
if (jsonArray == null) {
    jsonArray = new JSONArray();
    globalMap.put("jsonArray", jsonArray);
}

// 创建一个JSONObject存放各行的数据
JSONObject rowObject = new JSONObject();

// input_row这个参数可以拿到当前遍历的数据
rowObject.put("DEVICE_ID", input_row.DEVICE_ID);
rowObject.put("PARAM_VALUE", input_row.PARAM_VALUE);
rowObject.put("DESCRIPTION", input_row.DESCRIPTION);

// 将存放当前数据的JSONObject添加到JSONArray当中
jsonArray.put(rowObject);

5、tJava
在此组件中,配置Kafka的相关连接以及传输消息

// 获取tJavaRow处理过的jsonArray参数(其中已经包含了所有的数据)
JSONArray jsonArray = (JSONArray) globalMap.get("jsonArray");

JSONObject finalObject = new JSONObject();
finalObject.put("title", "TalendAllDataTest");
finalObject.put("data", jsonArray);

// 将JSONObject转换为字符串
String csvJsonString = finalObject.toString();


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka Broker 地址
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// 创建Kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("Talend_To_Kafka", "AllDataTalend", csvJsonString); 
producer.send(record, (RecordMetadata metadata, Exception e) -> {
    if (e != null) {
        System.out.println("发送消息失败: " + e.getMessage());
    } else {
        System.out.println("消息发送成功: " + metadata.toString());
    }
});

// 关闭生产者
producer.close();

最后效果:

如需对此类格式的Kafka数据进行操作,可以考虑使用Flink:
Java使用Flink 从Kafka中获取并处理数据,再传回Kafka(案例) – YW
Flink官网文档:Kafka | Apache Flink

One thought on “Talend将CSV数据传输给Kafka(3.6.2)”

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注