我有一個用java編寫的卡夫卡生產者代碼,用於編寫卡夫卡消息。以及接收這些消息的消費者代碼。是否有可能使用java編寫kafka消費者接收的輸出到文件
是否有可能將消費者收到的消息寫入java中的任何文本文件。
我有一個用java編寫的卡夫卡生產者代碼,用於編寫卡夫卡消息。以及接收這些消息的消費者代碼。是否有可能使用java編寫kafka消費者接收的輸出到文件
是否有可能將消費者收到的消息寫入java中的任何文本文件。
謝謝你們,
我能夠實現它。一旦數據在消費者端被接收到,那麼它就是你必須編寫的一個常見的java代碼。
下面是將消息打印到控制檯的代碼行。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
您可以將所有消息存儲到字符串並一次全部打印到文件。
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
completMessage += new String(bytes, "UTF-8")+"\n";
new String(bytes, "UTF-8")+"\n";
包含實際的消息。
最後打印所有文件。
writeDataToFile(completMessage);
writeDataToFile
包含簡單的java代碼來寫一個字符串到文件。
謝謝。
Kafka流迭代器是阻塞的,因此當從Java調用它們時,它們將繼續等待新消息,並且代碼無法關閉FileWriter。爲此,您可以執行以下操作:
1.更改爲降低超時
'props.put屬性(「consumer.timeout.ms」,「1500」);'
2.現在在超時的情況下打破循環
'VAL文件=新的文件(參數(0)) VAL體重=新的BufferedWriter(新的FileWriter(文件)) 而(試行{stream.iterator()hasNext()} {捕案件 E:Throwable的= >假}) { VAL記錄= stream.iterator.next()。消息() 的println(record.toString()) bw.write(record.toString()) } ' –
如果您正在編寫自己的客戶,則應該在同一個應用程序中包含寫入文件的邏輯。使用預先打包的控制檯使用者,您可以將其管道化爲文件。例如:kafka-console-consumer > file.txt
另一個(無代碼)選項將嘗試StreamSets Data Collector一個開源的Apache許可工具,它也具有拖放UI。它包含用於Kafka和各種數據格式的內置連接器。
*完全披露我是這個項目的提交者。
這是可能的。以下是這個的工作代碼。
package com.venk.prac;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import kafka.utils.ShutdownableThread;
public class FileConsumer extends ShutdownableThread {
private final KafkaConsumer<Integer, String> kafkaConsumer;
private String topic;
private String filePath;
private BufferedWriter buffWriter;
public FileConsumer(String topic, String filePath) {
super("FileConsumer", false);
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConsumer = new KafkaConsumer<Integer, String>(properties);
this.topic = topic;
this.filePath = filePath;
try {
this.buffWriter = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void doWork() {
// TODO Auto-generated method stub
kafkaConsumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000);
try {
for (ConsumerRecord<Integer, String> record : consumerRecords)
buffWriter.write(record.value() + System.lineSeparator());
buffWriter.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public String name() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isInterruptible() {
return false;
}
}
是的。在發佈之前,您是否嘗試過谷歌答案? – aviad