2015-10-24 40 views
4

我想使用Python將本地文件複製到多個並行的遠程主機。我試圖用asyncio和Paramiko來做到這一點,因爲我已經在我的程序中將這些庫用於其他目的。通過SFTP將一個文件複製到多個遠程主機

我使用的是BaseEventLoop.run_in_executor()和默認ThreadPoolExecutor,它實際上是舊的threading庫的新接口,以及用於複製的Paramiko的SFTP功能。

下面是一個簡單的例子。

import sys 
import asyncio 
import paramiko 
import functools 


def copy_file_node(
     *, 
     user: str, 
     host: str, 
     identity_file: str, 
     local_path: str, 
     remote_path: str): 
    ssh_client = paramiko.client.SSHClient() 
    ssh_client.load_system_host_keys() 
    ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy()) 

    ssh_client.connect(
     username=user, 
     hostname=host, 
     key_filename=identity_file, 
     timeout=3) 

    with ssh_client: 
     with ssh_client.open_sftp() as sftp: 
      print("[{h}] Copying file...".format(h=host)) 
      sftp.put(localpath=local_path, remotepath=remote_path) 
      print("[{h}] Copy complete.".format(h=host)) 


loop = asyncio.get_event_loop() 

tasks = [] 

# NOTE: You'll have to update the values being passed in to 
#  `functools.partial(copy_file_node, ...)` 
#  to get this working on on your machine. 
for host in ['10.0.0.1', '10.0.0.2']: 
    task = loop.run_in_executor(
     None, 
     functools.partial(
      copy_file_node, 
      user='user', 
      host=host, 
      identity_file='/path/to/identity_file', 
      local_path='/path/to/local/file', 
      remote_path='/path/to/remote/file')) 
    tasks.append(task) 

try: 
    loop.run_until_complete(asyncio.gather(*tasks)) 
except Exception as e: 
    print("At least one node raised an error:", e, file=sys.stderr) 
    sys.exit(1) 

loop.close() 

我看到的問題是文件被串行復制到主機而不是並行。因此,如果單個主機的副本需要5秒鐘,兩臺主機需要10秒鐘,依此類推。

我已經嘗試了各種其他方法,包括開溝SFTP並通過exec_command()將文件管道傳輸到每個遠程主機上的dd,但這些副本總是以串行方式發生。

我可能誤解了這裏的一些基本想法。什麼讓不同的線程並行複製文件?

從我的測試中看來,劫持發生在遠程寫入上,而不是讀取本地文件。但爲什麼會這樣?因爲我們正在嘗試對獨立遠程主機進行網絡I/O操作?

+1

也許'paramiko'在內部使用一些鎖。你嘗試過'ProcessPoolExecutor'嗎? –

+0

@胡佐高 - 我做過了,但它似乎沒有幫助。也許我做錯了嗎?現在快速瀏覽一下Paramiko的源代碼,它[看起來不像](https://github.com/paramiko/paramiko/blob/0b9d772a21a44af38ecceae0fdbae645e386bd9b/paramiko/sftp_client.py#L595-L639)有任何內部鎖定順便說一下。 –

+2

我用一些虛擬代碼替換了'copy_file_node()',它工作正常,所以我認爲它是'paramiko',它阻止了併發。如果是這種情況,'ProcessPoolExecutor'應該解決問題。你可以發佈你的代碼的'ProcessPoolExecutor'版本嗎? –

回答

2

使用asyncio沒有任何問題。

爲了證明這一點,我們來試試一下你的腳本的簡化版本 - 沒有paramiko,只是純粹的Python。

import asyncio, functools, sys, time 

START_TIME = time.monotonic() 

def log(msg): 
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg)) 

def dummy(thread_id): 
    log('Thread {} started'.format(thread_id)) 
    time.sleep(1) 
    log('Thread {} finished'.format(thread_id)) 

loop = asyncio.get_event_loop() 
tasks = [] 
for i in range(0, int(sys.argv[1])): 
    task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i)) 
    tasks.append(task) 
loop.run_until_complete(asyncio.gather(*tasks)) 
loop.close() 

有兩個線程,這將打印:

$ python3 async.py 2 
    0.001 Thread 0 started 
    0.002 Thread 1 started  <-- 2 tasks are executed concurrently 
    1.003 Thread 0 finished 
    1.003 Thread 1 finished  <-- Total time is 1 second 

這種併發擴展到5個線程:

