我用下面的腳本解決了這個問題：
import os

import paramiko
# Define connection info
# Each value may be overridden from the environment so real credentials do
# not have to be committed to the source file; the literals are fallbacks.
host_ip = os.environ.get('MR_SSH_HOST', 'xx.xx.xx.xx')
user = os.environ.get('MR_SSH_USER', 'xxxxxxxx')
pw = os.environ.get('MR_SSH_PASSWORD', 'xxxxxxxx')
# Paths
# Glob handed to the streaming job's -input option.
input_loc = '/nfs_home/appers/extracts/*/*.xml'
# Parent of the job's -output directory; also the source of `hadoop fs -getmerge`.
output_loc = '/user/lcmsprod/output/cnielsen/'
# Interpreter used to launch the mapper and reducer scripts.
python_path = "/usr/lib/python_2.7.3/bin/python"
# Directory on the remote host holding the job's files. Despite the name it is
# used as an ordinary path there (listed with `ls`, passed to -file/-files).
hdfs_home = '/nfs_home/appers/cnielsen/'
# Local (Windows) file that receives a copy of the command's output.
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
# File names
xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
# Product identifier passed to the mapper as its command-line argument.
product_name = 'test1'
# Name of the job's output directory under output_loc.
output_ref = 'test65'
# ----------------------------------------------------
def buildMRcommand(product_name):
    """Build the Hadoop streaming command line for one product run.

    The command passes the lookup XML (via -files) and the mapper, reducer
    and helper script (via -file) from ``hdfs_home``, and runs the mapper
    and reducer with ``python_path``. ``product_name`` is appended to the
    mapper command as its argument. Input and output locations come from the
    module-level ``input_loc``, ``output_loc`` and ``output_ref``.

    The result is a single string intended for a remote shell (the script
    hands it to ``SSHClient.exec_command``). The multi-word -mapper and
    -reducer values are therefore shell-quoted so each reaches Hadoop as
    one argument.

    :param product_name: product identifier forwarded to the mapper.
    :returns: the complete command as one space-separated string. It is
        also printed before being returned.
    """
    try:
        from shlex import quote as shell_quote  # Python 3.3+
    except ImportError:
        from pipes import quote as shell_quote  # Python 2

    # Quote the whole "interpreter script [arg]" string. For ordinary values
    # this yields exactly the old '...' wrapping, but unlike a hand-added
    # pair of quotes it stays correct if a piece contains quotes or other
    # shell-special characters.
    mapper_cmd = shell_quote(' '.join([python_path, mapper, product_name]))
    reducer_cmd = shell_quote(' '.join([python_path, reducer]))

    mr_command_list = [
        'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
        '-files', hdfs_home + xml_lookup_file,
        '-file', hdfs_home + mapper,
        '-file', hdfs_home + reducer,
        '-mapper', mapper_cmd,
        '-file', hdfs_home + helper_script,
        '-reducer', reducer_cmd,
        # NOTE(review): the glob is left unquoted, as before, so the remote
        # shell expands it if it matches local files there; quote it if
        # Hadoop is meant to resolve the pattern itself.
        '-input', input_loc,
        '-output', output_loc + output_ref,
    ]
    mr_command = ' '.join(mr_command_list)
    print(mr_command)
    return mr_command
# ----------------------------------------------------
def unbuffered_lines(f):
    """Yield lines from a channel file object as soon as each one arrives.

    The stream is read one character at a time so that output from a remote
    command can be shown while the command is still running, rather than
    only after it finishes.

    Reading continues until end-of-stream (``f.read`` returning an empty
    value), not merely until the remote exit status is known. The process
    can exit while some of its output is still buffered locally, and
    stopping early would drop it. A final line without a trailing newline
    is yielded as-is.

    :param f: file-like object with a ``read(n)`` method, such as the
        stdout/stderr objects returned by ``SSHClient.exec_command``.
    :returns: generator of lines, each ending in a newline except possibly
        the last. Items have the same type (str or bytes) that ``f.read``
        returns.
    """
    pending = []
    while True:
        ch = f.read(1)
        if not ch:
            # An empty read means the remote side has closed this stream.
            break
        pending.append(ch)
        newline = b'\n' if isinstance(ch, bytes) else '\n'
        if ch == newline:
            # ch[:0] is an empty chunk of the same type as ch, so the joined
            # line keeps whatever type f.read() produced.
            yield ch[:0].join(pending)
            pending = []
    if pending:
        # Output that ended without a trailing newline.
        yield pending[0][:0].join(pending)
# ----------------------------------------------------
# Build Commands
list_dir = "ls " + hdfs_home + " -l"
getmerge = ("hadoop fs -getmerge " + output_loc + output_ref + " "
            + hdfs_home + "test_011216_0.txt")

client = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts any unknown host key without checking
# it. That is convenient for a throwaway script but offers no protection
# against a spoofed host.
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    client.connect(host_ip, username=user, password=pw)

    # Run Command. To run something else, point `command` at another of the
    # command strings, e.g. buildMRcommand(product_name) or getmerge.
    command = list_dir
    stdin, stdout, stderr = client.exec_command(command)
    print("Executing command...")

    # `with` plus the outer `finally` guarantee that the log file and the SSH
    # connection are closed even if running or reading the command raises.
    with open(output_log, 'w') as writer:
        # stderr is drained to the end before stdout is read.
        # NOTE(review): if the command writes a lot to stdout, the SSH
        # channel's flow-control window may fill and stall the remote
        # process, so stderr would never finish. Consider reading both
        # streams concurrently.
        for err_line in unbuffered_lines(stderr):
            if isinstance(err_line, bytes) and not isinstance(err_line, str):
                # Python 3: paramiko's read() returns bytes.
                # NOTE(review): assumes the remote output is UTF-8.
                err_line = err_line.decode('utf-8', 'replace')
            print('[stderr] ' + err_line.strip('\n'))
            writer.write('[stderr] ' + err_line)
        for out_line in stdout:
            print('[stdout]' + out_line.strip('\n'))
            writer.write('[stdout]' + out_line)
finally:
    client.close()