我對Apache Spark(版本1.6)比較陌生,而且我覺得我碰壁了:我查看了大多數關於SE的Spark相關問題,但到目前爲止,我找不到任何幫助。我相信我在基礎層面上做了一些根本性的錯誤,但是我不能指出它到底是什麼,特別是因爲我寫的其他代碼運行得很好。Spark性能問題(可能是由於「基本」錯誤造成的)
儘管我會簡化我的任務,以便更好地理解,但我會盡可能詳細地解釋我的情況。請記住,因爲我仍然在學習它,所以我使用Spark的本地模式運行此代碼;值得注意的是我一直在使用DataFrames(而不是RDDs)。最後,請注意以下代碼是使用Pyspark以Python編寫的,但我確實歡迎使用Scala或Java的可行解決方案,因爲我認爲這個問題是非常基礎的。
我有一個通用的JSON文件,其結構類似於如下:
{"events":[
{"Person":"Alex","Shop":"Burger King","Timestamp":"100"},
{"Person":"Alex","Shop":"McDonalds","Timestamp":"101"},
{"Person":"Alex","Shop":"McDonalds","Timestamp":"104"},
{"Person":"Nathan","Shop":"KFC","Timestamp":"100"},
{"Person":"Nathan","Shop":"KFC","Timestamp":"120"},
{"Person":"Nathan","Shop":"Burger King","Timestamp":"170"}]}
我需要做的,是算多少時間由同一人兩次訪問之間傳遞給同一家商店。輸出應該是至少有一個客戶至少每5秒訪問一次的商店列表,以及符合此要求的客戶數量。在上述情況下,輸出應該是這個樣子:
{"Shop":"McDonalds","PeopleCount":1}
我的想法是分配給每個對(人,店鋪)相同的標識符,然後進行驗證,如果是對符合要求。可以使用窗口函數ROW_NUMBER()來分配標識符,這需要在Spark中使用hiveContext。這就是上面的文件應該看起來像標識符已經被分配後:
{"events":[
{"Person":"Alex","Shop":"Burger King","Timestamp":"100","ID":1},
{"Person":"Alex","Shop":"McDonalds","Timestamp":"101", "ID":2},
{"Person":"Alex","Shop":"McDonalds","Timestamp":"104", "ID":2},
{"Person":"Nathan","Shop":"KFC","Timestamp":"100","ID":3},
{"Person":"Nathan","Shop":"KFC","Timestamp":"120","ID":3},
{"Person":"Nathan","Shop":"Burger King","Timestamp":"170","ID":4}]}
因爲我需要來一個前進行幾個步驟(其中的一些需要使用自加入)每對結論,我使用了臨時表格。
我寫的代碼是這樣的(當然,我只包括了相關的部分 - 「DF」代表「數據幀」):
t1_df = hiveContext.read.json(inputFileName)
t1_df.registerTempTable("events")
t2_df = hiveContext.sql("SELECT Person, Shop, ROW_NUMBER() OVER (order by Person asc, Shop asc) as ID FROM events group by Person, Shop HAVING count(*)>1") #if there are less than 2 entries for the same pair, then we can discard this pair
t2_df.write.mode("overwrite").saveAsTable("orderedIDs")
n_pairs = t2_df.count() #used to determine how many pairs I need to inspect
i=1
while i<=n_pairs:
#now I perform several operations, each one displaying this structure
#first operation...
query="SELECT ... FROM orderedIDs WHERE ID=%d" %i
t3_df = hiveContext.sql(query)
t3_df.write.mode("overwrite").saveAsTable("table1")
#...second operation...
query2="SELECT ... FROM table1 WHERE ..."
t4_df = hiveContext.sql(query2)
temp3_df.write.mode("overwrite").saveAsTable("table2")
#...and so on. Let us skip to the last operation in this loop, which consists of the "saving" of the shop if it met the requirements:
t8_df = hiveContext.sql("SELECT Shop from table7")
t8_df.write.mode("append").saveAsTable("goodShops")
i=i+1
#then we only need to write the table to a proper file
output_df = hiveContext.sql("SELECT Shop, count(*) as PeopleCount from goodShops group by Shop")
output_df.write.json('output')
現在,來這裏的問題:輸出是正確的。我已經嘗試了幾個輸入,並且程序在這方面工作正常。然而,它速度非常緩慢:分析每一對需要大約15-20秒,而不管每對的條目如何。例如,如果有10對,大約需要3分鐘,如果有100對需要30分鐘,依此類推。我在幾臺機器上運行這個代碼,硬件相當好,但沒有任何改變。 我也嘗試緩存一些(甚至全部)我使用的表,但問題仍然存在(甚至在某些情況下需要增加的時間)。更具體地說,循環的最後一個操作(使用「append」的循環)需要幾秒鐘才能完成(從5到10),而前6個只需要1-2秒(這仍然很多,考慮到任務的範圍,但絕對更易於管理)。
我相信問題可能在於以下中的一個(或多個):
- 使用一個循環,這威力並行的事業問題;
- 使用「saveAsTable」的方法,強制寫入I/O的
- 不好或很差的緩存使用
這3個都是來我的心,因爲另一件唯一的東西我使用Spark編寫的軟件(我沒有遇到任何性能問題)不使用上述技術,因爲我基本上執行了簡單的JOIN操作,並使用了臨時表的方法(這對我的瞭解,不能用於循環)而不是saveAsTable方法。
我試圖儘可能的清楚,但如果你確實需要更多的細節,我想提供更多的信息。
編輯:我設法解決我的問題歸功於zero323的答案。事實上,使用LAG功能是我真正需要做的事情。另一方面,我知道使用「saveAsTable」方法應該是不鼓勵的,尤其是在循環中 - 因爲每次調用都會導致性能大幅下降。除非絕對必要,否則我會避免使用它。
如果這是所有你需要做的,你可以嘗試在你的初始數據groupbykey然後reduceGroups。它並不十分困難,但它應該非常快。 – BBogdan
你介意拓展一下嗎? –