$ python3 async.py 5 
    0.001 Thread 0 started 
    ... 
    0.003 Thread 4 started  <-- 5 tasks are executed concurrently 
    1.002 Thread 0 finished 
    ... 
    1.005 Thread 4 finished  <-- Total time is still 1 second 

如果我們再增加一個線程,我們打的線程池限制:

$ python3 async.py 6 
    0.001 Thread 0 started 
    0.001 Thread 1 started 
    0.002 Thread 2 started 
    0.003 Thread 3 started 
    0.003 Thread 4 started  <-- 5 tasks are executed concurrently 
    1.002 Thread 0 finished 
    1.003 Thread 5 started  <-- 6th task is executed after 1 second 
    1.003 Thread 1 finished 
    1.004 Thread 2 finished 
    1.004 Thread 3 finished 
    1.004 Thread 4 finished  <-- 5 task are completed after 1 second 
    2.005 Thread 5 finished  <-- 6th task is completed after 2 seconds 

每事情按預期進行,每5個項目的總體時間增長1秒。幻數5記錄於ThreadPoolExecutor文檔:

在版本3中進行了更改。5:如果max_workersNone或者沒有給出,它將默認爲機器上的處理器數量乘以5,假設ThreadPoolExecutor通常用於重疊I/O而不是CPU工作,並且工作人員的數量應該高於ProcessPoolExecutor的工作人員數量。

第三方庫如何阻止我的ThreadPoolExecutor?

  • 庫使用某種全局鎖。這意味着庫不支持多線程。嘗試使用ProcessPoolExecutor,但要謹慎:庫可能包含其他反模式,例如使用相同的硬編碼臨時文件名。

  • 函數執行很長時間並且不釋放GIL。它可能表明C擴展代碼中存在一個錯誤,但持有GIL的最常見原因是執行一些CPU密集型計算。再次,您可以嘗試ProcessPoolExecutor,因爲它不受GIL的影響。

這些預計都不會發生像paramiko圖書館。

第三方庫如何阻止我的ProcessPoolExecutor?

通常不能。您的任務在不同的進程中執行。如果您看到ProcessPoolExecutor中的兩項任務需要花費兩倍的時間,那麼可疑資源瓶頸(例如佔用100%的網絡帶寬)。

+0

而且,正如您幫助我在問題的評論中看到的那樣,我看到的明顯的串行上傳背後的原因只是我的上傳帶寬! –

2

我不知道這是接近它的最佳方式,但它爲我工作

#start 
from multiprocessing import Process 

#omitted 

tasks = [] 
for host in hosts: 
    p = Process(
     None, 
     functools.partial(
      copy_file_node, 
      user=user, 
      host=host, 
      identity_file=identity_file, 
      local_path=local_path, 
      remote_path=remote_path)) 

    tasks.append(p) 

[t.start() for t in tasks] 
[t.join() for t in tasks] 

根據意見,增加了一個郵戳,並捕獲從多輸出和得到這個:

2015-10-24 03:06:08.749683[vagrant1] Copying file... 
2015-10-24 03:06:08.751826[basement] Copying file... 
2015-10-24 03:06:08.757040[upstairs] Copying file... 
2015-10-24 03:06:16.222416[vagrant1] Copy complete. 
2015-10-24 03:06:18.094373[upstairs] Copy complete. 
2015-10-24 03:06:22.478711[basement] Copy complete. 
+0

我會給這是嘗試。但是,不應該在功能上等同於使用具有'asyncio'的'ProcessPoolExecutor',[就像我在這裏所做的那樣](https://gist.github.com/nchammas/783632df222277605fde)? –

+0

我會這麼認爲,因爲相同的基本apis,但沒有通過兩者的全部來源,我會冒險像「他們實施的東西略有不同」 此外,因爲你使用期貨,它引發的問題什麼*確切*版本的Python和你正在使用的各種模塊。這個問題被標記爲3.x,但期貨表明您使用2.x與3.x backports或早期的3.x或其他舊模塊。可以想象的是,你正面臨着反向模塊版本/未處理的邊緣情況之間的奇怪交互。我使用Python 3.4.3和最近的一切。 – tlastowka

+0

我使用Python 3.5.0和Paramiko 1.15.3。 ''concurrent.futures'是'asyncio'上的[3.5 docs](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_in_executor)引用時解釋什麼執行者可以提供給'run_in_executor()'。無論如何,讓我們來看看是否有區別。當你說它適用於你時,順便說一句,你是否將足夠大的文件複製到2個遠程主機,以注意它們是否被串行或並行上傳? –

相關問題