0
我使用基於 TCP 的 CustomBinding,在同一個進程中(用於測試)建立了一個 WCF 雙工服務(單向消息)和多個客戶端。(標題:WCF 雙工在不同線程中使用多個客戶端時的問題)
只要只有一個客戶端被回調,一切都能正常工作;但有多個客戶端時就會失敗。在後一種情況下,只有一個客戶端正常工作,其他客戶端可以發送請求,但收不到響應。服務器端可以毫無問題地發出所有響應。WCF 跟蹤在客戶端顯示了 EndpointNotFoundException:
There was no channel that could accept the message with action
'http://tempuri.org/IMyService/Response'.
at System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(Exception e, Message message)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.ProcessItem(TInnerItem item)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.HandleReceiveResult(IAsyncResult result)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnReceiveCompleteStatic(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.InputChannel.HelpReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.InputQueue`1.AsyncQueueReader.Set(Item item)
at System.Runtime.InputQueue`1.Dispatch()
at System.ServiceModel.Channels.SingletonChannelAcceptor`3.DispatchItems()
at System.ServiceModel.Channels.DuplexSessionOneWayChannelListener.ChannelReceiver.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.FramingDuplexSessionChannel.TryReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.SynchronizedMessageSource.ReceiveAsyncResult.OnReceiveComplete(Object state)
at System.ServiceModel.Channels.SessionConnectionReader.OnAsyncReadComplete(Object state)
at System.ServiceModel.Channels.TracingConnection.TracingConnectionState.ExecuteCallback()
at System.ServiceModel.Channels.TracingConnection.WaitCallback(Object state)
at System.ServiceModel.Channels.SocketConnection.FinishRead()
at System.ServiceModel.Channels.SocketConnection.AsyncReadCallback(Boolean haveResult, Int32 error, Int32 bytesRead)
at System.ServiceModel.Channels.OverlappedContext.CompleteCallback(UInt32 error, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
at System.Runtime.Fx.IOCompletionThunk.UnhandledExceptionFrame(UInt32 error, UInt32 bytesRead, NativeOverlapped* nativeOverlapped)
at System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* pOVERLAP)
我相信在發生異常時,所有客戶端通道仍然是打開的,因爲它們只有在收到響應之後才會關閉。看起來客戶端進程收到了消息,但無法將其分派給對應的客戶端實例。
以下是我的完整示例(WCF 配置寫在代碼中):
using System;
using System.Text;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
using System.Threading.Tasks;
namespace WcfDuplex
{
/// <summary>
/// Hosts <c>MyService</c> in-process and drives it from several client proxies running on
/// parallel tasks, checking that every client receives the callback for its own request.
/// </summary>
[TestClass]
public class WcfDuplexTest
{
    /// <summary>
    /// Test context used for diagnostic output; it is handed to the server and to each client.
    /// </summary>
    public TestContext TestContext
    {
        get;
        set;
    }

    /// <summary>
    /// Starts <c>NumParallelRequests</c> client tasks; each sends <c>NumMessagesPerThread</c>
    /// one-way requests and waits for the matching callback after every request.
    /// </summary>
    [TestMethod]
    public void WcfDuplexTest1()
    {
        const int NumParallelRequests = 2;
        const int NumMessagesPerThread = 1;
        const int ResponseTimeoutMilliseconds = 5000;
        const int TaskTimeoutMilliseconds = 10000;

        using (var host = MyServer.CreateServer(TestContext))
        {
            Action clientAction = () =>
            {
                using (var client = MyClient.CreateProxy(TestContext))
                {
                    using (var scope = new OperationContextScope(client))
                    {
                        var callback = MyClient.GetCallbackHandler(client);

                        // The service copies ReplyTo into the To header of its callback message.
                        OperationContext.Current.OutgoingMessageHeaders.ReplyTo = client.LocalAddress;

                        for (int i = 1; i <= NumMessagesPerThread; i++)
                        {
                            string message = String.Format("Message {0} from tread {1}", i, Thread.CurrentThread.ManagedThreadId);
                            client.Request(message);

                            bool success = callback.MessageArrived.WaitOne(ResponseTimeoutMilliseconds);
                            Assert.IsTrue(success, "Timeout while waiting for: " + message);

                            // Ordinal comparison: the payload is an identifier, not linguistic text.
                            Assert.IsTrue(callback.Message.EndsWith(message, StringComparison.Ordinal));
                        }
                    }

                    client.Close();
                }
            };

            var tasks = new List<Task>();
            for (int i = 0; i < NumParallelRequests; i++)
            {
                tasks.Add(Task.Factory.StartNew(clientAction));
            }

            foreach (var task in tasks)
            {
                // Task.Wait(int) returns false on timeout instead of throwing, so the result
                // must be asserted; otherwise a hung client would let the test pass.
                bool finished = task.Wait(TaskTimeoutMilliseconds);
                Assert.IsTrue(finished, "Client task did not complete within the timeout.");
            }
        }
    }
}
/// <summary>
/// Service contract of the duplex sample; callbacks are delivered through <see cref="IMyCallback"/>.
/// </summary>
[ServiceContract(CallbackContract = typeof(IMyCallback))]
internal interface IMyService
{
    /// <summary>
    /// One-way operation that submits a request message to the service.
    /// </summary>
    /// <param name="message">Text of the request.</param>
    [OperationContract(IsOneWay = true)]
    void Request(string message);
}
/// <summary>
/// Callback contract referenced by <see cref="IMyService"/>.
/// </summary>
[ServiceContract]
internal interface IMyCallback
{
    /// <summary>
    /// One-way operation that carries a response message.
    /// </summary>
    /// <param name="message">Text of the response.</param>
    [OperationContract(IsOneWay = true)]
    void Response(string message);
}
/// <summary>
/// Combines <see cref="IMyService"/> with <see cref="IClientChannel"/> so that a proxy exposes
/// both the service operations and the channel members.
/// </summary>
internal interface IMyServiceChannel : IMyService, IClientChannel
{
}
/// <summary>
/// Implementation of <see cref="IMyService"/>, configured as a single instance with multiple
/// concurrency, that answers each request by invoking the caller's callback channel.
/// </summary>
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
class MyService : IMyService
{
    /// <summary>
    /// Logs the incoming request, addresses the callback to the request's ReplyTo header and
    /// sends a response that embeds the server thread id and the original message.
    /// </summary>
    /// <param name="message">Text of the request; echoed at the end of the response.</param>
    public void Request(string message)
    {
        var operation = OperationContext.Current;
        var context = operation.Host.Extensions.Find<TestContextExtension>().TestContext;
        var callback = operation.GetCallbackChannel<IMyCallback>();

        // Route the callback to the address the client put into its ReplyTo header.
        operation.OutgoingMessageHeaders.To = operation.IncomingMessageHeaders.ReplyTo.Uri;
        context.WriteLine("Server received message: {0}. Reply to {1}", message, operation.OutgoingMessageHeaders.To);

        string responseMessage = "From server thread " + Thread.CurrentThread.ManagedThreadId + ": " + message;
        callback.Response(responseMessage);

        // Pass the payload as a format argument rather than as part of the first argument,
        // so braces inside the message cannot be interpreted as format placeholders.
        context.WriteLine("Server sent response: {0}", responseMessage);
    }
}
/// <summary>
/// Client-side implementation of <see cref="IMyCallback"/>. It stores the most recent response
/// and signals its arrival; it is also attachable to a channel as an extension so that the
/// handler can be looked up from the proxy.
/// </summary>
class MyCallbackHandler : IMyCallback, IExtension<IContextChannel>
{
    private readonly TestContext _context;

    /// <summary>Text of the most recently received response.</summary>
    public string Message { get; set; }

    /// <summary>Set each time <see cref="Response"/> has been invoked.</summary>
    public AutoResetEvent MessageArrived { get; private set; }

    /// <summary>Creates a handler that writes its diagnostics to <paramref name="context"/>.</summary>
    /// <param name="context">Test context used for log output.</param>
    public MyCallbackHandler(TestContext context)
    {
        _context = context;
        MessageArrived = new AutoResetEvent(false);
    }

    /// <summary>
    /// Stores the response, logs it together with the receiving channel's local address and
    /// the current thread id, and then signals <see cref="MessageArrived"/>.
    /// </summary>
    /// <param name="message">Text of the response.</param>
    public void Response(string message)
    {
        Message = message;
        try
        {
            // Values are passed as format arguments so that braces inside them cannot be
            // interpreted as format placeholders.
            _context.WriteLine(
                "Client received message: {0} on {1} in thread {2}",
                message,
                OperationContext.Current.Channel.LocalAddress,
                Thread.CurrentThread.ManagedThreadId);
        }
        finally
        {
            // Signal even when logging fails; otherwise the waiting caller would report a
            // timeout although the response did arrive.
            MessageArrived.Set();
        }
    }

    /// <summary>No-op; the handler keeps no reference to its owning channel.</summary>
    public void Attach(IContextChannel owner) { }

    /// <summary>No-op counterpart of <see cref="Attach"/>.</summary>
    public void Detach(IContextChannel owner) { }
}
/// <summary>
/// Factory for the in-process service host used by the test.
/// </summary>
class MyServer
{
    /// <summary>Endpoint address the service listens on.</summary>
    public const string Url = "net.tcp://localhost:8731/MyService/";

    /// <summary>
    /// Creates and opens a <see cref="ServiceHost"/> for <see cref="MyService"/> with a single
    /// <see cref="IMyService"/> endpoint at <see cref="Url"/>.
    /// </summary>
    /// <param name="context">
    /// Test context made available to the service through a <see cref="TestContextExtension"/>.
    /// </param>
    /// <returns>The opened host; the caller is responsible for disposing it.</returns>
    public static ServiceHost CreateServer(TestContext context)
    {
        var host = new ServiceHost(typeof(MyService));
        try
        {
            host.Extensions.Add(new TestContextExtension { TestContext = context });
            host.AddServiceEndpoint(typeof(IMyService), MyClient.GetBinding(), new Uri(MyServer.Url));
            host.Open();
            return host;
        }
        catch
        {
            // The caller never receives the host when setup fails, so release it here
            // instead of leaking a partially configured or partially opened host.
            host.Abort();
            throw;
        }
    }
}
/// <summary>
/// Service-host extension whose only purpose is to carry a <see cref="TestContext"/>.
/// </summary>
internal class TestContextExtension : IExtension<ServiceHostBase>
{
    /// <summary>The test context carried by this extension.</summary>
    public TestContext TestContext
    {
        get;
        set;
    }

    /// <summary>No-op; nothing is done when the extension is added to a host.</summary>
    public void Attach(ServiceHostBase owner)
    {
    }

    /// <summary>No-op; nothing is done when the extension is removed from a host.</summary>
    public void Detach(ServiceHostBase owner)
    {
    }
}
/// <summary>
/// Helpers that build the shared binding and the duplex client proxies used by the test.
/// </summary>
class MyClient
{
    // Port used for the client's callback base address. An earlier experiment (previously left
    // as commented-out code) added the managed thread id to this value.
    private const int ClientPort = 8732;

    /// <summary>
    /// Returns the callback handler that <see cref="CreateProxy"/> attached to the channel.
    /// </summary>
    /// <param name="channel">A proxy created by <see cref="CreateProxy"/>.</param>
    /// <returns>The attached handler, as returned by the channel's extension lookup.</returns>
    public static MyCallbackHandler GetCallbackHandler(IMyServiceChannel channel)
    {
        return channel.Extensions.Find<MyCallbackHandler>();
    }

    /// <summary>
    /// Creates a duplex proxy for the service at <see cref="MyServer.Url"/> with a fresh
    /// <see cref="MyCallbackHandler"/> registered both as callback instance and as channel
    /// extension.
    /// </summary>
    /// <param name="testContext">Test context passed to the callback handler for logging.</param>
    /// <returns>
    /// The created channel. The channel factory behind it is released automatically once the
    /// channel raises its Closed event.
    /// </returns>
    public static IMyServiceChannel CreateProxy(TestContext testContext)
    {
        var callback = new MyCallbackHandler(testContext);
        var instanceContext = new InstanceContext(callback);

        var binding = GetBinding();
        binding.Elements.Find<CompositeDuplexBindingElement>().ClientBaseAddress =
            new Uri(String.Format("net.tcp://localhost:{0}/Client/", ClientPort));
        binding.OpenTimeout = TimeSpan.FromMinutes(5);

        var clientFactory = new DuplexChannelFactory<IMyServiceChannel>(instanceContext, binding);
        IMyServiceChannel client;
        try
        {
            client = clientFactory.CreateChannel(new EndpointAddress(MyServer.Url));
        }
        catch
        {
            // No channel exists that could release the factory later.
            clientFactory.Abort();
            throw;
        }

        // Callers only ever see the channel, so tie the factory's lifetime to it; without this
        // the factory created for every proxy would never be closed.
        client.Closed += (sender, e) => ReleaseFactory(clientFactory);

        client.Extensions.Add(callback);
        return client;
    }

    /// <summary>
    /// Builds the custom binding shared by server and clients: composite duplex, one-way,
    /// binary message encoding and TCP transport.
    /// </summary>
    /// <returns>A new binding instance on every call.</returns>
    public static CustomBinding GetBinding()
    {
        // A ReliableSessionBindingElement was tried between the encoder and the transport in
        // an earlier experiment and is intentionally not part of the binding.
        return new CustomBinding(
            new CompositeDuplexBindingElement(),
            new OneWayBindingElement(),
            new BinaryMessageEncodingBindingElement(),
            new TcpTransportBindingElement());
    }

    // Closes the factory gracefully and falls back to Abort, so that the channel's Closed
    // event handler never throws.
    private static void ReleaseFactory(ChannelFactory factory)
    {
        try
        {
            if (factory.State == CommunicationState.Faulted)
            {
                factory.Abort();
            }
            else
            {
                factory.Close();
            }
        }
        catch (CommunicationException)
        {
            factory.Abort();
        }
        catch (TimeoutException)
        {
            factory.Abort();
        }
    }
}
}