2015-12-16 25 views
1

我是新的F#以及Akka.Net並努力實現與他們以下幾點:Akka.net F#狀態的演員,正等待multipe FileSystemWatcher的可觀察到的事件

我想創建一個演員(尾),其接收文件位置,然後使用FileSystemWatcher和一些Observables在該位置監聽事件,並將它們作爲消息轉發給其他actor以進行處理。

我遇到的問題是,偵聽事件的代碼一次只會拾取一個事件,而忽略所有其他事件。例如如果我將20個文件複製到正在觀看的目錄中,它似乎只發送了其中的一個事件。

這裏是我的演員代碼:

module Tail 

open Akka 
open Akka.FSharp 
open Akka.Actor 
open System 
open Model 
open ObserveFiles 
open ConsoleWriteActor 

let handleTailMessages tm = 
    match tm with 
     | StartTail (f,r) -> 
      observeFile f consoleWriteActor |!> consoleWriteActor 

    |> ignore 

let spawnTail = 
    fun (a : Actor<IMessage>) -> 
    let rec l (count : int) = actor{ 

     let! m = a.Receive() 
     handleTailMessages m 
     return! l (count + 1) 
    } 
    l(0) 

和這裏的偵聽事件的代碼:花了相當多的黑客得到它的這一點

module ObserveFiles 
open System 
open System.IO 
open System.Threading 
open Model 
open Utils 
open Akka 
open Akka.FSharp 
open Akka.Actor 



let rec observeFile (absolutePath : string) (a : IActorRef) = async{ 

    let fsw = new FileSystemWatcher(
         Path = Path.GetDirectoryName(absolutePath), 
         Filter = "*.*", 
         EnableRaisingEvents = true, 
         NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName) 
         ) 

    let prepareMessage (args: EventArgs) = 
     let text = 
      match box args with 
      | :? FileSystemEventArgs as fsa -> 
       match fsa.ChangeType with 
       | WatcherChangeTypes.Changed -> "Changed " + fsa.Name 
       | WatcherChangeTypes.Created -> "Created " + fsa.Name 
       | WatcherChangeTypes.Deleted -> "Deleted " + fsa.Name 
       | WatcherChangeTypes.Renamed -> "Renamed " + fsa.Name 
       | _ -> "Some other change " + fsa.ChangeType.ToString() 
      | :? ErrorEventArgs as ea -> "Error: " + ea.GetException().Message 
      | o -> "some other unexpected event occurd" + o.GetType().ToString() 
     WriteMessage text 


    let sendMessage x = async{ async.Return(prepareMessage x) |!> a 
           return! observeFile absolutePath a } 

    let! occurance = 
     [ 
     fsw.Changed |> Observable.map(fun x -> sendMessage (x :> EventArgs)); 
     fsw.Created |> Observable.map(fun x -> sendMessage (x :> EventArgs)); 
     fsw.Deleted |> Observable.map(fun x -> sendMessage (x :> EventArgs)); 
     fsw.Renamed |> Observable.map(fun x -> sendMessage (x :> EventArgs)); 
     fsw.Error |> Observable.map(fun x -> sendMessage (x :> EventArgs)); 
     ] 
     |> List.reduce Observable.merge 
     |> Async.AwaitObservable 

    return! occurance 
} 

,如何我任何意見可以改變它,以便它在演員正在運行時拾取和處理所有事件將不勝感激。

回答

4

在設計這樣的任務,我們可以把它分成以下部分:

  1. 創建經理負責接收所有信息 - 它的主要作用是對進入目錄的監聽請求作出迴應。一旦請求進入,它會創建一個負責在此特定目錄下進行偵聽的子actor。
  2. 兒童演員負責管理FileSystemWatcher的特定路徑。它應該訂閱傳入的事件並將它們作爲消息重定向給負責接收更改事件的actor。它應該在關閉時釋放一次性資源。
  3. 負責接收變化事件的演員 - 在我們的例子中,通過在控制檯上顯示它們。

示例代碼:

open Akka.FSharp 
open System 
open System.IO 

let system = System.create "observer-system" <| Configuration.defaultConfig() 

let observer filePath consoleWriter (mailbox: Actor<_>) =  
    let fsw = new FileSystemWatcher(
         Path = filePath, 
         Filter = "*.*", 
         EnableRaisingEvents = true, 
         NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName) 
         ) 
    // subscribe to all incoming events - send them to consoleWriter 
    let subscription = 
     [fsw.Changed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString()); 
     fsw.Created |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString()); 
     fsw.Deleted |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString()); 
     fsw.Renamed |> Observable.map(fun x -> x.Name + " " + x.ChangeType.ToString());] 
      |> List.reduce Observable.merge 
      |> Observable.subscribe(fun x -> consoleWriter <! x) 

    // don't forget to free resources at the end 
    mailbox.Defer <| fun() -> 
     subscription.Dispose() 
     fsw.Dispose() 

    let rec loop() = actor { 
     let! msg = mailbox.Receive() 
     return! loop() 
    } 
    loop() 

// create actor responsible for printing messages 
let writer = spawn system "console-writer" <| actorOf (printfn "%A") 

// create manager responsible for serving listeners for provided paths 
let manager = spawn system "manager" <| actorOf2 (fun mailbox filePath -> 
    spawn mailbox ("observer-" + Uri.EscapeDataString(filePath)) (observer filePath writer) |> ignore) 

manager <! "testDir" 
+0

我還沒有意識到,你可以創建一個演員所創建的觀察者,非常優雅的方式。 – Hugo