2017-09-15 61 views
0

我有一個自定義的Spark流的foreach編寫器。對於我寫入JDBC源的每一行。在執行JDBC操作之前,我還想做一些快速查找並在執行JDBC操作之後更新值,如下面的示例代碼中的「Step-1」和「Step-3」...在Apache Spark中存在內存數據庫

I don不想使用REDIS,MongoDB等外部數據庫。我想用低足跡像RocksDB,德比等什麼......

我可以接受存儲每個應用程序的一個文件,就像檢查點,我將創建一個內部數據庫文件夾...

我看不到任何內存數據庫用於星火..

def main(args: Array[String]): Unit = { 

val brokers = "quickstart:9092" 
val topic = "safe_message_landing_app_4" 

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate(); 

val sparkContext = sparkSession.sparkContext; 
sparkContext.setLogLevel("ERROR") 
val sqlContext = sparkSession.sqlContext; 

val kafkaDataframe = sparkSession.readStream.format("kafka") 
    .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic, 
    "startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader")) 
    .load() 

kafkaDataframe.printSchema() 
kafkaDataframe.createOrReplaceTempView("kafka_view") 
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view") 

val customForEachWriter = new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long) = { 
    println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version) 
    true 
    } 

    override def process(value: Row) = { 
    // Step 1 ==> Lookup a key in persistent KEY-VALUE store 

    // JDBC operations 

    // Step 3 ==> Update the value in persistent KEY-VALUE store 
    } 

    override def close(errorOrNull: Throwable) = { 
    println(" ************** Closed ****************** ") 
    } 
} 

val yy = sqlDataframe 
    .writeStream 
    .queryName("foreachquery") 
    .foreach(customForEachWriter) 
    .start() 

yy.awaitTermination() 

sparkSession.close(); 

}

+1

你問的https://db.apache。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html?我真的不知道什麼是「永久性內存數據庫」,除非您正在討論使用像NVM這樣的硬件?如果沒有特殊的硬件,Derby內存數據庫不會持久。 –

+1

我的意思是在內存中意味着.. MySQL,Redis作爲單獨的進程運行......我不想要......德比加載到火花驅動程序中,並從執行者我想連接到德比......因爲我的火花由紗線運行的作業將在5臺機器上..所以我可以使用德比是火花......並將它用於我的需要步驟1和3 ...但是不支持MVCC,所以我正在考慮H2數據庫...所以我想體驗一下使用Derby和H2是Spark – Manjesh

+2

OK。 Derby關於這個「進程內」數據庫引擎的術語是「嵌入式」的,對於將Derby嵌入到另一個(Java)應用程序中來說,它的效果很好。 Derby不是MVCC數據庫引擎是正確的。要開始使用Derby,我推薦以下教程:https://db.apache.org/derby/docs/10.13/getstart/ –

回答

1

Manjesh,

你在找什麼,「星火和你的內存數據庫作爲一個無縫集羣,共享單個進程空間「,支持MVCC正是SnappyData所提供的。藉助SnappyData,您希望快速查找的表格與正在運行Spark流式作業的過程處於同一個過程中。檢查出來here

SnappyData擁有核心產品的Apache V2許可證,並且您所指的特定用途可在OSS下載中找到。

(披露:我是一個SnappyData員工,是有意義的提供產品具體回答這個問題,因爲該產品是問題的答案)