2015-11-09 55 views

回答

2

謝謝你們,

我能夠實現它。一旦數據在消費者端被接收到,那麼它就是你必須編寫的一個常見的java代碼。

下面是將消息打印到控制檯的代碼行。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); 

您可以將所有消息存儲到字符串並一次全部打印到文件。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); 
completMessage += new String(bytes, "UTF-8")+"\n"; 

new String(bytes, "UTF-8")+"\n";包含實際的消息。

最後打印所有文件。

writeDataToFile(completMessage); 

writeDataToFile包含簡單的java代碼來寫一個字符串到文件。

謝謝。

+0

Kafka流迭代器是阻塞的,因此當從Java調用它們時,它們將繼續等待新消息,並且代碼無法關閉FileWriter。爲此,您可以執行以下操作:
1.更改爲降低超時
'props.put屬性(「consumer.timeout.ms」,「1500」);'
2.現在在超時的情況下打破循環
'VAL文件=新的文件(參數(0)) VAL體重=新的BufferedWriter(新的FileWriter(文件)) 而(試行{stream.iterator()hasNext()} {捕案件 E:Throwable的= >假}) { VAL記錄= stream.iterator.next()。消息() 的println(record.toString()) bw.write(record.toString()) } ' –

3

如果您正在編寫自己的客戶,則應該在同一個應用程序中包含寫入文件的邏輯。使用預先打包的控制檯使用者,您可以將其管道化爲文件。例如:kafka-console-consumer > file.txt

另一個(無代碼)選項將嘗試StreamSets Data Collector一個開源的Apache許可工具,它也具有拖放UI。它包含用於Kafka和各種數據格式的內置連接器。

*完全披露我是這個項目的提交者。

0

這是可能的。以下是這個的工作代碼。

package com.venk.prac; 

import java.io.BufferedWriter; 
import java.io.FileWriter; 
import java.io.IOException; 
import java.util.Collections; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.serialization.IntegerDeserializer; 
import org.apache.kafka.common.serialization.StringDeserializer; 

import kafka.utils.ShutdownableThread; 

public class FileConsumer extends ShutdownableThread { 

    private final KafkaConsumer<Integer, String> kafkaConsumer; 
    private String topic; 
    private String filePath; 
    private BufferedWriter buffWriter; 

    public FileConsumer(String topic, String filePath) { 

     super("FileConsumer", false); 
     Properties properties = new Properties(); 
     properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
       KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING); 
     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer"); 
     properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); 
     properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); 
     properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); 
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 

     kafkaConsumer = new KafkaConsumer<Integer, String>(properties); 
     this.topic = topic; 
     this.filePath = filePath; 

     try { 
      this.buffWriter = new BufferedWriter(new FileWriter(filePath)); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void doWork() { 
     // TODO Auto-generated method stub 
     kafkaConsumer.subscribe(Collections.singletonList(this.topic)); 
     ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); 
     try { 
      for (ConsumerRecord<Integer, String> record : consumerRecords) 
       buffWriter.write(record.value() + System.lineSeparator()); 
      buffWriter.flush(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public String name() { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public boolean isInterruptible() { 
     return false; 
    } 

}