以下是我將System.Diagnostics.Process轉換爲IConnectableObservable的嘗試。 此解決方案有一個問題:我想要連續收聽標準輸出和錯誤,並使用事件Process.Exited作爲OnCompleted的觸發器。不幸的是,我發現在輸出緩衝區爲空之前,Process.Exited被提出。這意味着,如果沒有線程睡眠的難題,我可以重現輸出不通過OnNext語句提供服務的情況。從進程輸出緩衝區問題創建IConnectableObservable <string>
問題1:您是否看到針對此問題的任何解決方法?第二季度:關於System.Reactive:我在解決方案中做了什麼更好的事情?
問候,
馬庫斯
public static class RxProcessUtilities
{
/// <summary>
/// Creates a connectable observable for a process.
/// </summary>
/// <remarks>Must be a connectable observable in order to hinder multiple
/// subscriptions to call the process multiple times.</remarks>
/// <param name="process">The process.</param>
/// <returns></returns>
public static IConnectableObservable<string> CreateConnectableObservableProcess
(string filename, string arguments, IObservable<string> input = null)
{
var observable = Observable.Using(() =>
{
Process process = new Process();
// process configuration
process.StartInfo.FileName = filename;
process.StartInfo.Arguments = arguments;
process.StartInfo.CreateNoWindow = true;
process.StartInfo.UseShellExecute = false;
process.EnableRaisingEvents = true;
process.StartInfo.RedirectStandardError = true;
process.StartInfo.RedirectStandardOutput = true;
if (null != input)
{
process.StartInfo.RedirectStandardInput = true;
input.Subscribe(s =>
{
if (!process.HasExited)
{
process.StandardInput.Write(s);
}
});
}
return process;
},
process =>
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
// listen to stdout and stderr
var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process);
var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process);
var stdOutSubscription = stdOut.Subscribe(observer);
var stdErrSubscription = stdErr.Subscribe(observer);
var processExited = Observable.FromEventPattern
(h => process.Exited += h, h => process.Exited -= h);
var processError = processExited.Subscribe(args =>
{
// Here is my problem: process sends exited event *before* all
// *DataReceived events have been raised
// My ugly workaround for process exit before stdout and stderr buffers are empty.
Thread.Sleep(2000);
// Also: AFAICS we cannot read synchronously what is left in the buffer,
// since we started asynchronously. This will throw:
// string restOfStdOut = process.StandardOutput.ReadToEnd();
// string restOfStdErr = process.StandardError.ReadToEnd();
if (process.ExitCode != 0)
{
observer.OnError(new Exception
(String.Format("Process '{0}' terminated with error code {1}",
process.StartInfo.FileName, process.ExitCode)));
}
else
{
observer.OnCompleted();
}
});
process.Start();
process.BeginOutputReadLine();
process.BeginErrorReadLine();
return new CompositeDisposable
(stdOutSubscription,
stdErrSubscription,
processError);
});
});
return observable.Publish();
}
/// <summary>
/// Creates an IObservable<string> for the standard error of a process.
/// </summary>
/// <param name="process">The process.</param>
/// <returns></returns>
public static IObservable<string> CreateStandardErrorObservable(Process process)
{
// var processExited = Observable.FromEventPattern
// (h => process.Exited += h, h => process.Exited -= h);
var receivedStdErr =
Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
(h => process.ErrorDataReceived += h,
h => process.ErrorDataReceived -= h)
//.TakeUntil(processExited)
// cannot be used here, since process exited event might be raised
// before all stderr and stdout events occurred.
.Select(e => e.EventArgs.Data);
return Observable.Create<string>(observer =>
{
var cancel = Disposable.Create(process.CancelErrorRead);
return new CompositeDisposable(cancel, receivedStdErr.Subscribe(observer));
});
}
/// <summary>
/// Creates an IObservable<string> for the standard output of a process.
/// </summary>
/// <param name="process">The process.</param>
/// <returns></returns>
public static IObservable<string> CreateStandardOutputObservable(Process process)
{
var receivedStdOut =
Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
(h => process.OutputDataReceived += h,
h => process.OutputDataReceived -= h)
.Select(e => e.EventArgs.Data);
return Observable.Create<string>(observer =>
{
var cancel = Disposable.Create(process.CancelOutputRead);
return new CompositeDisposable(cancel, receivedStdOut.Subscribe(observer));
});
}
}
低於[http://msdn.microsoft.com/en-us/library/fb4aw7b8.aspx](http://msdn.microsoft.com/en -us/library/fb4aw7b8.aspx)我發現使用WaitForExit()來刷新標準輸出的提示:「此重載確保所有處理已完成,包括處理重定向標準輸出的異步事件。」並且還提示使用Process.Close()。所以這可能是Q1的答案。 – 2012-07-29 20:19:52