2017-06-13 164 views
3

我使用的火花2.11版本,我在做什麼只有3個在我的應用程序的基本操作:性能問題

  1. 從數據庫中取記錄:從文件220萬
  2. 檢查記錄(5 000 )目前使用的數據庫(220萬美元)包含
  3. 寫作匹配的記錄,以csv格式

的文件,但這些操作3需要近20分鐘。如果我在sql中執行相同的操作,則只需要不到1分鐘。

我已經開始使用火花,因爲它會產生非常快的結果,但它需要太多的時間。如何提高性能?

第1步:從數據庫中記錄。

 Properties connectionProperties = new Properties(); 
     connectionProperties.put("user", "test"); 
     connectionProperties.put("password", "test##"); 
     String query="(SELECT * from items) 
     dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties); 

步驟2:檢查存在於文件B(2M)文件A的記錄(5K),使用含有

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner"); 

步驟3:寫入匹配的記錄,以CSV格式的文件

NewSet.repartition(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

爲了提高性能,我嘗試了幾項設置,如設置Cache, 數據序列化

set("spark.serializer","org.apache.spark.serializer.KryoSerializer")), 

洗牌時間

sqlContext.setConf("spark.sql.shuffle.partitions", "10"), 

數據結構調整

-XX:+UseCompressedOops , 

沒有辦法沒有產生更好的性能。

+0

是否有理由在這個用例中使用spark?在我看來,將5k記錄寫入數據庫並在數據庫中發出SQL連接將是最有效的方法。 – maasg

+0

我的意思是,將這個查詢物化爲Spark需要多長時間:'SELECT * from items)'? – maasg

回答

4

提高性能更像是提高並行性。

並行性取決於RDD中分區的數量。

確保數據集/數據幀/ RDD既沒有太多的分區也沒有非常少的分區數。

請在下面檢查您可以改善您的代碼的建議。我更喜歡scala,所以我在scala中提供了一些建議。

步驟1: 確保您通過提及numPartition控制與數據庫建立的連接。

連接數量=分區數量。

下面我剛剛分配了10個num_partitions,你必須調整以獲得更多的性能。

int num_partitions; 
    num_partitions = 10; 
    Properties connectionProperties = new Properties(); 
    connectionProperties.put("user", "test"); 
    connectionProperties.put("password", "test##"); 
    connectionProperties.put("partitionColumn", "hash_code"); 
    String query = "(SELECT mod(A.id,num_partitions) as hash_code, A.* from items A)"; 
    dataFileContent = spark.read() 
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", 
     dbtable = query, 
     columnName = "hash_code", 
     lowerBound = 0, 
     upperBound = num_partitions, 
     numPartitions = num_partitions, 
     connectionProperties); 

You can check how numPartitions works

第二步:

Dataset<Row> NewSet = source.join(target, 
    target.col("ItemIDTarget").contains(source.col("ItemIDSource")), 
    "inner"); 

由於具有5k的記錄(數據量小),可以使用如下所述的廣播連接表/數據幀中的一個。

import org.apache.spark.sql.functions.broadcast 
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key") 

第三步:使用 聚結以減少分區的號碼,這樣避免了全面洗牌。

NewSet.coalesce(1).select("*") 
     .write().format("com.databricks.spark.csv") 
     .option("delimiter", ",") 
     .option("header", "true") 
     .option("treatEmptyValuesAsNulls", "true") 
     .option("nullValue", "") 
     .save(fileAbsolutePath); 

希望我的回答能幫助你。