-2
我是新來的Apache火花,並試圖理解結構化流與Apache卡夫卡在斯卡拉,但沒有工作在我的青睞,直到現在基本上我想發送JSON從卡夫卡過程它使用火花結構化流併發回到卡夫卡。我試着在網站上給出的例子,但它不工作。試圖理解結構化流
這裏是我的代碼:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object dataset_kafka {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.load()
df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.trigger(Trigger.ProcessingTime("5 seconds"))
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("topic", "test1")
.option("checkpointLocation", "/home/hduser/Desktop/tempo")
.start()
.awaitTermination()
}
}
與我要去哪裏不對任何幫助?
我以這種格式發送卡夫卡JSON:
{"schema":"Hiren","payload":"123"}
歡迎來到SO!請參閱這裏瞭解如何發佈一個很好的問題,這個問題可能不會被關閉,甚至可能會被回答:https://stackoverflow.com/help/how-to-ask –
我的問題無效嗎? –
你需要顯示你自己的一些代碼沒有工作/你自己的一些努力。你所要求的是稱爲教程 – radumanolescu