有人可以提供示例代碼以將記錄從Spark Streaming推送到Kafka嗎?從Spark Streaming將數據推送到Kafka
-1
A
回答
0
使用Spark Streaming,您可以使用來自Kafka主題的數據。
如果你想發佈記錄卡夫卡的話題,你可以使用卡夫卡生產者[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]
或者你可以使用卡夫卡連接使用多個源連接到數據發佈到卡夫卡的話題。[http://www.confluent.io/product/connectors/]
請參閱以下鏈接瞭解Spark流媒體和Kafka集成的更多信息。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
0
我已經使用Java做到了。您可以通過JavaDStream<String>
將此函數用作.foreachRDD()
的參數。這不是最好的方法,因爲它爲每個RDD創建KafkaProducer
,您可以使用KafkaProducers
的「池」(如socket example in Spark documentation)執行此操作。
這裏是我的代碼:
public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> {
private static final long serialVersionUID = 1L;
public void call(JavaRDD<String> rdd) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "loca192.168.0.155lhost:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1000);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
private static final long serialVersionUID = 1L;
public void call(Iterator<String> partitionOfRecords) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(props);
while(partitionOfRecords.hasNext()) {
producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next()));
}
producer.close();
}
});
}
}
相關問題
- 1. Spark Streaming - Kafka- createStream - RDD到數據幀
- 2. 將Spark Streaming RDD推送到Neo4j -Scala
- 3. Spark Streaming Kafka java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.StringDeserializer
- 4. Spark Streaming Kafka backpressure
- 5. Spark-Streaming Kafka Direct Streaming API&Parallelism
- 6. Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder
- 7. Spark Streaming - Java - 從Kafka插入JSON到Cassandra
- 8. Spark Streaming + kafka「JobGenerator」java.lang.NoSuchMethodError
- 9. 將PubMed數據推送到Kafka
- 10. Kafka Streaming + Spark Streaming +機器學習
- 11. Kafka Spark-Streaming偏移問題
- 12. java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils
- 13. 在Spark Streaming中的Kafka createDirectStream
- 14. 如何在Spark Streaming中將壓縮數據寫入Kafka?
- 15. 從kafka-Spark-Streaming讀取數據時獲得空集
- 16. Kafka - Spark Streaming - 僅從一個分區讀取數據
- 17. 從卡夫卡發送CSV到Spark Streaming
- 18. Spark Streaming + Kafka:如何從kafka消息檢查主題名稱
- 19. Pyspark Kafka Streaming
- 20. Kafka Spark Streaming Consumer將不會收到來自Kafka Console Producer的任何消息?
- 21. spark-streaming-kafka-0-10:如何限制Spark分區的數量
- 22. 是否支持spark-streaming-kafka-0-10 lib?
- 23. 由於InvalidClassException,Spark Kafka Streaming作業失敗
- 24. Spark Streaming Kafka初始偏移量
- 25. apache spark streaming - kafka - 閱讀舊信息
- 26. 在Spark Streaming中重用kafka製作者
- 27. Kafka Streaming Concurrency?
- 28. 將spark-streaming連接到HBase
- 29. 無法將消息推送到apache kafka?
- 30. 從SqlServer將數據推送到Hive