2016-09-28 132 views

回答

0

使用Spark Streaming,您可以使用來自Kafka主題的數據。

如果你想發佈記錄卡夫卡的話題,你可以使用卡夫卡生產者[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]

或者你可以使用卡夫卡連接使用多個源連接到數據發佈到卡夫卡的話題。[http://www.confluent.io/product/connectors/]

請參閱以下鏈接瞭解Spark流媒體和Kafka集成的更多信息。

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

0

我已經使用Java做到了。您可以通過JavaDStream<String>將此函數用作.foreachRDD()的參數。這不是最好的方法,因爲它爲每個RDD創建KafkaProducer,您可以使用KafkaProducers的「池」(如socket example in Spark documentation)執行此操作。

這裏是我的代碼:

public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> { 
    private static final long serialVersionUID = 1L; 

    public void call(JavaRDD<String> rdd) throws Exception { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "loca192.168.0.155lhost:9092"); 
     props.put("acks", "1"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1000); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     rdd.foreachPartition(new VoidFunction<Iterator<String>>() { 
      private static final long serialVersionUID = 1L; 

      public void call(Iterator<String> partitionOfRecords) throws Exception { 
       Producer<String, String> producer = new KafkaProducer<>(props); 
       while(partitionOfRecords.hasNext()) { 
        producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next())); 
       } 
       producer.close(); 
      } 
     }); 
    } 
} 
相關問題