2010-11-05 43 views
4

我需要一個例子來說明如何使用ocaml-threads來編寫一個並行的iter函數。我的第一個想法是有一個功能類同此:Howto編程基於線程的並行列表迭代?

let procs = 4 ;; 

let rec _part part i lst = match lst with 
     [] ->() 
    | hd::tl -> 
     let idx = i mod procs in 
     (* Printf.printf "part idx=%i\n" idx; *) 
     let accu = part.(idx) in 
      part.(idx) <- (hd::accu); 
      _part part (i+1) tl ;; 

然後並行ITER看起來是這樣的(這裏作爲基於過程的變體):

let iter f lst = let part = Array.create procs [] in 
    _part part 0 lst; 
    let rec _do i = 
     (* Printf.printf "do idx=%i\n" i; *) 
     match Unix.fork() with 
      0 -> (* Code of child *) 
      if i < procs then 
       begin 
       (* Printf.printf "child %i\n" i; *) 
       List.iter f part.(i) 
       end 
     | pid -> (* Code of father *) 
      (* Printf.printf "father %i\n" i; *) 
      if i >= procs then ignore (Unix.waitpid [] pid) 
      else _do (i+1) 
    in 
     _do 0 ;; 

因爲線程模塊的使用是有點不同,我將如何使用ocaml的線程模塊進行編碼?

還有一個問題,_part()函數必須掃描整個列表以將它們分成n個部分,然後每個部分都將通過每個自己的進程(此處)進行管道傳輸。那裏仍然存在一個解決方案,而不是先分開一個列表?

+0

你已經知道,你不會從並行得到任何加速,如果你使用的模塊'Thread'與INRIA的OCaml的?這是一個有趣的編程練習,只是不實際。 – 2010-11-05 13:29:21

+0

是的,我知道一般垃圾收集器中缺少的並行性。但我仍然希望有更好的ocaml實現。但是,如果我使用線程在硬盤上寫東西,我已經看到了在linux'htop中工作的線程。你能解釋哪些限制ocaml詳細(或指向我這樣的信息)?上面的問題是瞭解Threads-module的正確用法。 – 2010-11-05 13:53:45

+0

我給了自己更多的空間,將解釋放在下面的答案中。 – 2010-11-05 17:10:19

回答

3

如果您有一個處理列表的功能,並且您想要在多個列表中獨立運行它,則可以使用該功能和每個列表調用Thread.create。如果您存儲列出陣列part則:

let threads = Array.map (Thread.create (List.iter f)) part in 
Array.iter Thread.join threads 

INRIA OCaml的線程不是實際線程:只有一個線程執行在任何給定時間,這意味着,如果你有四個處理器和四個線程,所有四個線程將使用同一處理器和其他三個處理器將保持未使用狀態。

線程的用處在於它們仍允許異步編程:某些Thread模塊原語可等待外部資源變爲可用。這可以減少軟件花費在不可用資源上的時間,因爲您可以讓另一個線程同時做其他事情。您也可以使用它來同時啓動多個外部異步進程(如通過HTTP查詢多個Web服務器)。如果你沒有很多與資源相關的阻塞,這不會對你有所幫助。

至於你的列表分裂問題:訪問列表中的元素,你必須遍歷所有以前的元素。雖然這個遍歷在理論上可以分解成幾個線程或進程,但是通信開銷可能會使得它比在一個進程中提前分割事物慢得多。或者使用數組。

2

回答評論中的問題。答案本身並不完全適合評論。

OCaml運行時存在鎖定。當OCaml線程即將輸入C函數時,鎖定被釋放。

  • 可能阻塞;
  • 可能需要很長時間。

所以,你只能使用一個OCaml線程使用堆,但有時你可以使用非堆使用的C函數與它並行工作。

例如見文件ocaml-3.12.0/otherlibs/unix/write.c

memmove (iobuf, &Byte(buf, ofs), numbytes); // if we kept the data in the heap 
              // the GC might move it from 
              // under our feet. 
enter_blocking_section();     // release lock. 
              // Another OCaml thread may 
              // start in parallel of this one now. 
ret = write(Int_val(fd), iobuf, numbytes); 
leave_blocking_section();     // take lock again to continue 
              // with Ocaml code.