2

我在Spark Streaming中使用spark 1.6.0,並且遇到了廣泛操作中的一個問題。當我從DStream加入PipelinedRDD和RDD時,應用程序掛起

代碼示例: RDD名爲「a」,其類型爲:class'pyspark.rdd.PipelinedRDD'。

「一個」 被接收到爲:

# Load a text file and convert each line to a Row. 
    lines = sc.textFile(filename) 
    parts = lines.map(lambda l: l.split(",")) 
    clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...)) 

    # Infer the schema, and register the DataFrame as a table. 
    schemaPeople = sqlContext.createDataFrame(clients) 
    schemaPeople.registerTempTable("clients") 

    client_list = sqlContext.sql("SELECT * FROM clients") 

和後:

a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))) 

有第二部分 「B」 型的類 'pyspark.streaming.dstream.TransformedDStream'。 我收到了 「B」,從水槽:

DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore'))) 

b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event))) 

問題是:當我嘗試爲加盟:

mult = b.transform(lambda rdd: rdd.join(a)) 

我的應用程序在此階段掛起(現在我在b.pprint()之後和stage .join()之前顯示屏幕)

enter image description here

但是,當我加入:

  1. 聲明RDD 「測試」:

    test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value')) 
    

    做:

    mult0 = a.join(test) 
    mult = b.transform(lambda rdd: rdd.join(mult0))` 
    

    然後它的工作原理(!! ):

    screen 2

  2. 我也可以這樣做:

    mult0 = b.transform(lambda rdd: rdd.join(test)) 
    

這樣:

我有RDDS 「a」 和 「測試」。 DStream「b」。 我可以乘:

  • 一個*測試* B
  • B *測試

但我不能這樣做 'B *一個'。

任何幫助表示讚賞!謝謝!

回答

0

在user6910411的意見,我沒有緩存 「a」 作爲

a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache() 

和問題得到解決。

相關問題