2015-11-22 55 views
0
animals_population_file = sc.textFile("input/myFile1.txt") 
animals_place_file = sc.textFile("input/myFile2.txt") 

animals_population_file:如何通過密鑰連接兩個RDD?

Dogs, 5 
Cats, 6 

animals_place_file:

Dogs, Italy 
Cats, Italy 
Dogs, Spain 

現在我想用動物的類型,一鍵加入animals_population_fileanimals_place_file。 結果應該是這樣:

Dogs, [Italy, Spain, 5] 
Cats, [Italy, 6] 

我試過joined = animals_population_file.join(animals_place_file),但我不知道如何定義的關鍵。此外,當我運行joined.collect(),它給了我一個錯誤:

298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o247.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 29, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main 
    process() 
    File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/serializers.py", line 236, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1807, in <lambda> 
    map_values_fn = lambda (k, v): (k, f(v)) 
ValueError: too many values to unpack 
+0

@ zero323:我試圖在提到線程提供的解決方案。但是,我的問題是,我不知道如何指向這個代碼df1.join中的鍵(df2,df1.k == df2.k,joinType ='inner')。另外,如果我只是做了df1.join(df2),那麼collect()會給出一個錯誤。它沒有在尖銳的線程中解釋。 –

+0

你上面顯示的是你的文件的確切內容? – zero323

+0

@ zero323:精確含量:RDD1集= [u'Surreal_Games', u'269' , u'Hourly_Games', u'428' , u'Hot_Talking」, u'747' , u'Almost_Sports' , u'350 '] 和另一RDD2 = [u'Loud_Games', u'BAT ' u'Cold_Talking', u'DEF ' u'Surreal_Sports', u'XYZ', ù 'Hourly_Sports', u'CAB ' u'Hot_Talking', u'MAN ' u'Almost_Cooking', u'BAT'] –

回答

1

您不必PairRdd當你(從評論你的RDDS內容基礎)運行文本文件。 要加入,您需要PairRDD。 所以,把你的投入pairRDDs

val rdd1 = sc.textFile("input/myFile1.txt") 
val rdd2 = sc.textFile("input/myFile2.txt") 

val data1 = rdd1.map(line => line.split(",").map(elem => elem.trim)) 
val data2 = rdd2.map(line => line.split(",").map(elem => elem.trim)) 

val pairRdd1 = data1.map(r => (r(0), r)) /** zero index is the animal type which is the key in file 1*/ 
val pairRdd2 = data2.map(r => (r(0), r)) /** zero index is the animal type which is the key in file 2 as well */ 

val joined = pairRdd1.join(pairRdd2) 

val local = joined.collect() 
local.foreach{case (k, v) => { 
    print(k + " : ") 
    println(v._1.mkString("|") + "|" + v._2.mkString("|")) 
}} 
相關問題