I solved this with the code below. The trick is to use the scp module (a separate package from paramiko, installable with pip install scp) and import SCPClient. See the scp_download(ssh) function below.
When the MapReduce job completes, the getmerge command is run, followed by the scp_download step.
import paramiko
from scp import SCPClient
import time
# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxx'
pw = 'xxxxxxxx'
port = 22
# Paths
input_loc = '/nfs_home/appers/extracts/*/*.xml'
output_loc = '/user/lcmsprod/output/cnielsen/'
python_path = "/usr/lib/python_2.7.3/bin/python"
hdfs_home = '/nfs_home/appers/cnielsen/'
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
local_dir = r'C:\Users\cnielsen\Desktop\MR_Test'  # local folder that scp_download() copies into
# File names
xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
product_name = 'test1'
output_ref = 'test65'
target_file = 'test_011416_3.txt'
# ----------------------------------------------------
def createSSHClient(host_ip, port, user, pw):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host_ip, port, user, pw)
    return client
# ----------------------------------------------------
def buildMRcommand(product_name):
    space = " "
    mr_command_list = ['hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                       '-files', hdfs_home+xml_lookup_file,
                       '-file', hdfs_home+mapper,
                       '-file', hdfs_home+reducer,
                       '-mapper', "'"+python_path, mapper, product_name+"'",
                       '-file', hdfs_home+helper_script,
                       '-reducer', "'"+python_path, reducer+"'",
                       '-input', input_loc,
                       '-output', output_loc+output_ref]
    MR_command = space.join(mr_command_list)
    print MR_command
    return MR_command
# ----------------------------------------------------
def unbuffered_lines(f):
    # Read one byte at a time and yield complete lines as they arrive,
    # until the remote command reports its exit status
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ""
# ----------------------------------------------------
def stream_output(stdin, stdout, stderr):
    writer = open(output_log, 'w')
    # Stream stderr live using the unbuffered_lines generator
    for l in unbuffered_lines(stderr):
        e = '[stderr] ' + l
        print '[stderr] ' + l.strip('\n')
        writer.write(e)
    # stdout gives the full listing once the command has finished
    for line in stdout:
        r = '[stdout]' + line
        print '[stdout]' + line.strip('\n')
        writer.write(r)
    writer.close()
# ----------------------------------------------------
def run_MapReduce(ssh):
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name))
    stream_output(stdin, stdout, stderr)
    return 1
# ----------------------------------------------------
def run_list_dir(ssh):
    list_dir = "ls "+hdfs_home+" -l"
    stdin, stdout, stderr = ssh.exec_command(list_dir)
    stream_output(stdin, stdout, stderr)
# ----------------------------------------------------
def run_getmerge(ssh):
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    print getmerge
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    for line in stdout:
        print '[stdout]' + line.strip('\n')
    # getmerge doesn't seem to provide an exit code, so wait briefly
    # before letting the scp_download step start (see note below)
    time.sleep(1.5)
    return 1
# ----------------------------------------------------
def scp_download(ssh):
    scp = SCPClient(ssh.get_transport())
    print "Fetching SCP data.."
    scp.get(hdfs_home+target_file, local_dir)
    print "File download complete."
# ----------------------------------------------------
def main():
    # Get the ssh connection
    global ssh
    ssh = createSSHClient(host_ip, port, user, pw)
    print "Executing command..."
    # Command list
    ##run_list_dir(ssh)
    ##run_getmerge(ssh)
    ##scp_download(ssh)
    # Run MapReduce
    MR_status = 0
    MR_status = run_MapReduce(ssh)
    if MR_status == 1:
        gs = 0
        gs = run_getmerge(ssh)
        if gs == 1:
            scp_download(ssh)
    # Close ssh connection
    ssh.close()

if __name__ == '__main__':
    main()
Note: the getmerge command doesn't seem to provide an exit code, so I put a 'time.sleep(1.5)' wait in the script so that the scp_download step doesn't start before the getmerge job has finished.
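For what it's worth, the sleep can probably be avoided entirely: paramiko's Channel.recv_exit_status() blocks until the remote command has finished, whatever getmerge itself reports. A minimal sketch of the same step using it (the name run_getmerge_blocking is mine for illustration, not part of the original script):

def run_getmerge_blocking(ssh):
    # Same getmerge command as above
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    # recv_exit_status() blocks until the remote command exits, so
    # scp_download cannot start before getmerge is done
    status = stdout.channel.recv_exit_status()
    print "getmerge finished with exit status %d" % status
    return 1 if status == 0 else 0

This also removes the guesswork of choosing a sleep interval.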