2016-11-09 293 views
1

Spark應該以極快的速度完成數據處理。但我想我沒有使用我的程序的正確功能來讓S​​park以這種方式工作。Spark程序需要很長時間才能完成執行

這是我的計劃是如何的樣子:

from pyspark import SparkContext 
from pyspark import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql import Row 
from pyparsing import re 
import time 

start_time = time.time() 
sc = SparkContext("local","test") 
sqlContext = SQLContext(sc) 

def func1(lines): 
    for line in lines: 
     qtype = re.search("qtype=(\S+)",line) 
     try: 
      url = re.search(" url=(\S+)",line) 
      url=url.group(1) 
     except: 
      url="null" 

     time = re.search("(^\S+)",line) 
     .... #extracting field names 
     row = Row(time = time.group(1),ttms = ttms.group(1),chi = chi.group(1),pssc = pssc.group(1),cqhm = cqhm.group(1),rtype = rtype.group(1),rdetails = rdetails.group(1),rurl = rurl.group(1),qtype = qtype.group(1),phn = phn.group(1),fqdn = fqdn,url = url) 

    return row 

file = sc.textFile("C:\\Logs\\filename.log").cache() 
result = file.map(lambda x: x.split("\n")) 
lines = result.map(func1).collect() 
df = sqlContext.createDataFrame(lines) 
df.registerTempTable("df") 
df.show() 

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

df1 = sqlContext.createDataFrame(query1) 
df2 = sqlContext.createDataFrame(query2) 
df3 = sqlContext.createDataFrame(query3) 
df4 = sqlContext.createDataFrame(query4) 
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv") 
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv") 
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv") 
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv") 
print(time.time() - start_time) 

這個程序需要將近200秒,執行這是一個很長的時間。我無法弄清楚原因。 (我的日誌文件包含大約34k條日誌行)。我試圖用正則表達式的火花filter,但我得到的錯誤rdd is not iterable。所以我需要知道如何優化我的程序以使其運行速度更快。 另外,我得到stage x contains task of very large size的警告。試過廣播lines,但發生了錯誤。

+0

這是一個單箱或多節點你遇到的羣集?你能否澄清代碼中代表查詢時間的地方?一些快速觀察可能會有所幫助:1)'線'你需要運行一個收集? 2)你想在你的uber-dataframe中爲每行創建一個DataFrame嗎? –

+1

您可以嘗試解釋一下,請問您正在嘗試做什麼?對我來說不是很清楚。數據的大小又是多少? DF中有多少條目? – eliasah

+0

@DennyLee:這是我正在使用的一個盒子。我剛剛使用python的'time'函數來評估整個程序執行的時間。 1)我需要爲'lines'使用collect,以便它被轉換爲列表並且可以從那裏創建df。 2)我沒有爲每一行創建一個df,我只是從超級用戶那裏查詢每一行。 – kaks

回答

3

一些原因,這個火花的代碼運行速度會變慢與純Python代碼:使用一臺機器

sc = SparkContext("local","test") 

蟒蛇火花集羣上運行時,可能要執行比純Python更好

1)。 在「本地」模式下,spark對純python沒有好處。使用「緩存」時,它不使用

file = sc.textFile("C:\\Logs\\filename.log").cache() 

.cache()

2)應該使用僅當相同對象被調用多次。 「文件」只調用一次 - 即不需要使用緩存

3)「收集()」

lines = result.map(func1).collect() 

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

的一般規則 - 避免使用「收集()」,除非是真的需要它。使用

4) 「toPandas()」

df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv") 
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv") 
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv") 
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv") 

「toPandas()」 實施開始的 「收集()」 執行(見#3)

由於火花2.0,你可以直接寫數據幀到CSV:

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) 

5)我不知道,我明白下面的代碼:

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

你有什麼努力實現「爲DF行:」?

如果數據框包含100,000行,您計劃執行這個「for-loop」100,000次嗎?

似乎變量query1,query2,query3,query4將僅保存for循環的最後一次執行的結果(因爲每次從「df」中讀取新的「行」時,它們的值似乎都被覆蓋「) - 是否有意?

6)可以直接從RDD

創建數據幀例如使用

sqlContext.createDataFrame 

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

createDataFrame(**數據,模式=無,samplingRatio =無,verifySchema =真) **創建一個數據幀從RDD,列表或pandas.DataFrame 。

RDD.toDF() 

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

toDF(*的cols)

返回一個新的類:數據幀,與新指定的列名 參數:列 - 新的列名列表(串)

>>> df.toDF('f1', 'f2').collect() 
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')] 
+0

2)和5)非常有效。謝謝!我像5)中所說的那樣刪除了for循環。 我使用'collect'的原因是RDD將被轉換爲列表。只有這樣我才能創建一個df。否則會引發錯誤。有另一種方法嗎? 我認爲如果使用熊貓,轉換爲CSV更容易。 現在我只使用本地機器。稍後我會開始使用集羣。 – kaks

+0

@kaks - 我已經更新了我的答案,關於df.write.csv()函數。 – Yaron

+0

@kaks - 添加了關於直接從RDD創建Dataframe的部分 – Yaron

相關問題