2013-05-30 28 views
0

我使用基於TCP的CustomBinding在同一進程(用於測試)中設置WCF雙工服務(單向消息)和多個客戶端。WCF雙工問題與不同線程中的多個客戶端

只要有一個客戶端被回叫,這一切都可以正常工作。但它對多個客戶端失敗。在後一種情況下,一個客戶工作,其他客戶可以發送他們的請求,但沒有得到迴應。服務器可以毫無問題地發送所有響應。 WCF跟蹤顯示在客戶端的EndpointNotFoundException:

There was no channel that could accept the message with action 
'http://tempuri.org/IMyService/Response'. 
at System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(Exception e, Message message) 
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.ProcessItem(TInnerItem item) 
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.HandleReceiveResult(IAsyncResult result) 
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnReceiveCompleteStatic(IAsyncResult result) 
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception) 
at System.ServiceModel.Channels.InputChannel.HelpReceiveAsyncResult.OnReceive(IAsyncResult result) 
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously) 
at System.Runtime.InputQueue`1.AsyncQueueReader.Set(Item item) 
at System.Runtime.InputQueue`1.Dispatch() 
at System.ServiceModel.Channels.SingletonChannelAcceptor`3.DispatchItems() 
at System.ServiceModel.Channels.DuplexSessionOneWayChannelListener.ChannelReceiver.OnReceive(IAsyncResult result) 
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception) 
at System.ServiceModel.Channels.FramingDuplexSessionChannel.TryReceiveAsyncResult.OnReceive(IAsyncResult result) 
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously) 
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception) 
at System.ServiceModel.Channels.SynchronizedMessageSource.ReceiveAsyncResult.OnReceiveComplete(Object state) 
at System.ServiceModel.Channels.SessionConnectionReader.OnAsyncReadComplete(Object state) 
at System.ServiceModel.Channels.TracingConnection.TracingConnectionState.ExecuteCallback() 
at System.ServiceModel.Channels.TracingConnection.WaitCallback(Object state) 
at System.ServiceModel.Channels.SocketConnection.FinishRead() 
at System.ServiceModel.Channels.SocketConnection.AsyncReadCallback(Boolean haveResult, Int32 error, Int32 bytesRead) 
at System.ServiceModel.Channels.OverlappedContext.CompleteCallback(UInt32 error, UInt32 numBytes, NativeOverlapped* nativeOverlapped) 
at System.Runtime.Fx.IOCompletionThunk.UnhandledExceptionFrame(UInt32 error, UInt32 bytesRead, NativeOverlapped* nativeOverlapped) 
at System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* pOVERLAP) 

在我相信所有的客戶渠道仍然是開放的,因爲它們只關閉後,他們收到的響應異常的時間。 它看起來像客戶端收到消息,但不能將其發送到客戶端實例。

這裏我完整的例子(代碼WCF配置):

using System; 
using System.Text; 
using System.Collections.Generic; 
using System.Linq; 
using Microsoft.VisualStudio.TestTools.UnitTesting; 
using System.ServiceModel; 
using System.ServiceModel.Channels; 
using System.Threading; 
using System.Threading.Tasks; 

namespace WcfDuplex 
{ 
    [TestClass] 
    public class WcfDuplexTest 
    { 
    [TestMethod] 
    public void WcfDuplexTest1() 
    { 
     const int NumParallelRequests = 2; 
     const int NumMessagesPerThread = 1; 
     using (var host = MyServer.CreateServer(TestContext)) 
     { 
     Action clientAction =() => 
      { 
      using (var client = MyClient.CreateProxy(TestContext)) 
      { 
       using (var scope = new OperationContextScope(client)) 
       { 
       var callback = MyClient.GetCallbackHandler(client); 
       OperationContext.Current.OutgoingMessageHeaders.ReplyTo = client.LocalAddress; 
       for (int i = 1; i <= NumMessagesPerThread; i++) 
       { 
        string message = String.Format("Message {0} from tread {1}", i, Thread.CurrentThread.ManagedThreadId); 
        client.Request(message); 
        bool success = callback.MessageArrived.WaitOne(5000); 
        Assert.IsTrue(success, "Timeout while waiting for: " + message); 
        Assert.IsTrue(callback.Message.EndsWith(message)); 
       } 
       } 
       client.Close(); 
      } 
      }; 
     var tasks = new List<Task>(); 
     for (int i = 0; i < NumParallelRequests; i++) 
      tasks.Add(Task.Factory.StartNew(clientAction)); 
     foreach (var task in tasks) 
      task.Wait(10000); 
     } 
    } 

    public TestContext TestContext 
    { 
     get; 
     set; 
    } 
    } 

    [ServiceContract(CallbackContract = typeof(IMyCallback))] 
    interface IMyService 
    { 
    [OperationContract(IsOneWay = true)] 
    void Request(string message); 
    } 

    [ServiceContract()] 
    interface IMyCallback 
    { 
    [OperationContract(IsOneWay = true)] 
    void Response(string message); 
    } 

    interface IMyServiceChannel : IMyService, IClientChannel 
    { } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)] 
    class MyService : IMyService 
    { 
    public void Request(string message) 
    { 
     var context = OperationContext.Current.Host.Extensions.Find<TestContextExtension>().TestContext; 
     var callback = OperationContext.Current.GetCallbackChannel<IMyCallback>(); 
     OperationContext.Current.OutgoingMessageHeaders.To = OperationContext.Current.IncomingMessageHeaders.ReplyTo.Uri; 
     context.WriteLine("Server received message: {0}. Reply to {1}", message, OperationContext.Current.OutgoingMessageHeaders.To); 
     string responseMessage = "From server thread " + Thread.CurrentThread.ManagedThreadId + ": " + message; 
     callback.Response(responseMessage); 
     context.WriteLine("Server sent response: " + responseMessage); 
    } 
    } 

    class MyCallbackHandler : IMyCallback, IExtension<IContextChannel> 
    { 

    private readonly TestContext Context; 
    public string Message { get; set; } 
    public AutoResetEvent MessageArrived { get; private set; } 

    public MyCallbackHandler(TestContext context) 
    { 
     Context = context; 
     MessageArrived = new AutoResetEvent(false); 
    } 

    public void Response(string message) 
    { 
     Message = message; 
     Context.WriteLine("Client received message: " + message + " on " + OperationContext.Current.Channel.LocalAddress + 
     " in thread " + Thread.CurrentThread.ManagedThreadId); 
     MessageArrived.Set(); 
    } 

    public void Attach(IContextChannel owner) { } 
    public void Detach(IContextChannel owner) { } 
    } 

    class MyServer 
    { 
    public const string Url = "net.tcp://localhost:8731/MyService/"; 

    public static ServiceHost CreateServer(TestContext context) 
    { 
     var host = new ServiceHost(typeof(MyService)); 
     host.Extensions.Add(new TestContextExtension { TestContext = context }); 
     host.AddServiceEndpoint(typeof(IMyService), MyClient.GetBinding(), new Uri(MyServer.Url)); 
     host.Open(); 
     return host; 
    } 
    } 

    class TestContextExtension : IExtension<ServiceHostBase> 
    { 
    public TestContext TestContext { get; set; } 
    public void Attach(ServiceHostBase owner) { } 
    public void Detach(ServiceHostBase owner) { } 
    } 

    class MyClient 
    { 
    public static MyCallbackHandler GetCallbackHandler(IMyServiceChannel channel) 
    { 
     var callback = channel.Extensions.Find<MyCallbackHandler>(); 
     return callback; 
    } 

    public static IMyServiceChannel CreateProxy(TestContext testContext) 
    { 
     var callback = new MyCallbackHandler(testContext); 
     var instanceContext = new InstanceContext(callback); 
     var binding = GetBinding(); 
     int port = 8732;// +Thread.CurrentThread.ManagedThreadId; 
     binding.Elements.Find<CompositeDuplexBindingElement>().ClientBaseAddress = new Uri(String.Format("net.tcp://localhost:{0}/Client/", port)); 
     binding.OpenTimeout = TimeSpan.FromMinutes(5); 
     var clientFactory = new DuplexChannelFactory<IMyServiceChannel>(instanceContext, binding); 
     var client = clientFactory.CreateChannel(new EndpointAddress(MyServer.Url)); 
     client.Extensions.Add(callback); 
     return client; 
    } 

    public static CustomBinding GetBinding() 
    { 
     var binding = new CustomBinding(
     new CompositeDuplexBindingElement(), 
     new OneWayBindingElement(), 
     new BinaryMessageEncodingBindingElement(), 
     //new ReliableSessionBindingElement(), 
     new TcpTransportBindingElement()); 
     return binding; 
    } 
    } 
} 

我用下面的文章爲幫助:hereherehere

回答

0

問題被綁定。 什麼工作原理是:

var binding = new CustomBinding(
    new BinaryMessageEncodingBindingElement(), 
    new ReliableSessionBindingElement(), 
    new TcpTransportBindingElement()); 

的CompositeDuplexBindingElement和OneWayBindingElement造成麻煩。它們不是TCP類型通道所必需的。

什麼也的作品是:

var binding = new NetTcpBinding(SecurityMode.None); 

var binding = new WSDualHttpBinding(WSDualHttpSecurityMode.None); 
binding.ClientBaseAddress = new Uri("http://localhost:8732/Client/");