1
我是新的F#以及Akka.Net並努力實現與他們以下幾點:Akka.net F#狀態的演員,正等待multipe FileSystemWatcher的可觀察到的事件
我想創建一個演員(尾),其接收文件位置,然後使用FileSystemWatcher和一些Observables在該位置監聽事件,並將它們作爲消息轉發給其他actor以進行處理。
我遇到的問題是,偵聽事件的代碼一次只會拾取一個事件,而忽略所有其他事件。例如如果我將20個文件複製到正在觀看的目錄中,它似乎只發送了其中的一個事件。
這裏是我的演員代碼:
module Tail
open Akka
open Akka.FSharp
open Akka.Actor
open System
open Model
open ObserveFiles
open ConsoleWriteActor
let handleTailMessages tm =
match tm with
| StartTail (f,r) ->
observeFile f consoleWriteActor |!> consoleWriteActor
|> ignore
let spawnTail =
fun (a : Actor<IMessage>) ->
let rec l (count : int) = actor{
let! m = a.Receive()
handleTailMessages m
return! l (count + 1)
}
l(0)
和這裏的偵聽事件的代碼:花了相當多的黑客得到它的這一點
module ObserveFiles
open System
open System.IO
open System.Threading
open Model
open Utils
open Akka
open Akka.FSharp
open Akka.Actor
let rec observeFile (absolutePath : string) (a : IActorRef) = async{
let fsw = new FileSystemWatcher(
Path = Path.GetDirectoryName(absolutePath),
Filter = "*.*",
EnableRaisingEvents = true,
NotifyFilter = (NotifyFilters.FileName ||| NotifyFilters.LastWrite ||| NotifyFilters.LastAccess ||| NotifyFilters.CreationTime ||| NotifyFilters.DirectoryName)
)
let prepareMessage (args: EventArgs) =
let text =
match box args with
| :? FileSystemEventArgs as fsa ->
match fsa.ChangeType with
| WatcherChangeTypes.Changed -> "Changed " + fsa.Name
| WatcherChangeTypes.Created -> "Created " + fsa.Name
| WatcherChangeTypes.Deleted -> "Deleted " + fsa.Name
| WatcherChangeTypes.Renamed -> "Renamed " + fsa.Name
| _ -> "Some other change " + fsa.ChangeType.ToString()
| :? ErrorEventArgs as ea -> "Error: " + ea.GetException().Message
| o -> "some other unexpected event occurd" + o.GetType().ToString()
WriteMessage text
let sendMessage x = async{ async.Return(prepareMessage x) |!> a
return! observeFile absolutePath a }
let! occurance =
[
fsw.Changed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
fsw.Created |> Observable.map(fun x -> sendMessage (x :> EventArgs));
fsw.Deleted |> Observable.map(fun x -> sendMessage (x :> EventArgs));
fsw.Renamed |> Observable.map(fun x -> sendMessage (x :> EventArgs));
fsw.Error |> Observable.map(fun x -> sendMessage (x :> EventArgs));
]
|> List.reduce Observable.merge
|> Async.AwaitObservable
return! occurance
}
,如何我任何意見可以改變它,以便它在演員正在運行時拾取和處理所有事件將不勝感激。
我還沒有意識到,你可以創建一個演員所創建的觀察者,非常優雅的方式。 – Hugo