2012-11-19 62 views
0

我的perl腳本的一部分出現了一些問題,現在困擾了我好幾天。總結的目的是以塊讀入一個大文件,並對輸入流進行一些操作(與我的問題無關)。當我第一次實現了它,我只是環繞在該文件,然後做了一些東西就可以了,就像這樣:加入線程的問題

while (read FILE, $buffer, $chunksize){ 
    callSomeOperation($buffer); 
    # Do some other stuff 
} 

不幸的是,文件是非常大的,操作莫名其妙的複雜了許多函數調用,因此這導致穩步增加內存perl無法分配內存,腳本失敗。所以我做了一些調查並嘗試了幾件事情來儘量減少內存開銷(循環外的定義變量,設置爲undef等),這導致分配的內存大小增加較慢,但最終仍然失敗。 (如果我找到了正確的答案,perl回饋給操作系統的內存是......在實踐中不會發生這種情況)。

因此,我決定將函數調用及其所有定義嵌套在一個子線程中,等待它完成,加入,然後再與下一個塊調用線程:

while (read FILE, $buffer, $chunksize){ 
my $thr = threads->create(\&thrWorker,$buffer); 
$thr->join(); 
} 

sub thrWorker{ 
# Do the stuff here! 
} 

這可能是一個解決方案,如果線程會加入!但實際上並沒有。如果我運行$ thr-> detach();一切工作正常,除了我同時得到線程的胡言亂語,這不是一個好主意,在這種情況下,我需要連續運行它們。

所以我就這個連接問題進行了一些調查,並得到了一些聲音,說它可能是perl 5.16.1的一個問題,所以我更新到5.16.2,但它仍然沒有加入。在郵件列表中的任何地方,我不記得我從某個設法讓線程加入到CPAN模塊Thread :: Queue中讀取的東西,但是這對我也沒有任何作用。

所以我放棄了線程,並試圖分叉這個東西。但是對於叉子來說,「叉子」的總數似乎有限?不管怎麼樣,直到第13到第20次迭代都很順利,然後放棄了它不能再分叉的消息。

my $pid = fork(); 
if($pid == 0){ 
     thrWorker($buffer); 
    exit 0; 
} 

我也試過用CPAN模塊Para​​llel :: ForkManager和Proc :: Fork,但是沒有幫助。

所以現在我有點卡住了,不能幫助自己。也許別人可以!任何建議非常感謝!

  1. 我怎樣才能讓這件事情與線程或子進程一起工作?
  2. 或者至少我該如何強制perl釋放內存,以便我可以在同一個進程中執行此操作?

我的系統上一些額外的信息: 操作系統:Windows 7 64位/ Ubuntu服務器12.10 的Perl在Windows上:草莓的Perl 5.16.2 64位

我的一個#2第一職位。希望我所做的是正確的:-)

+0

您的實際問題是內存消耗猖獗,而且無論多線程還是多處理,都必須修復此問題。如果' - > join()'不返回它,因爲'thrWorker' *永遠不會結束*。如果'fork()'在一兩個進程之後失敗,那麼你的系統*資源不足*,可能是內存耗盡。爲什麼'callSomeOperation'如此低效?我們不知道。 – pilcrow

+0

你是對的主要問題_IS_內存消耗,但自callSomeOperation();將調用大約1500行代碼,這可能是不可避免的。我真正想知道的是,'$ thr-> detach();'實際_runs_操作和_finishes!_所以_must_是' - > join();'操作的問題,因爲分離或不分離不應該對手術的執行有沒有影響,還是我誤解了? – achschneid

+0

(1)聽起來像callSomeOp()中的泄漏。 (2)您如何知道在分離線程中運行的操作完成? – pilcrow

回答

1

我推薦閱讀:this

我通常使用線程隊列::管理線程的輸入。 示例代碼:

my @threads = {}; 
my $Q = new Thread::Queue; 

# Start the threads 
for (my $i=0; $i<NUM_THREADS; $i++) { 
    $threads[$i] = 
     threads->new(\&insert_1_thread, $Q); 
} 

# Get the list of sites and put in the work queue 
foreach $row (@{$ref}) { 
    $Q->enqueue($row->[0]); 
    #sleep 1 while $Q->pending > 100; 
} # foreach $row 

# Signal we are done 
for (my $i=0; $i<NUM_THREADS; $i++) { 
    $Q->enqueue(undef); } 

$count = 0; 
# Now wait for the threads to complete before going on to the next step 
for (my $i=0; $i<NUM_THREADS; $i++) { 
    $count += $threads[$i]->join(); } 

而且工作線程:

sub insert_1_thread { 
my ($Q) = @_; 
my $tid = threads->tid; 
my $count = 0; 
Log("Started thread #$tid"); 

while(my $row = $Q->dequeue) { 
    PROCESS ME... 
    $count++; 
} # while 

Log("Thread#$tid, done"); 
return $count; 

} # sub insert_1_thread 
+0

感謝您的建議,我已經嘗試過與上述相同的方式,但看起來Threads :: Queue只允許您排入單個參數。但我的線程實際上需要更多,所以我試圖把它放入一個數組或哈希,實際上導致出隊循環的params數組。在一個數組中,序列化數組並將其作爲字符串進入(我真的不想這麼做骯髒;-)) – achschneid

+0

我們已經有一段時間了,我實際上使用Thread :: Queue將一個對象與我的所有對象隊列中的參數。現在運行三年,沒有任何問題,所以這是定義在Queable線程上的解決方案。 – achschneid

0

我不知道這是否是你的解決方案,但你可以創建大塊對象的數組,並處理它們平行如下:

#!/usr/bin/perl 

package Object; { 
    use threads; 
    use threads::shared;   

    sub new(){ 
     my $class=shift; 
     share(my %this); 
     return(bless(\%this,$class)); 
    } 

    sub set { 
     my ($this,$value)[email protected]_;  
     lock($this); 
#  $this->{"data"}=shared_clone($value); 
     $this->{"data"}=$value; 
    } 

    sub get { 
     my $this=shift; 
     return $this->{"data"}; 
    } 
} 


package main; { 

use strict; 
use warnings; 

use threads; 
use threads::shared; 

    my @objs; 
    foreach (0..2){ 
     my $o = Object->new(); 
     $o->set($_); 
     push @objs, $o; 
    } 

    threads->create(\&run,(\@objs))->join(); 

    sub run { 
     my ($obj) = @_;  
     $$obj[$_]->get() foreach(0..2);   
    } 
}