對於只讀12GB的行爲良好的csv文件,您可以將其複製到所有工人和驅動程序機器上,然後手動分割「,」。這可能不會解析任何RFC4180 csv,但它解析了我所擁有的。
- 當您申請集羣時,爲每個工作人員添加至少12GB的輔助磁盤空間。
- 使用至少具有12GB RAM的機器類型,例如
c3.2xlarge
。如果您不打算保持閒置狀態,並且可以承受較大的費用,則可以擴大規模。更大的機器意味着更少的磁盤文件複製即可開始。我經常在現貨市場上以每小時0.50美元的價格看到c3.8xlarge。
將文件複製到每個工人的每個工人的相同目錄中。這應該是物理連接的驅動器,即每臺計算機上的不同物理驅動器。
請確保您在驅動程序機器上也有相同的文件和目錄。
raw = sc.textFile("/data.csv")
print "Counted %d lines in /data.csv" % raw.count()
raw_fields = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)
def uncsv_line(line):
return [pmatchre.match(s).group(1) for s in line.split(',')]
fields = uncsv_line(raw_fields)
def raw_to_dict(raw_line):
return dict(zip(fields, uncsv_line(raw_line)))
parsedData = (raw
.map(raw_to_dict)
.cache()
)
print "Counted %d parsed lines" % parsedData.count()
parsedData將類型的字典,其中,所述類型的字典的鍵是從第一行的CSV的字段名稱的RDD,並且值是當前行的CSV值。如果您的CSV數據中沒有標題行,這可能並不適合您,但應該清楚,您可以覆蓋讀取第一行的代碼並手動設置字段。
請注意,這對於創建數據框架或註冊Spark SQL表格並不是很有用。但是對於其他任何東西,都可以,如果需要將其轉儲到Spark SQL中,則可以進一步提取並將其轉換爲更好的格式。
我在沒有任何問題的情況下在一個7GB文件上使用它,除了我已經刪除了一些過濾器邏輯來檢測有效數據,這些數據的副作用是從解析數據中刪除標題。您可能需要重新實現一些過濾。
如果spark-csv lib是1.2.0+版本,你可以嘗試將'parserLib'選項設置爲'univocity'嗎? – rchukh
@rchukh是不是默認?今天已經從主人那裏建立了罐子。編輯:不,它不是。將嘗試。 – tchakravarty