我想使用Python將本地文件複製到多個並行的遠程主機。我試圖用asyncio
和Paramiko來做到這一點,因爲我已經在我的程序中將這些庫用於其他目的。通過SFTP將一個文件複製到多個遠程主機
我使用的是BaseEventLoop.run_in_executor()
和默認ThreadPoolExecutor
,它實際上是舊的threading
庫的新接口,以及用於複製的Paramiko的SFTP功能。
下面是一個簡單的例子。
import sys
import asyncio
import paramiko
import functools
def copy_file_node(
*,
user: str,
host: str,
identity_file: str,
local_path: str,
remote_path: str):
ssh_client = paramiko.client.SSHClient()
ssh_client.load_system_host_keys()
ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
ssh_client.connect(
username=user,
hostname=host,
key_filename=identity_file,
timeout=3)
with ssh_client:
with ssh_client.open_sftp() as sftp:
print("[{h}] Copying file...".format(h=host))
sftp.put(localpath=local_path, remotepath=remote_path)
print("[{h}] Copy complete.".format(h=host))
loop = asyncio.get_event_loop()
tasks = []
# NOTE: You'll have to update the values being passed in to
# `functools.partial(copy_file_node, ...)`
# to get this working on on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
task = loop.run_in_executor(
None,
functools.partial(
copy_file_node,
user='user',
host=host,
identity_file='/path/to/identity_file',
local_path='/path/to/local/file',
remote_path='/path/to/remote/file'))
tasks.append(task)
try:
loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
print("At least one node raised an error:", e, file=sys.stderr)
sys.exit(1)
loop.close()
我看到的問題是文件被串行復制到主機而不是並行。因此,如果單個主機的副本需要5秒鐘,兩臺主機需要10秒鐘,依此類推。
我已經嘗試了各種其他方法,包括開溝SFTP並通過exec_command()
將文件管道傳輸到每個遠程主機上的dd
,但這些副本總是以串行方式發生。
我可能誤解了這裏的一些基本想法。什麼讓不同的線程並行複製文件?
從我的測試中看來,劫持發生在遠程寫入上,而不是讀取本地文件。但爲什麼會這樣?因爲我們正在嘗試對獨立遠程主機進行網絡I/O操作?
也許'paramiko'在內部使用一些鎖。你嘗試過'ProcessPoolExecutor'嗎? –
@胡佐高 - 我做過了,但它似乎沒有幫助。也許我做錯了嗎?現在快速瀏覽一下Paramiko的源代碼,它[看起來不像](https://github.com/paramiko/paramiko/blob/0b9d772a21a44af38ecceae0fdbae645e386bd9b/paramiko/sftp_client.py#L595-L639)有任何內部鎖定順便說一下。 –
我用一些虛擬代碼替換了'copy_file_node()',它工作正常,所以我認爲它是'paramiko',它阻止了併發。如果是這種情況,'ProcessPoolExecutor'應該解決問題。你可以發佈你的代碼的'ProcessPoolExecutor'版本嗎? –