2017-07-24 43 views
-2

我是新來的Apache火花,並試圖理解結構化流與Apache卡夫卡在斯卡拉,但沒有工作在我的青睞,直到現在基本上我想發送JSON從卡夫卡過程它使用火花結構化流併發回到卡夫卡。我試着在網站上給出的例子,但它不工作。試圖理解結構化流

這裏是我的代碼:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.streaming.{OutputMode, Trigger} 
object dataset_kafka { 
    def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
     .builder() 
     .appName("kafka-consumer") 
     .master("local[*]") 
     .getOrCreate() 

    import spark.implicits._ 

    spark.sparkContext.setLogLevel("WARN") 

    val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
     .option("subscribe", "test") 
     .load() 

     df 
     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
     .writeStream 
     .format("kafka") 
     .trigger(Trigger.ProcessingTime("5 seconds")) 
     .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
     .option("topic", "test1") 
     .option("checkpointLocation", "/home/hduser/Desktop/tempo") 
     .start() 
     .awaitTermination() 
    } 
} 

與我要去哪裏不對任何幫助?

我以這種格式發送卡夫卡JSON:

{"schema":"Hiren","payload":"123"} 
+0

歡迎來到SO!請參閱這裏瞭解如何發佈一個很好的問題,這個問題可能不會被關閉,甚至可能會被回答:https://stackoverflow.com/help/how-to-ask –

+0

我的問題無效嗎? –

+2

你需要顯示你自己的一些代碼沒有工作/你自己的一些努力。你所要求的是稱爲教程 – radumanolescu

回答