2011-07-10

If the code is compiled as a console program, or run with fsi --use:Program.fs --exec --quiet, is there any way to wait for all threads to finish? The compiled console program does not wait for all threads to complete before exiting.

When multiple MailboxProcessors are involved, the problem can be described as the "program exit problem".
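A minimal sketch (mine, not from the post) of why the process can exit early; the agent name and delay are illustrative assumptions:

```fsharp
// Hypothetical minimal repro (not the original program): the agent
// body runs on a thread-pool thread, which is a *background* thread,
// so once the main thread reaches the end of the program nothing
// keeps the process alive and pending messages are silently dropped.
let slowAgent =
    MailboxProcessor<string>.Start(fun inbox -> async {
        while true do
            let! msg = inbox.Receive()
            do! Async.Sleep 500              // simulate slow work
            printfn "processed: %s" msg })

slowAgent.Post "only message"
// The program ends here; "processed: only message" may never appear
// unless something blocks the main thread (Console.ReadLine, a reply
// channel, Thread.Join on a foreground thread, ...).
```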

Example output

(Note that the last line is truncated and the final output statement (printfn "[Main] after crawl") is never executed.)

 
[Main] before crawl 
[Crawl] before return result 
http://news.google.com crawled by agent 1. 
[supervisor] reached limit 
Agent 5 is done. 
http://www.gstatic.com/news/img/favicon.ico crawled by agent 1. 
[supervisor] reached limit 
Agent 1 is done. 
http://www.google.com/imghp?hl=en&tab=ni crawled by agent 4. 
[supervisor] reached limit 
Agent 4 is done. 
http://www.google.com/webhp?hl=en&tab=nw crawled by agent 2. 
[supervisor] reached limit 
Agent 2 is done. 
http://news.google. 

Code

Edit: added several System.Threading.Thread.CurrentThread.IsBackground <- false statements.

open System 
open System.Collections.Concurrent 
open System.Collections.Generic 
open System.IO 
open System.Net 
open System.Text.RegularExpressions 

module Helpers = 

    type Message = 
     | Done 
     | Mailbox of MailboxProcessor<Message> 
     | Stop 
     | Url of string option 
     | Start of AsyncReplyChannel<unit> 

    // Gates the number of crawling agents. 
    [<Literal>] 
    let Gate = 5 

    // Extracts links from HTML. 
    let extractLinks html = 
     let pattern1 = "(?i)href\\s*=\\s*(\"|\')/?((?!#.*|/\B|" + 
         "mailto:|location\.|javascript:)[^\"\']+)(\"|\')" 
     let pattern2 = "(?i)^https?" 

     let links = 
      [ 
       for x in Regex(pattern1).Matches(html) do 
        yield x.Groups.[2].Value 
      ] 
      |> List.filter (fun x -> Regex(pattern2).IsMatch(x)) 
     links 

    // Fetches a Web page. 
    let fetch (url : string) = 
     try 
      let req = WebRequest.Create(url) :?> HttpWebRequest 
      req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)" 
      req.Timeout <- 5000 
      use resp = req.GetResponse() 
      let content = resp.ContentType 
      let isHtml = Regex("html").IsMatch(content) 
      match isHtml with 
      | true -> use stream = resp.GetResponseStream() 
         use reader = new StreamReader(stream) 
         let html = reader.ReadToEnd() 
         Some html 
      | false -> None 
     with 
     | _ -> None 

    let collectLinks url = 
     let html = fetch url 
     match html with 
     | Some x -> extractLinks x 
     | None -> [] 

open Helpers 

// Creates a mailbox that synchronizes printing to the console (so 
// that two calls to 'printfn' do not interleave when printing) 
let printer = 
    MailboxProcessor.Start(fun x -> async { 
     while true do 
     let! str = x.Receive() 
     System.Threading.Thread.CurrentThread.IsBackground <- false 
     printfn "%s" str }) 
// Hides the standard 'printfn' function (formats the string using 
// 'kprintf' and then posts the result to the printer agent). 
let printfn fmt = 
    Printf.kprintf printer.Post fmt 

let crawl url limit = 
    // Concurrent queue for saving collected urls. 
    let q = ConcurrentQueue<string>() 

    // Holds crawled URLs. 
    let set = HashSet<string>() 


    let supervisor = 
     MailboxProcessor.Start(fun x -> async { 
      System.Threading.Thread.CurrentThread.IsBackground <- false 
      // The agent expects to receive 'Start' message first - the message 
      // carries a reply channel that is used to notify the caller 
      // when the agent completes crawling. 
      let! start = x.Receive() 
      let repl = 
       match start with 
       | Start repl -> repl 
       | _ -> failwith "Expected Start message!" 

      let rec loop run = 
       async { 
        let! msg = x.Receive() 
        match msg with 
        | Mailbox(mailbox) -> 
         let count = set.Count 
         if count < limit - 1 && run then 
          let url = q.TryDequeue() 
          match url with 
          | true, str -> if not (set.Contains str) then 
               set.Add str |> ignore 
               mailbox.Post <| Url(Some str) 
               return! loop run 
              else 
               mailbox.Post <| Url None 
               return! loop run 

          | _ -> mailbox.Post <| Url None 
            return! loop run 
         else 
          printfn "[supervisor] reached limit" 
          // Wait for finishing 
          mailbox.Post Stop 
          return! loop run 
        | Stop -> printfn "[Supervisor] stop"; return! loop false 
        | Start _ -> failwith "Unexpected start message!" 
        | Url _ -> failwith "Unexpected URL message!" 
        | Done -> printfn "[Supervisor] Supervisor is done." 
           (x :> IDisposable).Dispose() 
           // Notify the caller that the agent has completed 
           repl.Reply(()) 
       } 
      do! loop true }) 


    let urlCollector = 
     MailboxProcessor.Start(fun y -> 
      let rec loop count = 
       async { 
        System.Threading.Thread.CurrentThread.IsBackground <- false 
        let! msg = y.TryReceive(6000) 
        match msg with 
        | Some message -> 
         match message with 
         | Url u -> 
          match u with 
          | Some url -> q.Enqueue url 
              return! loop count 
          | None -> return! loop count 
         | _ -> 
          match count with 
          | Gate -> (y :> IDisposable).Dispose() 
             printfn "[urlCollector] URL collector is done." 
             supervisor.Post Done 
          | _ -> return! loop (count + 1) 
        | None -> supervisor.Post Stop 
           return! loop count 
       } 
      loop 1) 

    /// Initializes a crawling agent. 
    let crawler id = 
     MailboxProcessor.Start(fun inbox -> 
      let rec loop() = 
       async { 
        System.Threading.Thread.CurrentThread.IsBackground <- false 
        let! msg = inbox.Receive() 
        match msg with 
        | Url x -> 
         match x with 
         | Some url -> 
           let links = collectLinks url 
           printfn "%s crawled by agent %d." url id 
           for link in links do 
            urlCollector.Post <| Url (Some link) 
           supervisor.Post(Mailbox(inbox)) 
           return! loop() 
         | None -> supervisor.Post(Mailbox(inbox)) 
            return! loop() 
        | _ -> printfn "Agent %d is done." id 
          urlCollector.Post Done 
          (inbox :> IDisposable).Dispose() 
        } 
      loop()) 

    // Send the 'Start' message to the main agent. The result 
    // is an asynchronous workflow that will complete when 
    // the agent finishes crawling. 
    let result = supervisor.PostAndAsyncReply(Start) 
    // Spawn the crawlers. 
    let crawlers = 
     [ 
      for i in 1 .. Gate do 
       yield crawler i 
     ] 

    // Post the first messages. 
    crawlers.Head.Post <| Url (Some url) 
    crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None) 
    printfn "[Crawl] before return result" 
    result 

// Example: 
printfn "[Main] before crawl" 
crawl "http://news.google.com" 5 
|> Async.RunSynchronously 
printfn "[Main] after crawl" 

Could you simplify your code so that the problem still occurs, but without all the irrelevant code? – svick

Answers


I think I have solved the problem: adding System.Threading.Thread.CurrentThread.IsBackground <- false after the let! in the printer agent.

However, I tried modifying the original code (the first version, before Tomas's AsyncReplyChannel fix) by adding System.Threading.Thread.CurrentThread.IsBackground <- false after all of the let! bindings, and it still doesn't work. No idea why.

Thanks everyone for the help. I can finally ship my first F# application for a batch-processing job. I think MailboxProcessor should set IsBackground to false by default. Will ask Microsoft to change it anyway.

[Update] Just found that the compiled assembly runs fine. But fsi --use:Program.fs --exec --quiet still behaves the same. Is this an fsi bug?


Feature suggestions can be sent to [email protected]. – kvb


MailboxProcessor threads are supposed to be background threads (all thread-pool threads are); you should use control flow to keep the program from exiting. – 7sharp9


With the caveat that I know zero F#: generally you wait on all the threads of interest using Thread.Join. It seems to me that in your case you need to wait on anything interesting that was started by calling .Start.

You might also consider the Task Parallel Library, which gives you a higher-level (simpler) abstraction over raw managed threads. There is an example of waiting for tasks to complete here.
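A sketch of the suggestion above; the TPL is an ordinary .NET library, so it can be used from F# directly (the loop bound and sleep are illustrative):

```fsharp
open System.Threading.Tasks

// Task.WaitAll blocks the calling thread until every task has
// finished, so the process cannot exit with work still in flight.
let tasks =
    [| for i in 1 .. 3 ->
        Task.Factory.StartNew(fun () ->
            System.Threading.Thread.Sleep 100
            printfn "task %d finished" i) |]

Task.WaitAll tasks
printfn "all tasks done"
```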


Thanks. The Task Parallel Library doesn't seem to be available from F# yet. I think MailboxProcessor is a good mechanism for this kind of task. It seems MailboxProcessor has nothing like Thread.Join for waiting on multiple MailboxProcessors. – ca9163d9


For what it's worth, the TPL is 100% usable from F# (as are all .NET libraries). However, it may not have the syntactic sweetness you'd expect from an F# library. Also, for what your code seems to be doing, I think MailboxProcessors are a good "fit". – pblasucci


If I identify the code correctly, it is based on your previous question (and my answer to it).

The program waits until the supervisor agent completes (by sending it the Start message and then waiting for the reply using RunSynchronously). This should guarantee that the main agent, as well as all the crawlers, complete before the application exits.

The problem is that it does not wait until the printer agent completes! So the last call to the (redefined) printfn function sends a message to that agent, and then the application exits without waiting until the printing agent finishes.

As far as I know, there is no "standard pattern" for waiting until an agent has processed all the messages currently in its queue. Some ideas you could try:

  • You could check the CurrentQueueLength property (wait until it is 0), but that still does not mean the agent has finished processing all the messages.

  • You could make the agent more complex by adding a new type of message and waiting until the agent replies to that message (just as you are currently waiting for a reply to the Start message).
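The second idea can be sketched like this (the message and agent names are mine); because a mailbox processes messages in order, the reply to Shutdown is sent only after every earlier Print has been handled:

```fsharp
type PrinterMsg =
    | Print of string
    | Shutdown of AsyncReplyChannel<unit>

let printerAgent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Print s ->
                printfn "%s" s
                return! loop ()
            | Shutdown reply ->
                // Reached only after all earlier messages were
                // processed; stop looping and release the caller.
                reply.Reply () }
        loop ())

printerAgent.Post (Print "last line")
printerAgent.PostAndReply Shutdown   // blocks until the queue drains
```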


Yes, it is based on my previous question and your answer. Thanks a lot for the solution. It seems that all the agents except the supervisor also need a "Start" message, and since there are now (3 + 5) agents it could get quite complex. Just wondering whether there is a better solution. – ca9163d9


@NickW - the 'supervisor' only finishes after it receives a 'Done' message from the url collector (which must have received 'Done' messages from all of the crawlers). So the supervisor should complete only after all the others (except the printer) are done. I agree the solution is a bit too complex, though. I would probably use just a single queue of URLs, perhaps using this agent implementation: http://tomasp.net/blog/parallel-extra-blockingagent.aspx –


.NET threads have the property Thread.IsBackground. When it is set to true, the thread does not prevent the process from exiting. When it is set to false, it will prevent the process from exiting. See: http://msdn.microsoft.com/en-us/library/system.threading.thread.isbackground.aspx

The threads that run the agents come from the thread pool, and so have Thread.IsBackground set to false by default.

You could try setting the thread's IsBackground to false each time a message is read. You could add a function to do this for you, to make the approach a bit cleaner. It is probably not the best way to solve the problem, because after each let! you may be running on a different thread, so it needs careful implementation to work correctly. I just thought of mentioning it to answer the specific question

"Any way to wait for all threads to finish?"

and to help people understand why some threads stop a program from exiting while others do not.
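A sketch of that idea (the helper and agent names are mine); note the caveat above that the workflow may resume on a different thread after every let!, so the flag has to be re-applied at each bind point:

```fsharp
open System.Threading

// Mark whichever thread is currently running the agent body as a
// foreground thread, so that it keeps the process alive.
let keepAlive () =
    Thread.CurrentThread.IsBackground <- false

let echoAgent =
    MailboxProcessor<string>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            keepAlive ()          // may be a different thread now
            printfn "%s" msg
            return! loop () }
        loop ())
```

Note that this permanently converts thread-pool threads to foreground threads, which is why it is a workaround rather than a fix.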


It's the other way around, isn't it? http://msdn.microsoft.com/en-us/library/h339syd0.aspx – 7sharp9