2015-10-15 30 views
1

我至今是:打開RDD到廣播詞典查找

lookup = sc.textFile("/user/myuser/lookup.asv") 
lookup.map(lambda r: r.split(chr(1))) 

而且現在我有一個RDD看起來像

[ 
    [filename1, category1], 
    [filename2, category2], 
    ... 
    [filenamen, categoryn] 
] 

我怎樣才能把這一RDD成廣播字典一樣:

{filename1: category1, filename2: category2, ...} 

這是我試過,但沒有工作:

>>> broadcastVar = sc.broadcast({}) 
>>> data = sc.parallelize([[1,1], [2,2], [3,3], [4,4]]) 
>>> def myfunc(x): 
...  broadcastVar[str(x[0])] = x[1] 
... 
>>> result = data.map(myfunc) 
>>> broadcastVar 
<pyspark.broadcast.Broadcast object at 0x7f776555e710> 
>>> broadcastVar.value 
{} 
>>> result.collect() 
... 
ERROR: TypeError: 'Broadcast' object does not support item assignment 
... 
>>> broadcastVar.value 
{} 

有關爲什麼我建立這個巨大的查找變量的更多信息,請閱讀本:

這是本one的後續問題。

我有兩個表,其中

表1:其中各列包含該像素信息和第一列中的非常寬(25K列和150K行)表是輸入圖象文件的文件名。

表2:TSV(製表符分隔文件)文件,有300萬行,每行包含圖像文件名稱和圖像的產品類別。

在SQL中,我需要在文件名的這兩個表上做一個內部連接,這樣我就可以爲圖像數據添加標籤,以便稍後進行機器學習。

在任何類型的SQL中執行它是不現實的,因爲您必須爲table1創建一個具有25K列的表,而create table語法將會很荒謬。

然後我想創建一個使用table2的查找變量,也許使它成爲一個廣播變量,其中的關鍵是文件名,值是產品類別。

回答

0

廣播變量對工作人員是隻讀的。 Spark提供了只寫的累加器,但是這些專用於計數器等。在這裏,你可以簡單地收集並創建一個Python字典:

lookup_bd = sc.broadcast({ 
    k: v for (k, v) in lookup.map(lambda r: r.split(chr(1))).collect() 
}) 

,因爲你必須創建一個表table1的具有25K列,創建表的語法是不現實做到在任何類型的SQL的將是可笑的漫長。

創建不應該是一個問題。只要你知道的名字您可以輕鬆創建表像這樣編程:

from pyspark.sql import Row 

colnames = ["x{0}".format(i) for i in range(25000)] # Replace with actual names 

df = sc.parallelize([ 
    row(*[randint(0, 100) for _ in range(25000)]) for x in range(10) 
]).toDF() 

## len(df.columns) 
## 25000 

這裏有一個問題,即使你使用純RDDS這是更嚴重的。一般來說,非常寬的行很難在任何行式格式中處理。

你可以做的一件事是使用像SparseVectorSparseMatrix這樣的稀疏表示。另一個例子是使用RLE編碼像素信息。