Spark應該以極快的速度完成數據處理。但我想我沒有使用我的程序的正確功能來讓Spark以這種方式工作。Spark程序需要很長時間才能完成執行
這是我的計劃是如何的樣子:
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyparsing import re
import time
start_time = time.time()
sc = SparkContext("local","test")
sqlContext = SQLContext(sc)
def func1(lines):
for line in lines:
qtype = re.search("qtype=(\S+)",line)
try:
url = re.search(" url=(\S+)",line)
url=url.group(1)
except:
url="null"
time = re.search("(^\S+)",line)
.... #extracting field names
row = Row(time = time.group(1),ttms = ttms.group(1),chi = chi.group(1),pssc = pssc.group(1),cqhm = cqhm.group(1),rtype = rtype.group(1),rdetails = rdetails.group(1),rurl = rurl.group(1),qtype = qtype.group(1),phn = phn.group(1),fqdn = fqdn,url = url)
return row
file = sc.textFile("C:\\Logs\\filename.log").cache()
result = file.map(lambda x: x.split("\n"))
lines = result.map(func1).collect()
df = sqlContext.createDataFrame(lines)
df.registerTempTable("df")
df.show()
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
df1 = sqlContext.createDataFrame(query1)
df2 = sqlContext.createDataFrame(query2)
df3 = sqlContext.createDataFrame(query3)
df4 = sqlContext.createDataFrame(query4)
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")
print(time.time() - start_time)
這個程序需要將近200秒,執行這是一個很長的時間。我無法弄清楚原因。 (我的日誌文件包含大約34k條日誌行)。我試圖用正則表達式的火花filter
,但我得到的錯誤rdd is not iterable
。所以我需要知道如何優化我的程序以使其運行速度更快。 另外,我得到stage x contains task of very large size
的警告。試過廣播lines
,但發生了錯誤。
這是一個單箱或多節點你遇到的羣集?你能否澄清代碼中代表查詢時間的地方?一些快速觀察可能會有所幫助:1)'線'你需要運行一個收集? 2)你想在你的uber-dataframe中爲每行創建一個DataFrame嗎? –
您可以嘗試解釋一下,請問您正在嘗試做什麼?對我來說不是很清楚。數據的大小又是多少? DF中有多少條目? – eliasah
@DennyLee:這是我正在使用的一個盒子。我剛剛使用python的'time'函數來評估整個程序執行的時間。 1)我需要爲'lines'使用collect,以便它被轉換爲列表並且可以從那裏創建df。 2)我沒有爲每一行創建一個df,我只是從超級用戶那裏查詢每一行。 – kaks