我是BigData生態系統和入門的新手。從Kafka讀取並寫入實木複合地板的hdfs
我看了幾篇文章有關使用火花流,但想知道是否可以使用火花的工作,而不是流從卡夫卡讀讀卡夫卡的話題? 如果是的話,你們可以幫我指出一些可以讓我開始的文章或代碼片段。
我的問題的第二部分是實木複合地板的格式寫入HDFS。 一旦我從卡夫卡讀到,我想我會有一個rdd。 將此rdd轉換爲數據幀,然後將數據幀寫入parquet文件。 這是正確的做法。
任何幫助表示讚賞。
感謝
我是BigData生態系統和入門的新手。從Kafka讀取並寫入實木複合地板的hdfs
我看了幾篇文章有關使用火花流,但想知道是否可以使用火花的工作,而不是流從卡夫卡讀讀卡夫卡的話題? 如果是的話,你們可以幫我指出一些可以讓我開始的文章或代碼片段。
我的問題的第二部分是實木複合地板的格式寫入HDFS。 一旦我從卡夫卡讀到,我想我會有一個rdd。 將此rdd轉換爲數據幀,然後將數據幀寫入parquet文件。 這是正確的做法。
任何幫助表示讚賞。
感謝
對於卡夫卡讀取數據並將其寫入HDFS,在木地板格式,使用星火批處理作業,而不是流,你可以使用Spark Structured Streaming。
結構化數據流是建立在星火SQL引擎可擴展性和容錯流處理引擎。您可以使用與在靜態數據上表示批量計算的相同方式來表達流式計算。 Spark SQL引擎將負責逐步連續運行它,並在流式數據持續到達時更新最終結果。您可以在Scala,Java,Python或R中使用Dataset/DataFrame API來表示流聚合,事件時窗口,流到批處理連接等。計算在同一優化的Spark SQL引擎上執行。最後,系統通過檢查點和預寫日誌確保端到端的準確性 - 一次容錯保證。簡而言之,結構化數據流提供了快速,可擴展,容錯,端到端的流處理,而無需用戶推斷流式傳輸。
它配備了卡夫卡從卡夫卡一個內置的來源,即,我們可以查詢數據。它與卡夫卡經紀人版本0.10.0或更高版本兼容。
對於批處理模式從卡夫卡拉低數據,則可以創建一個定義的偏置範圍內的數據集/數據幀。
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
源每一行都有下面的模式:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
現在,將數據寫入到HDFS在地板格式,下面的代碼可以這樣寫:
df.write.parquet("hdfs://data.parquet")
更多結構化數據流+卡夫卡的星火信息,請參見下面的指南 - Kafka Integration Guide
我希望它有幫助!
使用卡夫卡流。 SparkStreaming是一個可怕的用詞不當(它是引擎蓋下的小批量)。
https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/
此答案有幫助嗎? – himanshuIIITian
謝謝Himanshu,這很有幫助。似乎這需要Spark 2.2,還有其他方式可以在2.0版本的火花低版本中執行此操作。 – Henosis