2016-10-07 136 views
0

我有一個需求,即需要從多個源系統(Mysql實例)每隔5分鐘獲取一次數據,並使用其他一些數據(S3中可以說)加入和豐富它們。Spark中的Mysql數據處理

我想在Spark中進行這個處理來將我的執行分配給多個執行者。

主要的問題是每次我在Mysql中進行查找時,我只想獲取最新記錄(可以用lastModifiedOn> timestamp來說)。 如何有效地選擇性地獲取MySql行? 這是我曾嘗試:

val filmDf = sqlContext.read.format("jdbc") 
    .option("url", "jdbc:mysql://localhost/sakila") 
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "") 
    .load() 
+0

你可以更新你已經嘗試了什麼你的問題? – eliasah

+0

@eliasah是的將更新帖子。 – Karshit

回答

0

您應該使用SQL火花與JDBC數據源。我向你展示一個例子。

val res = spark.read.jdbc(
     url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb", 
     table = "TEST.table", 
     columnName = "lastModifiedOn", 
     lowerBound = lowerTimestamp, 
     upperBound = upperTimestamp, 
     numPartitions = 20, 
     connectionProperties = new Properties() 
    ) 

有Apache中的星火測試套件例子:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

+0

謝謝,你能告訴我numPartition在這裏意味着什麼嗎? 和connectionProperties以及? – Karshit

+0

結果將是一個帶有numPartitions分區的DataFrame。 Spark將並行執行numPartitions查詢以檢索結果。示例: lowerBound = 1,upperBound = 10,numPartitions = 2,Spark將執行兩個查詢,第一個位於1和5之間,第二個位於6和10之間。 – gasparms

+0

connectionProperties是一個將某些屬性傳遞給db的映射。取決於你的數據庫你可以使用它或不。 – gasparms