2011-12-01 21 views
2

道歉,如果這已被覆蓋之前 - 我做了我的搜索,但可能可能不知道使用正確的術語。處理單個陣列的多個「代理」

此過程由PHP處理。

這裏的情況:

我有一個大型的文件名數組。我已經打開這些文件並將其內容輸入到數據庫中。一次處理這些文件需要24小時以上,並且這些文件每天都會更新。

將單個大陣列分解爲四個較小的陣列並運行併發進程會在24小時窗口過去之前完成作業,但有時一個或兩個進程將在其他時間之前完成幾個小時,因爲文件大小每天都有所不同。

就像誰炒股零售貨架(還有誰工作過那個噩夢?)間距,以幫助有什麼完成自己的任務後留下的人,我想有一個腳本在地方,這些「代理人」照着做。

這裏是什麼,我想通了一些基礎知識 - 這可能是錯的,我不是太自豪地抗議,如果我:-)

$files = array('file1','file2','file3','file4','file5'); 
//etc... on to over 4k elements 

while($file = array_pop($files)){ 

    //Something in here... I have no idea what. 

} 

想法?類似於四個函數調用或四個循環內的整個'while'已經跨越了我的想法,但我很確定它將等待執行後續調用,直到前一個(s)完成。

任何幫助表示讚賞。我嚴重卡在這一個!

謝謝!

+0

您確定要使用PHP來做到這一點嗎?使用支持簡單多線程的選擇語言(如C#中的ThreadPool) – CodeZombie

+0

Python .....也許這裏的答案 –

+0

這是我最熟悉的語言,抓取源文件的代碼是用PHP編寫的,但它是從一個bash控制檯而不是一個網站運行的 - 所以一些shell腳本完全處於可能的範圍之內。 – user1075581

回答

2

數據庫支持的消息隊列似乎是一個明顯的解決方案,但我認爲這在這種情況下是過度的。我只是將要處理的文件放入一個專用隊列目錄中,然後使用DirectoryIterator類對其進行掃描。事情是這樣的:

while (true) { 
    look in the queue directory for a file 
    if you don't fine one, exit the script, all processing is done 
    if you find one, rename it or move it to a work directory 
    if the rename/move command succeeded, process the file 
    if the rename/move command failed, one of the other threads got it first 
} 

編輯:

關於開展工作人員,你可以使用一個簡單的shell腳本產卵的PHP程序在後臺:

NUM_WORKERS=5 
for WORKER in $(seq 1 ${NUM_WORKERS}) 
do 
    echo "starting worker ${WORKER}" 
    php -f /path/to/my/process.php & 
done 

然後,創建一個cron運行這個啓動器,例如,在午夜:

0 0 * * * /path/to/launcher.sh 
+0

我喜歡這個,因爲它建立在我已有的東西上(所有目標文件在處理到數據庫中之前都被下載到一個目錄中),但我能想到的唯一方法就是使其具有一個shell或執行後臺進程的perl腳本(在命令的末尾添加一個&),並能夠確定進程何時結束,從而將新文件分配給可用的「代理程序」。不過,Shell腳本和Perl現在已經超出了我的視野。它在我的「待辦事項」列表上,但:-) – user1075581

+0

您不會將文件分配給代理。代理程序會查找更多的文件進行處理,如果沒有文件則退出。主腳本只會啓動四個(或十個)代理。 –

+0

嗯...所以這更像是一個「配電設施」? (因爲沒有更好的anaogy)。這運行在一個目標目錄上,然後它掃描另外四個目錄以查找文件。如果一個目錄是空的,它將一個文件移入它。每個文件夾都由一個單獨的進程監視,該進程將該目錄中的任何文件的內容輸入到數據庫中?我甚至關閉?我覺得今天早上我還沒有喝足夠的咖啡...... – user1075581

2

你想要什麼叫「消息隊列」。類似於beanstalkd

