2012-07-28 43 views
1

以下是我將System.Diagnostics.Process轉換爲IConnectableObservable的嘗試。此解決方案有一個問題:我想要持續監聽標準輸出和標準錯誤,並使用Process.Exited事件作爲OnCompleted的觸發器。不幸的是,我發現Process.Exited在輸出緩衝區清空之前就已被觸發。這意味着,如果不使用Thread.Sleep這種權宜之計,我就能重現部分輸出沒有通過OnNext傳遞出去的情況。(標題:從Process創建IConnectableObservable<string>時的輸出緩衝區問題)

問題1:您是否看到針對此問題的任何解決方法?問題2:關於System.Reactive:我的解決方案中有哪些地方可以做得更好?

問候,

馬庫斯

public static class RxProcessUtilities 
{ 
    /// <summary> 
    /// Creates a connectable observable that runs an external process and pushes 
    /// the data of its redirected standard output and standard error to observers. 
    /// </summary> 
    /// <remarks>Must be a connectable observable in order to hinder multiple 
    /// subscriptions to call the process multiple times. 
    /// When the process raises Exited, the sequence ends with OnCompleted if the 
    /// exit code is 0 and with OnError otherwise.</remarks> 
    /// <param name="filename">Assigned to <c>StartInfo.FileName</c>.</param> 
    /// <param name="arguments">Assigned to <c>StartInfo.Arguments</c>.</param> 
    /// <param name="input">Optional sequence whose items are written to the process's 
    /// standard input; when null, standard input is not redirected.</param> 
    /// <returns>The published (connectable) sequence of output and error data.</returns> 
    public static IConnectableObservable<string> CreateConnectableObservableProcess 
     (string filename, string arguments, IObservable<string> input = null) 
    { 
     // Observable.Using ties the Process instance to the sequence built from it below. 
     var observable = Observable.Using(() => 
      { 
       Process process = new Process(); 

       // process configuration 
       process.StartInfo.FileName = filename; 
       process.StartInfo.Arguments = arguments; 
       process.StartInfo.CreateNoWindow = true; 
       process.StartInfo.UseShellExecute = false; 

       // Exited is used further down as the completion trigger. 
       process.EnableRaisingEvents = true; 
       process.StartInfo.RedirectStandardError = true; 
       process.StartInfo.RedirectStandardOutput = true; 

       if (null != input) 
       { 
        process.StartInfo.RedirectStandardInput = true; 

        // NOTE(review): this subscription is made before process.Start() is called 
        // and its IDisposable is discarded, so nothing in this method unsubscribes it. 
        // Presumably HasExited/StandardInput are not usable until the process has 
        // started - confirm behaviour for inputs that emit immediately on subscribe. 
        input.Subscribe(s => 
         { 
          if (!process.HasExited) 
          { 
           process.StandardInput.Write(s); 
          } 
         }); 
       } 

       return process; 
      }, 
      process => 
      { 
       return Observable.Create<string>(
       (IObserver<string> observer) => 
       { 
        // listen to stdout and stderr 
        var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process); 
        var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process); 

        // Both streams feed the same observer through two separate subscriptions. 
        var stdOutSubscription = stdOut.Subscribe(observer); 
        var stdErrSubscription = stdErr.Subscribe(observer); 

        var processExited = Observable.FromEventPattern 
        (h => process.Exited += h, h => process.Exited -= h); 

        // Translates process exit into the terminal notification of the sequence. 
        var processError = processExited.Subscribe(args => 
        { 
         // Here is my problem: process sends exited event *before* all 
         // *DataReceived events have been raised 

         // My ugly workaround for process exit before stdout and stderr buffers are empty. 
         Thread.Sleep(2000); 

         // Also: AFAICS we cannot read synchronously what is left in the buffer, 
         // since we started asynchronously. This will throw: 
         // string restOfStdOut = process.StandardOutput.ReadToEnd(); 
         // string restOfStdErr = process.StandardError.ReadToEnd(); 

         // A non-zero exit code is reported to the observer as an error. 
         if (process.ExitCode != 0) 
         { 
          observer.OnError(new Exception 
           (String.Format("Process '{0}' terminated with error code {1}", 
           process.StartInfo.FileName, process.ExitCode))); 
         } 
         else 
         { 
          observer.OnCompleted(); 
         } 
        }); 

        // The process is started only after all event subscriptions above are in place. 
        process.Start(); 

        // Switch both redirected streams to asynchronous, event-based reading. 
        process.BeginOutputReadLine(); 
        process.BeginErrorReadLine(); 

        // Disposing the subscription tears down all three event subscriptions. 
        return new CompositeDisposable 
         (stdOutSubscription, 
         stdErrSubscription, 
         processError); 
       }); 
      }); 

     // Publish() turns the sequence into the connectable observable that is returned. 
     return observable.Publish(); 
    } 

    /// <summary> 
    /// Creates an IObservable&lt;string&gt; for the standard error of a process. 
    /// </summary> 
    /// <param name="process">The process whose ErrorDataReceived event is observed.</param> 
    /// <returns>A sequence of the <c>Data</c> value of each ErrorDataReceived event. 
    /// Disposing a subscription also calls <c>process.CancelErrorRead</c>.</returns> 
    public static IObservable<string> CreateStandardErrorObservable(Process process) 
    { 
     // var processExited = Observable.FromEventPattern 
     // (h => process.Exited += h, h => process.Exited -= h); 

     var receivedStdErr = 
      Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs> 
       (h => process.ErrorDataReceived += h, 
       h => process.ErrorDataReceived -= h) 
      //.TakeUntil(processExited) 
      // cannot be used here, since process exited event might be raised 
      // before all stderr and stdout events occurred. 
      .Select(e => e.EventArgs.Data); 

     return Observable.Create<string>(observer => 
     { 
      // Runs process.CancelErrorRead when the subscription is disposed. 
      var cancel = Disposable.Create(process.CancelErrorRead); 

      return new CompositeDisposable(cancel, receivedStdErr.Subscribe(observer)); 
     }); 
    } 

    /// <summary> 
    /// Creates an IObservable&lt;string&gt; for the standard output of a process. 
    /// </summary> 
    /// <param name="process">The process whose OutputDataReceived event is observed.</param> 
    /// <returns>A sequence of the <c>Data</c> value of each OutputDataReceived event. 
    /// Disposing a subscription also calls <c>process.CancelOutputRead</c>.</returns> 
    public static IObservable<string> CreateStandardOutputObservable(Process process) 
    { 
     var receivedStdOut = 
      Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs> 
      (h => process.OutputDataReceived += h, 
      h => process.OutputDataReceived -= h) 
      .Select(e => e.EventArgs.Data); 

     return Observable.Create<string>(observer => 
     { 
      // Runs process.CancelOutputRead when the subscription is disposed. 
      var cancel = Disposable.Create(process.CancelOutputRead); 

      return new CompositeDisposable(cancel, receivedStdOut.Subscribe(observer)); 
     }); 
    } 
} 
+0

