
I am using the paramiko module (code visible here) to run Hadoop MapReduce and other SSH commands from a Python script. Once the MapReduce job completes, I run a getmerge step to collect the output into a text file. How can I run the PSCP cmd-window step from within my Python script?

The problem is that I currently have to open a cmd window and run PSCP to download the output.txt file from the HDFS environment to my computer. For example:

pscp [email protected]:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test 

How can I work this PSCP step into my script so that I don't have to open a cmd window and run it after my MapReduce and getmerge tasks have finished? I would like the script to run the MR job, then the getmerge step, and then automatically save the MR output to my computer.
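
In principle that same pscp command could be driven straight from the script with Python's subprocess module, along these lines (a rough sketch only; the -pw flag and the user@host placeholder are assumptions rather than tested values):

import subprocess

# Rough sketch: run the manual pscp step from Python. 'user@host', the password,
# and the paths are placeholders standing in for the values shown above.
subprocess.call(['pscp', '-pw', 'password',
                 'user@host:/nfs_home/appers/cnielsen/MROutput_121815_0.txt',
                 r'C:\Users\cnielsen\Desktop\MR_Test'])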

Here is my code.

Answer


I solved this with the code below. The trick was to use the scp module and import SCPClient. See the scp_download(ssh) function below.

When the MapReduce job completes, the getmerge command is run, followed by the scp_download step.

import paramiko 
from scp import SCPClient 
import time 

# Define connection info 
host_ip = 'xx.xx.xx.xx' 
user = 'xxxxxxxx' 
pw = 'xxxxxxxx' 
port = 22 

# Paths 
input_loc = '/nfs_home/appers/extracts/*/*.xml' 
output_loc = '/user/lcmsprod/output/cnielsen/' 
python_path = "/usr/lib/python_2.7.3/bin/python" 
hdfs_home = '/nfs_home/appers/cnielsen/' 
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
local_dir = r'C:\Users\cnielsen\Desktop\MR_Test'  # local folder the merged output is downloaded to (used by scp_download)

# File names 
xml_lookup_file = 'product_lookups.xml' 
mapper = 'Mapper.py' 
reducer = 'Reducer.py' 
helper_script = 'Process.py' 
product_name = 'test1' 
output_ref = 'test65' 
target_file = 'test_011416_3.txt' 

# ---------------------------------------------------- 
def createSSHClient(host_ip, port, user, pw): 
    client = paramiko.SSHClient() 
    client.load_system_host_keys() 
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
    client.connect(host_ip, port, user, pw) 
    return client 
# ---------------------------------------------------- 
def buildMRcommand(product_name): 
    space = " " 
    mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar', 
         '-files', hdfs_home+xml_lookup_file, 
         '-file', hdfs_home+mapper, 
         '-file', hdfs_home+reducer, 
         '-mapper', "'"+python_path, mapper, product_name+"'", 
         '-file', hdfs_home+helper_script, 
         '-reducer', "'"+python_path, reducer+"'", 
         '-input', input_loc, 
         '-output', output_loc+output_ref] 

    MR_command = space.join(mr_command_list) 
    print MR_command 
    return MR_command 
# ---------------------------------------------------- 
def unbuffered_lines(f): 
    # Yield complete lines from the remote stream as they arrive,
    # until the remote command signals it has exited.
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ""
# ---------------------------------------------------- 
def stream_output(stdin, stdout, stderr):
    # Write the job's console output to a local log file while echoing it.
    writer = open(output_log, 'w')

    # stderr carries the streaming job's progress; stream it line by line.
    for l in unbuffered_lines(stderr):
        e = '[stderr] ' + l
        print '[stderr] ' + l.strip('\n')
        writer.write(e)

    # stdout is read after the command finishes and gives the full listing.
    for line in stdout:
        r = '[stdout]' + line
        print '[stdout]' + line.strip('\n')
        writer.write(r)
    writer.close()
# ---------------------------------------------------- 
def run_MapReduce(ssh): 
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name)) 
    stream_output(stdin, stdout, stderr) 
    return 1 
# ---------------------------------------------------- 
def run_list_dir(ssh): 
    list_dir = "ls "+hdfs_home+" -l" 
    stdin, stdout, stderr = ssh.exec_command(list_dir) 
    stream_output(stdin, stdout, stderr) 
# ---------------------------------------------------- 
def run_getmerge(ssh): 
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file 
    print getmerge 
    stdin, stdout, stderr = ssh.exec_command(getmerge) 
    for line in stdout:
        print '[stdout]' + line.strip('\n')
    # getmerge did not appear to return a usable exit code, so pause briefly
    # to let it finish before the scp_download step starts (see note below).
    time.sleep(1.5)
    return 1
# ---------------------------------------------------- 
def scp_download(ssh):
    # Reuse the existing paramiko transport for the SCP transfer.
    scp = SCPClient(ssh.get_transport())
    print "Fetching SCP data.."
    scp.get(hdfs_home+target_file, local_dir)
    print "File download complete."
# ---------------------------------------------------- 
def main(): 
    # Get the ssh connection 
    global ssh 
    ssh = createSSHClient(host_ip, port, user, pw) 
    print "Executing command..." 

    # Command list 
    ##run_list_dir(ssh) 
    ##run_getmerge(ssh) 
    ##scp_download(ssh) 

    # Run MapReduce 
    MR_status = 0 
    MR_status = run_MapReduce(ssh) 

    if MR_status == 1:
        gs = 0
        gs = run_getmerge(ssh)
        if gs == 1:
            scp_download(ssh)

    # Close ssh connection 
    ssh.close() 

if __name__ == '__main__': 
    main() 

Note: the getmerge command did not seem to provide an exit code, so I have the script wait with 'time.sleep(1.5)' so that the scp_download step does not start before the getmerge job has finished.
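
As an alternative to the fixed sleep, paramiko can report the remote command's exit status through the channel. Below is a sketch of run_getmerge reworked to block on that status instead; it assumes hadoop fs -getmerge really does set a meaningful exit code in this environment, which the note above could not confirm.

def run_getmerge(ssh):
    # Variant sketch: wait for the remote command to exit instead of sleeping.
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    for line in stdout:
        print '[stdout]' + line.strip('\n')
    status = stdout.channel.recv_exit_status()  # blocks until the remote command finishes
    print '[exit status] ' + str(status)
    return 1 if status == 0 else 0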