您將基本上創建一個包含您的個人文件名的消息列表。然後您將創建一組處理器來處理它們。每個處理器將處理一個文件,然後返回隊列以查看是否有更多消息/文件正在等待處理。

編輯: 下面是一個類比來幫助解釋消息隊列。你的第一個想法就像是一個人工經理拿着一堆文件,把他們分成四堆,然後把他的四個員工的每一個都交給一堆人處理。消息隊列更像這樣:管理器將所有文件放在一個表上,並告訴每個員工從表中取出單個文件並處理它。他告訴他們何時完成第一個文件,以便在文件沒有更多文件的情況下繼續進行文件處理。當所有文件完成後,員工可以回家。

一名員工可能以非常大的文件結束,只處理一些,而另一名員工可能會得到較小的文件並處理很多問題。每個員工處理多少並不重要,他們都會繼續工作,直到表格爲空。

+0

在確定它是否需要我之前,我需要對此進行更多的瞭解。 @ alex-howansky提到了一個處理目錄中文件的解決方案,這與我已有的東西非常一致,但對於並行執行進程而不是串行進程的代碼,我仍然有點卡住(因此希望進一步研究豆杆)。 – user1075581

+0

「消息隊列」是一個很好的搜索術語。這是一個普遍的想法,不限於beanstalkd,這是一個特定的實現。對於Alex的隊列實現,您可以使用一個php或bash腳本將所有文件移動到隊列目錄中。然後它會啓動一系列獨立的PHP腳本,它們遵循他提供的僞代碼。這些腳本會同時處理隊列中的文件。這仍然是一個「消息隊列」,只是一個更簡單的代碼自己的版本。 –

1

我會有一個插座服務器主腳本,將文件路徑提交給x個從腳本,直到沒有文件需要處理。這樣,所有的從屬腳本將繼續運行,並且可以根據請求動態地分配文件路徑。

事情是這樣的:

master.php

<?php 

    // load the array of files to process (however you do this) 
    $fileList = file('filelist.txt'); 

    // Create a listening socket on localhost 
    $serverSocket = stream_socket_server('tcp://127.0.0.1:7878'); 
    $sockets = array($serverSocket); 
    $clients = array(); 

    // Loop while there are still files to process 
    while (count($fileList)) { 

    // Run a select() call on the existing sockets' read buffers 
    // Skip to next iteration if no sockets are waiting for handling 
    if (stream_select($read = $sockets, $write = NULL, $except = NULL, 1) < 1) { 
     continue; 
    } 

    // Loop sockets with data to read 
    foreach ($read as $socket) { 

     if ($socket == $serverSocket) { 
     // Accept new clients 
     $sockets[] = $clients[] = stream_socket_accept($serverSocket); 
     } else if (trim(fgets($socket)) == 'next') { 
     // Hand out a new file path to the client 
     fwrite($socket, array_shift($fileList)."\n"); 
     if (!count($fileList)) { 
      break 2; 
     } 
     } 

    } 

    } 

    // When we're done, disconnect the clients 
    foreach ($clients as $socket) { 
    @fclose($socket); 
    } 

    // ...and close the listen socket 
    @fclose($serverSocket); 

slave.php

<?php 

    $socket = fsockopen('127.0.0.1', 7878); 

    while (!feof($socket)) { 

    // Get a new file path from the master 
    fwrite($socket,"next\n"); 
    $path = trim(fgets($socket)); 

    if (is_file($path)) { 
     // Process the file at $path here 
    } 

    } 

,那麼你只需要啓動master.php,那麼當它正在運行,但可以啓動多個實例slave.php如你所願,他們將一直運行,直到沒有更多的文件要處理。

顯然,這沒有錯誤處理,但它應該提供一個基本的框架,讓你開始。這依賴於阻止函數調用(stream_select()fgets())以避免競爭條件 - 這可能或可能不足以滿足您的需要。

+0

這比我自己想象的更聰明!我喜歡這個概念!然而,在我能夠嘗試之前還有一段時間 - 準備好迎接一段時間。不錯,但! :-D – user1075581