0
我有一個自定義的Spark流的foreach編寫器。對於我寫入JDBC源的每一行。在執行JDBC操作之前,我還想做一些快速查找並在執行JDBC操作之後更新值,如下面的示例代碼中的「Step-1」和「Step-3」...在Apache Spark中存在內存數據庫
I don不想使用REDIS,MongoDB等外部數據庫。我想用低足跡像RocksDB,德比等什麼......
我可以接受存儲每個應用程序的一個文件,就像檢查點,我將創建一個內部數據庫文件夾...
我看不到任何內存數據庫用於星火..
def main(args: Array[String]): Unit = {
val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"
val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();
val sparkContext = sparkSession.sparkContext;
sparkContext.setLogLevel("ERROR")
val sqlContext = sparkSession.sqlContext;
val kafkaDataframe = sparkSession.readStream.format("kafka")
.options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
"startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
.load()
kafkaDataframe.printSchema()
kafkaDataframe.createOrReplaceTempView("kafka_view")
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")
val customForEachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = {
println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
true
}
override def process(value: Row) = {
// Step 1 ==> Lookup a key in persistent KEY-VALUE store
// JDBC operations
// Step 3 ==> Update the value in persistent KEY-VALUE store
}
override def close(errorOrNull: Throwable) = {
println(" ************** Closed ****************** ")
}
}
val yy = sqlDataframe
.writeStream
.queryName("foreachquery")
.foreach(customForEachWriter)
.start()
yy.awaitTermination()
sparkSession.close();
}
你問的https://db.apache。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html?我真的不知道什麼是「永久性內存數據庫」,除非您正在討論使用像NVM這樣的硬件?如果沒有特殊的硬件,Derby內存數據庫不會持久。 –
我的意思是在內存中意味着.. MySQL,Redis作爲單獨的進程運行......我不想要......德比加載到火花驅動程序中,並從執行者我想連接到德比......因爲我的火花由紗線運行的作業將在5臺機器上..所以我可以使用德比是火花......並將它用於我的需要步驟1和3 ...但是不支持MVCC,所以我正在考慮H2數據庫...所以我想體驗一下使用Derby和H2是Spark – Manjesh
OK。 Derby關於這個「進程內」數據庫引擎的術語是「嵌入式」的,對於將Derby嵌入到另一個(Java)應用程序中來說,它的效果很好。 Derby不是MVCC數據庫引擎是正確的。要開始使用Derby,我推薦以下教程:https://db.apache.org/derby/docs/10.13/getstart/ –