2015-05-13 61 views
2

我有一個從遠程Oracle數據庫讀取記錄(至少65k記錄)的Java代碼片段。本質上,我們試圖將每小時過濾器傳遞到數據幀以獲取記錄,每小時分區x 24個。Spark DataFrame - 最後分區收集緩慢

源視圖基於一個包含數百萬條記錄的表。

我們面臨的問題是,Spark(在YARN或作爲SPARK羣集)在3分鐘內處理24個分區中的22個。最後2個分區需要5個多小時才能完成。

有什麼辦法可以加速使用DataFrames?

HashMap<String, String> options = new HashMap<>(); 
sqlContext.setConf("spark.sql.shuffle.partition", "50"); 
options.put("dbtable", "(select * from "+VIEW_NAME+" where 1=1)"); 
options.put("driver", "oracle.jdbc.OracleDriver"); 
options.put("url", JDBC_URL); 
options.put("partitionColumn", "hrs"); 
options.put("lowerBound", "00"); 
options.put("upperBound", "23"); 
options.put("numPartitions", "24"); 

DataFrame dk = sqlContext.load("jdbc", options).cache(); 
dk.registerTempTable(VIEW_NAME); 
dk.printSchema(); 
DateTime dt = new DateTime(2015, 5, 8, 10, 0, 0); 
String s = SQL_DATE_FORMATTER.print(dt); 
dt = dt.plusHours(24); 
String t = SQL_DATE_FORMATTER.print(dt); 
System.out.println("S is " + s + "and t is "+ t); 
Stream<Row> rows = dk.filter("DATETIME >= '" + s + "' and DATETIME <= '" + t + "'").collectAsList().parallelStream(); 
    System.out.println("Collected" + rows.count()); 
+0

對這個有什麼更新?有沒有找到解決辦法? – zengr

+0

沒有。沒有更新,但我確實找到了一件事情,它執行00時間分區到23時間分區,然後一直進行單個分區M/R(00-23),因此它不起作用。 – RvK

+0

作爲解決方法,我們應該更改dt.plusHours(24).minusSecond(1) – RvK

回答

0

不知道這是在完成一個答案,但作爲一個替代,如果我們做了以下

dt = dt.plusHours(24).minusSeconds(1) 

這是更快,但仍然不是一樣快,前23個分區