2017-04-03 38 views
2

我有一個作業,加載一個DataFrame對象,然後使用DataFrame partitionBy方法將數據保存爲parquet格式。然後我發佈創建的路徑,以便後續作業可以使用輸出。輸出中的路徑如下所示:爲什麼分區鍵列從DataFrame丟失

/ptest/_SUCCESS 
/ptest/id=0 
/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=0/part-00001-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=0/part-00002-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=1 
/ptest/id=1/part-00003-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=1/part-00004-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=1/part-00005-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=3 
/ptest/id=3/part-00006-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 
/ptest/id=3/part-00007-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet 

當我接收到新數據時,它會附加到數據集中。路徑已發佈,因此依賴於數據的作業只能處理新數據。

下面的代碼的一個簡單的例子:

>>> rdd = sc.parallelize([(0,1,"A"), (0,1,"B"), (0,2,"C"), (1,2,"D"), (1,10,"E"), (1,20,"F"), (3,18,"G"), (3,18,"H"), (3,18,"I")]) 
>>> df = sqlContext.createDataFrame(rdd, ["id", "score","letter"]) 
>>> df.show() 
+---+-----+------+ 
| id|score|letter| 
+---+-----+------+ 
| 0| 1|  A| 
| 0| 1|  B| 
| 0| 2|  C| 
| 1| 2|  D| 
| 1| 10|  E| 
| 1| 20|  F| 
| 3| 18|  G| 
| 3| 18|  H| 
| 3| 18|  I| 
+---+-----+------+ 
>>> df.write.partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest") 

問題是,當另一個程序試圖使用已發佈的路徑來讀取文件:

>>> df2 = spark.read.format("parquet").schema(df2.schema).load("hdfs://localhost:9000/ptest/id=0/") 
>>> df2.show() 
+-----+------+ 
|score|letter| 
+-----+------+ 
| 1|  A| 
| 1|  B| 
| 2|  C| 
+-----+------+ 

正如你所看到的分區鍵從加載的數據集中丟失。如果我要發佈作業可以使用的模式,我可以使用模式加載文件。該文件加載和分區鍵存在,但值爲空:

>>> df2 = spark.read.format("parquet").schema(df.schema).load("hdfs://localhost:9000/ptest/id=0/") 
>>> df2.show() 
+----+-----+------+ 
| id|score|letter| 
+----+-----+------+ 
|null| 1|  A| 
|null| 1|  B| 
|null| 2|  C| 
+----+-----+------+ 

有沒有一種方法,以確保分區密鑰存儲瓦特/拼花數據?我不想要求其他進程分析獲取密鑰的路徑。

回答

2

如果這樣你應該提供basePathoption

(spark.read 
    .format("parquet") 
    .option("basePath", "hdfs://localhost:9000/ptest/") 
    .load("hdfs://localhost:9000/ptest/id=0/")) 

它指向您的數據的根目錄。

隨着basePathDataFrameReader將意識到分區並相應地調整模式。

0

如果其他應用程序加載特定分區,它看起來像load("hdfs://localhost:9000/ptest/id=0/")路徑,該應用程序可以調整代碼來替換空與分區列值

part = 0 # partition to load 
df2 =spark.read.format("parquet")\ 
       .schema(df.schema)\ 
       .load("ptest/id="+str(part)).fillna(part,["id"]) 

這樣,輸出將是 -

+---+-----+------+ 
| id|score|letter| 
+---+-----+------+ 
| 0| 1|  A| 
| 0| 1|  B| 
| 0| 2|  C| 
+---+-----+------+ 
+0

另一個應用程序只有完整的特定分區,因爲我發佈路徑(例如'hdfs:// localhost:9000/ptest/id = 0 /')。它不知道路徑是什麼或者它是如何構建的。爲了使此解決方案能夠運行,應用程序必須解析路徑以提取值。我希望避免這種情況。 –

+0

'DataFrameWriter'不具有'schema'選項,因此指定自定義模式已走出門外。你有沒有想過複製分區列?一個用於實際分區並複製一個供其他應用程序使用而不必解析路徑? – Pushkr

+0

想過它。我可能會將其視爲潛在的解決方案。或者我可能會改變發佈路徑的方式,以提供關於分區和分區鍵值的知識 –