0
我想修改高級教程的innerjoin-例子,使它可以使用mapreduce進行稀疏矩陣乘法(由Ullman描述)。因此,我需要第二個映射 - 減少步驟在結果矩陣中求和相等位置的值。迪斯科中的鏈接作業(MapReduce)
不幸的是,我沒有設法得到類CsvInnerJoin的第一個reduce函數的輸出到SumJob的map函數中。
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
if __name__ == '__main__':
input_filename = "input.csv"
output_filename = "output.csv"
if len(sys.argv) > 1:
input_filename = sys.argv[1]
if len(sys.argv) > 2:
output_filename = sys.argv[2]
from CsvInnerJoiner import CsvInnerJoiner
from SumJob import SumJob
job = CsvInnerJoiner().run(input=[input_filename])
job = SumJob().run() (******************)
with open(output_filename, 'w') as fp:
writer = csv.writer(fp)
for url_key, descriptors in result_iterator(job.wait(show=True)):
writer.writerow([url_key] + descriptors)
CsvInnerJoiner.py是這樣的文件:
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
class CsvInnerJoiner(Job):
partitions = 2
sort = True
def map(self, row, params):
yield row[0], row[1:]
@staticmethod
def map_reader(fd, size, url, params):
reader = csv.reader(fd, delimiter=',')
for row in reader:
yield row
#@staticmethod
def reduce(self, rows_iter, out, params):
from disco.util import kvgroup
from itertools import chain
#for url_key, descriptors in kvgroup(sorted(rows_iter)):
for url_key, descriptors in kvgroup(rows_iter):
merged_descriptors = list(chain.from_iterable(descriptors))
print url_key,"_______",merged_descriptors
if len(merged_descriptors) > 3:
Alist = merged_descriptors[:merged_descriptors.index("B")]
Blist = merged_descriptors[merged_descriptors.index("B"):]
Alistlength = len(Alist)/3
Blistlength = len(Blist)/3
for i in range(Alistlength):
for j in range(Blistlength):
container = int(Alist[3*i+2])*int(Blist[3*j+2])
yield [Alist[3*i+1],Blist[3*j+1]],container
#out.add(Alist[3*i+1],[Blist[3*j+1],container])
SumJob.py是這樣的:
import sys
sys.path.append("/home/damian/disco/lib/")
from disco.core import Job, result_iterator
from disco.worker.classic.func import chain_reader
import csv, sys
class SumJob(Job):
map_reader = staticmethod(chain_reader)
@staticmethod
def map(self,key_value, params):
print "KEY::::::",str(key_value[0])
print "VAL::::::",str(key_value[1])
yield key_value[0], key_value[1]
@staticmethod
def reduce(self,key_value,out, params):
Summe = sum(key_value[1])
out.add(key_value[0],Summe)
的問題是,我不知道如何改變(**)使得第一縮小步驟的第二輸出被第二映射函數作爲輸入。
非常感謝您的幫助! Damian