2013-07-08 29 views
0

我想修改高級教程的innerjoin-例子,使它可以使用mapreduce進行稀疏矩陣乘法(由Ullman描述)。因此,我需要第二個映射 - 減少步驟在結果矩陣中求和相等位置的值。迪斯科中的鏈接作業​​(MapReduce)

不幸的是,我沒有設法得到類CsvInnerJoin的第一個reduce函數的輸出到SumJob的map函數中。

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 


if __name__ == '__main__': 
    input_filename = "input.csv" 
    output_filename = "output.csv" 
    if len(sys.argv) > 1: 
     input_filename = sys.argv[1] 
     if len(sys.argv) > 2: 
      output_filename = sys.argv[2] 

    from CsvInnerJoiner import CsvInnerJoiner 
    from SumJob import SumJob 

    job = CsvInnerJoiner().run(input=[input_filename]) 
    job = SumJob().run() (******************) 

    with open(output_filename, 'w') as fp: 
     writer = csv.writer(fp) 
     for url_key, descriptors in result_iterator(job.wait(show=True)): 
      writer.writerow([url_key] + descriptors) 

CsvInnerJoiner.py是這樣的文件:

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 
class CsvInnerJoiner(Job): 
    partitions = 2 
    sort = True 

    def map(self, row, params): 
     yield row[0], row[1:] 

    @staticmethod 
    def map_reader(fd, size, url, params): 
     reader = csv.reader(fd, delimiter=',') 
     for row in reader: 
      yield row 

    #@staticmethod 
def reduce(self, rows_iter, out, params): 
    from disco.util import kvgroup 
    from itertools import chain 
    #for url_key, descriptors in kvgroup(sorted(rows_iter)): 
    for url_key, descriptors in kvgroup(rows_iter): 
     merged_descriptors = list(chain.from_iterable(descriptors)) 
     print url_key,"_______",merged_descriptors 
     if len(merged_descriptors) > 3: 
      Alist = merged_descriptors[:merged_descriptors.index("B")] 
      Blist = merged_descriptors[merged_descriptors.index("B"):] 
      Alistlength = len(Alist)/3 
      Blistlength = len(Blist)/3 
      for i in range(Alistlength): 
       for j in range(Blistlength): 
        container = int(Alist[3*i+2])*int(Blist[3*j+2]) 
        yield [Alist[3*i+1],Blist[3*j+1]],container 
        #out.add(Alist[3*i+1],[Blist[3*j+1],container])   

SumJob.py是這樣的:

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 


class SumJob(Job): 
    map_reader = staticmethod(chain_reader) 

    @staticmethod 
    def map(self,key_value, params): 
     print "KEY::::::",str(key_value[0]) 
     print "VAL::::::",str(key_value[1]) 
     yield key_value[0], key_value[1] 
    @staticmethod  
    def reduce(self,key_value,out, params): 
     Summe = sum(key_value[1]) 
     out.add(key_value[0],Summe) 

的問題是,我不知道如何改變(**)使得第一縮小步驟的第二輸出被第二映射函數作爲輸入。

非常感謝您的幫助! Damian

回答

0

您可以使用map/reduce階段的輸出作爲另一個的輸入(從job.wait()返回)。

job1 = SumJob().run(input=[...]) 
job2 = SumJob().run(input=[...]) 

output = SomeOtherJob.run(input=[job1.wait(), job2.wait()]).wait(show=True) 
for key, value in result_iterator(output): 
    print key, value 

我不由的代碼塊的專家對我的作品(我實現pagerank算法有很多階段,並多次反覆)。