2015-12-08

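Launch a Hadoop MapReduce job via Python without PuTTY/SSH

I run Hadoop MapReduce jobs by logging in over SSH with PuTTY, which requires entering the host name/IP address, login name, and password into PuTTY to get an SSH command-line window. Once in the SSH console window, I enter the appropriate MR command, for example: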

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar -file /nfs_home/appers/user1/mapper.py -file /nfs_home/appers/user1/reducer.py -mapper '/usr/lib/python_2.7.3/bin/python mapper.py' -reducer '/usr/lib/python_2.7.3/bin/python reducer.py' -input /ccexp/data/test_xml/0901282-510179094535002-oozie-oozi-W/extractOut// .xml -output /user/ccexptest/output/user1/MRoutput

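What I would like to do is replace this clunky process with Python, so that I can launch the MapReduce job from within a Python script and avoid having to log in over SSH through PuTTY.

Can this be done, and if so, can someone show me how?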

Answer


I solved this with the following script:

import paramiko 

# Define connection info 
host_ip = 'xx.xx.xx.xx' 
user = 'xxxxxxxx' 
pw = 'xxxxxxxx' 

# Paths 
input_loc = '/nfs_home/appers/extracts/*/*.xml' 
output_loc = '/user/lcmsprod/output/cnielsen/' 
python_path = "/usr/lib/python_2.7.3/bin/python" 
hdfs_home = '/nfs_home/appers/cnielsen/' 
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt' 

# File names 
xml_lookup_file = 'product_lookups.xml' 
mapper = 'Mapper.py' 
reducer = 'Reducer.py' 
helper_script = 'Process.py' 
product_name = 'test1' 
output_ref = 'test65' 

# ---------------------------------------------------- 

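def buildMRcommand(product_name):
    # Assemble the same Hadoop streaming command that would be typed into the SSH console.
    # The mapper and reducer commands are wrapped in single quotes so that the
    # remote shell passes each of them to Hadoop as a single argument.
    mapper_cmd = "'" + python_path + " " + mapper + " " + product_name + "'"
    reducer_cmd = "'" + python_path + " " + reducer + "'"
    mr_command_list = ['hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                       '-files', hdfs_home + xml_lookup_file,
                       '-file', hdfs_home + mapper,
                       '-file', hdfs_home + reducer,
                       '-mapper', mapper_cmd,
                       '-file', hdfs_home + helper_script,
                       '-reducer', reducer_cmd,
                       '-input', input_loc,
                       '-output', output_loc + output_ref]

    MR_command = " ".join(mr_command_list)
    print(MR_command)
    return MR_command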

# ---------------------------------------------------- 

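def unbuffered_lines(f):
    # Yield each line of a paramiko channel file as soon as it is complete.
    def as_text(raw):
        # read() returns bytes; decode unless bytes is already str (Python 2).
        return raw if isinstance(raw, str) else raw.decode('utf-8', 'replace')

    line_buf = b''
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith(b'\n'):
            yield as_text(line_buf)
            line_buf = b''
    # The command has exited: drain whatever output is still pending.
    for rest in (line_buf + f.read()).splitlines(True):
        yield as_text(rest)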

# ---------------------------------------------------- 

client = paramiko.SSHClient() 
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
client.connect(host_ip, username=user, password=pw) 

# Build Commands 
list_dir = "ls "+hdfs_home+" -l" 
getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+"test_011216_0.txt" 

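# Run Command
# Only one exec_command call is active at a time; swap the commented lines to
# run the MapReduce job or the getmerge step instead of the directory listing.
stdin, stdout, stderr = client.exec_command(list_dir)
##stdin, stdout, stderr = client.exec_command(buildMRcommand(product_name))
##stdin, stdout, stderr = client.exec_command(getmerge)

print("Executing command...")

# Echo the remote output locally and keep a copy of it in the log file.
with open(output_log, 'w') as writer:
    # Stream stderr line by line while the command is still running.
    for l in unbuffered_lines(stderr):
        print('[stderr] ' + l.rstrip('\n'))
        writer.write('[stderr] ' + l)

    # Then read whatever the command wrote to stdout.
    for line in stdout:
        print('[stdout] ' + line.rstrip('\n'))
        writer.write('[stdout] ' + line)

client.close()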
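
The script above runs one command per execution and leaves it to the user to swap the commented exec_command lines. As a rough sketch of how the steps could be chained instead, the helpers below run each command in turn, merge stderr into stdout so a single reader sees both streams, and stop at the first non-zero exit status. The names run_remote and run_steps are hypothetical and not part of the original script; client is assumed to be an already connected paramiko.SSHClient, and the code assumes Python 3.

```python
def run_remote(client, command):
    """Run one command over an open SSH connection and return its exit status.

    `client` is assumed to be a connected paramiko.SSHClient (hypothetical helper).
    """
    chan = client.get_transport().open_session()
    chan.set_combine_stderr(True)  # deliver stderr through the stdout stream
    chan.exec_command(command)
    try:
        # Echo the combined output line by line until the remote side closes it.
        for line in chan.makefile('r'):
            print(line.rstrip('\n'))
        # Blocks until the remote process has reported its exit status.
        return chan.recv_exit_status()
    finally:
        chan.close()


def run_steps(client, commands):
    """Run the commands in order and stop at the first one that fails."""
    for command in commands:
        status = run_remote(client, command)
        if status != 0:
            raise RuntimeError(
                'command failed with exit status %d: %s' % (status, command))
```

With the variables from the script, a call such as run_steps(client, [buildMRcommand(product_name), getmerge]) would start the getmerge step only if the MapReduce job reported success. Combining the two streams gives up the separate [stderr]/[stdout] prefixes in exchange for not having to read two streams one after the other.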