2017-10-10 62 views
0

如果我在GCS上存儲了一個.txt文件,其中包含將用作beam.Filter一部分的單詞列表,可以在我的apache波束管道中動態訪問此列表嗎?我知道我可以在流水線中將這個列表定義爲一個全局變量,但我不確定如何將整個文件讀入列表,以及是否有任何光束技巧來完成此操作。有什麼建議麼?這是我當前實現,其工作不..Google Cloud Dataflow訪問雲存儲中的.txt文件

def boolean_terms(word, term_list): 
    if word in term_list: 
    return (word, 1) 
    else: 
    return (word, 0) 

# side table 
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file) 

words = ... 

filtered_words = words | beam.FlatMap(lambda x: 
    [boolean_terms(word, filter_terms) for word in x]) 

我得到以下錯誤「類型錯誤:類型_InvalidUnpickledPCollection'的說法並不迭代」

回答

3

您可以訪問單詞列表爲side input 。我相信beam.Filter變換支持使用來自過濾功能的側面輸入,其方式與該鏈接示例中的FlatMapParDo完全相同。

喜歡的東西:

words | beam.Filter(lambda x, filter_terms: word in filter_terms, 
        filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path))) 
+0

謝謝!我認爲我更接近,但它似乎仍然不適合我。我錯過了什麼嗎? – reese0106

+0

嗯,我想我想通了 - 我需要添加'pvalue.AsList(filter_terms)'讓這個工作正常 – reese0106

相關問題