我們正試圖定義一個服務織物無狀態服務的UDP數據,監聽器。服務織物無狀態服務器的自定義UDP監聽器
我們正在與微軟合作誰也說,這是支持的,我應該設置爲TCP;下面是從ServiceManifest.xml文件中的片段:
<Resources>
<Endpoints>
<!-- This endpoint is used by the communication listener to obtain the port on which to
listen. Please note that if your service is partitioned, this port is shared with
replicas of different partitions that are placed in your code. -->
<Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" />
</Endpoints>
</Resources>
服務啓動正常,但我不能讓服務接收任何UDP數據,如果我做了netstat -a
我什麼都看不到聽在TCP或端口UDP。
我已經做了很多的研究對在線和我還沒有發現太多有關創建自定義ICommunicationListener但我希望別人也許可以驗證這應該是可能的SF。
下面是ICommunicationListener實現:
public UdpCommunicationListener(string serviceEndPoint,
ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector)
{
if (serviceInitializationParameters == null)
{
throw new ArgumentNullException(nameof(serviceInitializationParameters));
}
var endPoint = serviceInitializationParameters
.CodePackageActivationContext
.GetEndpoint(serviceEndPoint ?? "ServiceEndPoint");
_connector = connector;
_ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
_port = endPoint.Port;
_server = new UdpServer(_ipAddress, _port);
_server.Open();
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
_listener = _server.Listen(_connector);
return Task.FromResult($"udp::{_ipAddress}:{_port}");
}
public Task CloseAsync(CancellationToken cancellationToken)
{
this.Abort();
return Task.FromResult(true);
}
public void Abort()
{
_listener.Dispose();
_server?.Close();
}
}
public class UdpServer
{
private readonly UdpClient _udpClient;
private IObservable<UdpReceiveResult> _receiveStream;
public UdpServer(string ipAddress, int port)
{
Id = Guid.NewGuid();
_udpClient = new UdpClient(ipAddress, port);
}
public Guid Id { get; }
public void Open()
{
_receiveStream = _udpClient.ReceiveStream().Publish().RefCount();
}
public void Close()
{
//TODO: Not sure how to stop the process
}
public IDisposable Listen(Action<UdpReceiveResult> process)
{
return _receiveStream.Subscribe(async r =>
{
process(r);
});
}
}
能否請您發佈'ICommunicationListener'碼? –