在[http://msdn.microsoft.com/en-us/library/fb4aw7b8.aspx](http://msdn.microsoft.com/en-us/library/fb4aw7b8.aspx)上,我發現了使用WaitForExit()來刷新標準輸出的提示:「此重載確保所有處理均已完成,包括對重定向標準輸出的異步事件的處理。」另外還提示使用Process.Close()。所以這可能就是問題1的答案。 – 2012-07-29 20:19:52

回答

1

訣竅是

// Per the MSDN remark quoted below, this parameterless overload also waits until the
// asynchronous event handling for redirected standard output has completed.
process.WaitForExit(); 

參見http://msdn.microsoft.com/en-us/library/fb4aw7b8.aspx:「此重載[WaitForExit()]確保所有處理均已完成,包括對重定向標準輸出的異步事件的處理。當標準輸出已被重定向到異步事件處理程序時,應在調用WaitForExit(Int32)重載之後使用此重載。」

下面是完整的解決方案:

/// <summary>
    /// Creates a connectable observable that runs an external process and pushes every
    /// line of its redirected standard output and standard error to observers.
    /// </summary>
    /// <remarks>Must be a connectable observable in order to hinder multiple subscriptions
    /// to call the process multiple times. The sequence ends with OnCompleted when the
    /// process exits with code 0 and with OnError (InvalidOperationException) otherwise.</remarks>
    /// <param name="filename">Executable to start; assigned to StartInfo.FileName.</param>
    /// <param name="arguments">Command line arguments; assigned to StartInfo.Arguments.</param>
    /// <param name="input">Optional sequence whose items are written to the process's
    /// standard input; when null, standard input is not redirected.</param>
    /// <returns>The published (connectable) sequence of output and error lines.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="filename"/> is null.</exception>
    public static IConnectableObservable<string> CreateConnectableObservableProcess(string filename, string arguments, IObservable<string> input = null)
    {
        if (filename == null)
        {
            throw new ArgumentNullException("filename");
        }

        var observable = Observable.Using(
            () =>
            {
                Process process = new Process();

                // process configuration
                process.StartInfo.FileName = filename;
                process.StartInfo.Arguments = arguments;
                process.StartInfo.CreateNoWindow = true;
                process.StartInfo.UseShellExecute = false;

                // Exited is the completion trigger of the sequence.
                process.EnableRaisingEvents = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.RedirectStandardOutput = true;
                process.StartInfo.RedirectStandardInput = input != null;

                return process;
            },
            process =>
            {
                return Observable.Create<string>(
                    (IObserver<string> observer) =>
                    {
                        // listen to stdout and stderr
                        var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process);
                        var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process);

                        // Merge funnels both streams through one subscription so the observer
                        // is not called concurrently from the two event sources. The null Data
                        // value that marks the end of a redirected stream is not a line of
                        // output and is therefore filtered out.
                        var outputSubscription = stdOut
                            .Merge(stdErr)
                            .Where(line => line != null)
                            .Subscribe(observer);

                        var processExited = Observable.FromEventPattern(h => process.Exited += h, h => process.Exited -= h);

                        var exitSubscription = processExited.Subscribe(args =>
                        {
                            // Exited can be raised before all *DataReceived events were
                            // delivered; the parameterless WaitForExit() also waits for the
                            // asynchronous handling of the redirected streams to finish.
                            process.WaitForExit();

                            try
                            {
                                int exitCode = process.ExitCode;

                                if (exitCode != 0)
                                {
                                    observer.OnError(new InvalidOperationException(String.Format("Process '{0}' terminated with error code {1}",
                                        filename, exitCode)));
                                }
                                else
                                {
                                    observer.OnCompleted();
                                }
                            }
                            finally
                            {
                                process.Close();
                            }
                        });

                        process.Start();

                        process.BeginOutputReadLine();
                        process.BeginErrorReadLine();

                        // Forward input only once the process is running, and keep the
                        // subscription so that it is released together with the others
                        // instead of outliving the process.
                        IDisposable inputSubscription = Disposable.Empty;
                        if (input != null)
                        {
                            inputSubscription = input.Subscribe(s =>
                            {
                                if (!process.HasExited)
                                {
                                    process.StandardInput.Write(s);
                                }
                            });
                        }

                        // Input is detached first so nothing is written while tearing down.
                        return new CompositeDisposable(inputSubscription,
                            outputSubscription,
                            exitSubscription);
                    });
            });

        return observable.Publish();
    }
+0

是否有可能在此解決方案中添加取消功能?我想通過點擊按鈕之類的方式來停止進程。 – Vegar 2015-02-02 23:34:11