2016-09-13 18 views
0

我對Apache Spark(版本1.6)比較陌生,而且我覺得我碰壁了:我查看了大多數關於SE的Spark相關問題,但到目前爲止,我找不到任何幫助。我相信我在基礎層面上做了一些根本性的錯誤,但是我不能指出它到底是什麼,特別是因爲我寫的其他代碼運行得很好。Spark性能問題(可能是由於「基本」錯誤造成的)

儘管我會簡化我的任務,以便更好地理解,但我會盡可能詳細地解釋我的情況。請記住,因爲我仍然在學習它,所以我使用Spark的本地模式運行此代碼;值得注意的是我一直在使用DataFrames(而不是RDDs)。最後,請注意以下代碼是使用Pyspark以Python編寫的,但我確實歡迎使用Scala或Java的可行解決方案,因爲我認爲這個問題是非常基礎的。

我有一個通用的JSON文件,其結構類似於如下:

{"events":[ 
    {"Person":"Alex","Shop":"Burger King","Timestamp":"100"}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"101"}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"104"}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"100"}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"120"}, 
    {"Person":"Nathan","Shop":"Burger King","Timestamp":"170"}]} 

我需要做的,是算多少時間由同一人兩次訪問之間傳遞給同一家商店。輸出應該是至少有一個客戶至少每5秒訪問一次的商店列表,以及符合此要求的客戶數量。在上述情況下,輸出應該是這個樣子:

{"Shop":"McDonalds","PeopleCount":1} 

我的想法是分配給每個(人,店鋪)相同的標識符,然後進行驗證,如果是對符合要求。可以使用窗口函數ROW_NUMBER()來分配標識符,這需要在Spark中使用hiveContext。這就是上面的文件應該看起來像標識符已經被分配後:

{"events":[ 
    {"Person":"Alex","Shop":"Burger King","Timestamp":"100","ID":1}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"101", "ID":2}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"104", "ID":2}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"100","ID":3}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"120","ID":3}, 
    {"Person":"Nathan","Shop":"Burger King","Timestamp":"170","ID":4}]} 

因爲我需要來一個前進行幾個步驟(其中的一些需要使用自加入)每對結論,我使用了臨時表格。

我寫的代碼是這樣的(當然,我只包括了相關的部分 - 「DF」代表「數據幀」):

t1_df = hiveContext.read.json(inputFileName) 
t1_df.registerTempTable("events") 
t2_df = hiveContext.sql("SELECT Person, Shop, ROW_NUMBER() OVER (order by Person asc, Shop asc) as ID FROM events group by Person, Shop HAVING count(*)>1") #if there are less than 2 entries for the same pair, then we can discard this pair 
t2_df.write.mode("overwrite").saveAsTable("orderedIDs") 
n_pairs = t2_df.count() #used to determine how many pairs I need to inspect 
i=1 
while i<=n_pairs: 
    #now I perform several operations, each one displaying this structure 
    #first operation... 
    query="SELECT ... FROM orderedIDs WHERE ID=%d" %i 
    t3_df = hiveContext.sql(query) 
    t3_df.write.mode("overwrite").saveAsTable("table1") 
    #...second operation... 
    query2="SELECT ... FROM table1 WHERE ..." 
    t4_df = hiveContext.sql(query2) 
    temp3_df.write.mode("overwrite").saveAsTable("table2") 
    #...and so on. Let us skip to the last operation in this loop, which consists of the "saving" of the shop if it met the requirements: 
    t8_df = hiveContext.sql("SELECT Shop from table7") 
    t8_df.write.mode("append").saveAsTable("goodShops") 
    i=i+1 

#then we only need to write the table to a proper file 
output_df = hiveContext.sql("SELECT Shop, count(*) as PeopleCount from goodShops group by Shop") 
output_df.write.json('output') 

現在,來這裏的問題:輸出是正確的。我已經嘗試了幾個輸入,並且程序在這方面工作正常。然而,它速度非常緩慢:分析每一對需要大約15-20秒,而不管每對的條目如何。例如,如果有10對,大約需要3分鐘,如果有100對需要30分鐘,依此類推。我在幾臺機器上運行這個代碼,硬件相當好,但沒有任何改變。 我也嘗試緩存一些(甚至全部)我使用的表,但問題仍然存在(甚至在某些情況下需要增加的時間)。更具體地說,循環的最後一個操作(使用「append」的循環)需要幾秒鐘才能完成(從5到10),而前6個只需要1-2秒(這仍然很多,考慮到任務的範圍,但絕對更易於管理)。

我相信問題可能在於以下中的一個(或多個):

  1. 使用一個循環,這威力並行的事業問題;
  2. 使用「saveAsTable」的方法,強制寫入I/O的
  3. 不好或很差的緩存使用

這3個都是來我的心,因爲另一件唯一的東西我使用Spark編寫的軟件(我沒有遇到任何性能問題)不使用上述技術,因爲我基本上執行了簡單的JOIN操作,並使用了臨時表的方法(這對我的瞭解,不能用於循環)而不是saveAsTable方法。

我試圖儘可能的清楚,但如果你確實需要更多的細節,我想提供更多的信息。

編輯:我設法解決我的問題歸功於zero323的答案。事實上,使用LAG功能是我真正需要做的事情。另一方面,我知道使用「saveAsTable」方法應該是不鼓勵的,尤其是在循環中 - 因爲每次調用都會導致性能大幅下降。除非絕對必要,否則我會避免使用它。

+0

如果這是所有你需要做的,你可以嘗試在你的初始數據groupbykey然後reduceGroups。它並不十分困難,但它應該非常快。 – BBogdan

+0

你介意拓展一下嗎? –

回答

1

同一人到同一商店進行兩次探訪之間經過了多少時間。輸出應該是至少有一個客戶至少每5秒訪問一次的商店列表,以及符合此要求的客戶數量。

如何簡單lag採用聚集:

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, lag, sum 

df = (sc 
    .parallelize([ 
     ("Alex", "Burger King", "100"), ("Alex", "McDonalds", "101"), 
     ("Alex", "McDonalds", "104"), ("Nathan", "KFC", "100"), 
     ("Nathan", "KFC", "120"), ("Nathan", "Burger King", "170") 
    ]).toDF(["Person", "Shop", "Timestamp"]) 
    .withColumn("Timestamp", col("timestamp").cast("long"))) 

w = (Window() 
    .partitionBy("Person", "Shop") 
    .orderBy("timestamp")) 

ind = ((
    # Difference between current and previous timestamp le 5 
    col("Timestamp") - lag("Timestamp", 1).over(w)) <= 5 
).cast("long") # Cast so we can sum 

(df 
    .withColumn("ind", ind) 
    .groupBy("Shop") 
    .agg(sum("ind").alias("events")) 
    .where(col("events") > 0) 
    .show()) 

## +---------+------+ 
## |  Shop|events| 
## +---------+------+ 
## |McDonalds|  1| 
## +---------+------+ 
+0

這個想法很棒,我很想嘗試一下。但是,我在循環中執行了其他幾個操作(我沒有在我的問題中提到過),我不確定是否仍然可以這樣做。 您的方法與我對問題的想象完全不同,所以我會提供必要的信息。還有,爲什麼我的程序表現如此之慢? –

+0

呃,很難說....'ROW_NUMBER()OVER(順序由Person asc,Shop asc)'不好,重複循環''SELECT ... FROM ... WHERE ...'不好特別是如果數據沒有使用基於條件的分區進行緩存,根據格式和存儲重複執行'write.mode(「append」)'可能效率低下......但我想這可能只是冰山一角: ) – zero323

相關問題