2011-11-11 76 views
2

我使用PLINQ運行測試串行端口以確定它們是否爲GPS設備的函數。Parallel Linq - 返回返回的第一個結果

立即發現一些串行端口是有效的GPS。在這種情況下,我希望第一個完成測試是返回的測試。我不想等待其餘的結果。

我可以用PLINQ來做到這一點,還是必須安排一批任務並等待一個返回?

+0

我正要發佈這個確切的問題,並找到了這個。不幸的是,接受的答案是不正確的。這是不可能的4.0(我可以看到它可能在4.5)。 – bj0

回答

0

經過進一步審查,你顯然可以使用FirstOrDefault來解決這個問題。 PLINQ將不會保留默認的排序,並且使用未緩存的查詢將立即返回。

http://msdn.microsoft.com/en-us/library/dd460677.aspx

+1

我沒有看到這種行爲。 FirstOrDefault仍然返回PLINQ查詢中第一個輸入的結果(即使使用AsUnordered)。 – bj0

6

PLINQ在這裏可能不夠用。雖然您可以在.NET 4中使用.First,但這會導致它按順序運行,從而影響目的。 (請注意,這will be improved in .NET 4.5

的TPL,不過,最有可能是這裏的正確答案。您可以爲每個串行端口創建一個Task<Location>,然後使用Task.WaitAny等待第一次成功操作。

這提供了一個簡單的方法來安排一堆「任務」,然後只使用第一個結果。

+0

這個問題是,我現在需要爲每個任務WaitAny,查看結果,如果這是一個積極的結果,取消剩餘的任務,或者如果它是否定的,繼續等待,同時仍然小心不要等待,如果沒有任務離開。對於這樣一個看似簡單的任務來說,它的邏輯相當複雜;有沒有更好的辦法? –

+0

@DavidPfeffer你可以在'BlockingCollection '中推送結果,並有一個線程調用它的GetConsumingEnumerable ...只要你得到一個「有效」的結果,觸發一個取消令牌並使用它。 –

+0

如果沒有任何端口有效,情況如何? –

0

我一直在思考這個打開和關閉,在過去幾天,我無法找到一個內置的PLINQ辦法做到這一點在C#4.0。使用FirstOrDefault這個問題的接受答案在完整的PLINQ查詢完成並且仍然返回(有序的)第一個結果之前不會返回值。下面極端的例子顯示了行爲:

var cts = new CancellationTokenSource(); 
var rnd = new ThreadLocal<Random>(() => new Random()); 

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel() 
    .WithCancellation(cts.Token).WithMergeOptions(ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered() 
    .Where(i => i % 2 == 0) 
    .Select(i => 
    { 
     if(i == 0) 
      Thread.Sleep(3000); 
     else 
      Thread.Sleep(rnd.Value.Next(50, 100)); 
     return string.Format("dat {0}", i).Dump(); 
    }); 

cts.CancelAfter(5000); 

// waits until all results are in, then returns first 
q.FirstOrDefault().Dump("result"); 

我沒有看到一個內置的方式立即獲得第一個可用的結果,但我能拿出兩種解決方法。

第一個創建任務來完成工作並返回任務,從而導致快速完成PLINQ查詢。所得到的任務可以被傳遞到了WaitAny儘快它是可用獲得的第一個結果:

var cts = new CancellationTokenSource(); 
var rnd = new ThreadLocal<Random>(() => new Random()); 

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel() 
    .WithCancellation(cts.Token).WithMergeOptions(ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered() 
    .Where(i => i % 2 == 0) 
    .Select(i => 
    { 
     return Task.Factory.StartNew(() => 
     { 
     if(i == 0) 
      Thread.Sleep(3000); 
     else 
      Thread.Sleep(rnd.Value.Next(50, 100)); 
     return string.Format("dat {0}", i).Dump(); 
     }); 
    }); 

cts.CancelAfter(5000); 

// returns as soon as the tasks are created 
var ts = q.ToArray(); 

// wait till the first task finishes 
var idx = Task.WaitAny(ts); 
ts[idx].Result.Dump("res"); 

這可能是一個可怕的方式來做到這一點。由於PLINQ查詢的實際工作只是一個非常快速的Task.Factory.StartNew,所以完全可以使用PLINQ。 IEnumerable上的一個簡單的.Select(i => Task.Factory.StartNew(...更簡潔,可能更快。

第二個解決方法使用隊列(BlockingCollection),只是插入導致進入這個隊列一旦被計算:

var cts = new CancellationTokenSource(); 
var rnd = new ThreadLocal<Random>(() => new Random()); 

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel() 
    .WithCancellation(cts.Token).WithMergeOptions(ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered() 
    .Where(i => i % 2 == 0) 
    .Select(i => 
    { 
     if(i == 0) 
      Thread.Sleep(3000); 
     else 
      Thread.Sleep(rnd.Value.Next(50, 100)); 
     return string.Format("dat {0}", i).Dump(); 
    }); 

cts.CancelAfter(5000); 

var qu = new BlockingCollection<string>(); 

// ForAll blocks until PLINQ query is complete 
Task.Factory.StartNew(() => q.ForAll(x => qu.Add(x))); 

// get first result asap 
qu.Take().Dump("result"); 

使用這種方法,工作使用PLINQ做的,而BlockingCollecion我們以()將一旦由PLINQ查詢插入,就返回第一個結果。

雖然這會產生預期的結果,我不知道它只是用簡單的任務+過任何優勢了WaitAny

0

要在.NET 4完全用PLINQ做到這一點。0:

SerialPorts.      // Your IEnumerable of serial ports 
    AsParallel().AsUnordered().  // Run as an unordered parallel query 
    Where(IsGps).     // Matching the predicate IsGps (Func<SerialPort, bool>) 
    Take(1).      // Taking the first match 
    FirstOrDefault();    // And unwrap it from the IEnumerable (or null if none are found 

的關鍵是,直到你已經指定了你只在乎曾經找到一個沒有使用像第一或FirstOrDefault有序的評價。