2016-03-09 98 views
2

Custom UDP listener for a Service Fabric stateless service

We are trying to define a Service Fabric stateless service that listens for UDP data.

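We are working with Microsoft, who have said that this is supported and that I should set the endpoint protocol to TCP. Below is the relevant snippet from the ServiceManifest.xml file: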

<Resources> 
    <Endpoints> 
     <!-- This endpoint is used by the communication listener to obtain the port on which to 
      listen. Please note that if your service is partitioned, this port is shared with 
      replicas of different partitions that are placed in your code. --> 
     <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" /> 
    </Endpoints> 
</Resources> 

The service starts fine, but I cannot get it to receive any UDP data, and if I run netstat -a I see nothing listening on that port for either TCP or UDP.

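I have done a lot of research online and have not found much about creating a custom ICommunicationListener, but I am hoping someone else can verify whether this should be possible with Service Fabric.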

Here is the ICommunicationListener implementation:

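public class UdpCommunicationListener : ICommunicationListener 
{ 
    // Fields used by the members below (types inferred from how they are used). 
    private readonly Action<UdpReceiveResult> _connector; 
    private readonly string _ipAddress; 
    private readonly int _port; 
    private readonly UdpServer _server; 
    private IDisposable _listener; 

    public UdpCommunicationListener(string serviceEndPoint, 
      ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector) 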
    { 
     if (serviceInitializationParameters == null) 
     { 
      throw new ArgumentNullException(nameof(serviceInitializationParameters)); 
     } 

     var endPoint = serviceInitializationParameters 
      .CodePackageActivationContext 
      .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint"); 

     _connector = connector; 

     _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN; 
     _port = endPoint.Port; 

     _server = new UdpServer(_ipAddress, _port); 

     _server.Open(); 
    } 

    public Task<string> OpenAsync(CancellationToken cancellationToken) 
    { 
     _listener = _server.Listen(_connector); 

     return Task.FromResult($"udp::{_ipAddress}:{_port}"); 
    } 

    public Task CloseAsync(CancellationToken cancellationToken) 
    { 
     this.Abort(); 

     return Task.FromResult(true); 
    } 

    public void Abort() 
    { 
     // Null-conditional: Abort can be called before OpenAsync has subscribed. 
     _listener?.Dispose(); 
     _server?.Close(); 
    } 
} 

public class UdpServer 
{ 
    private readonly UdpClient _udpClient; 
    private IObservable<UdpReceiveResult> _receiveStream; 

    public UdpServer(string ipAddress, int port) 
    { 
     Id = Guid.NewGuid(); 

     _udpClient = new UdpClient(ipAddress, port); 
    } 

    public Guid Id { get; } 

    public void Open() 
    { 
     _receiveStream = _udpClient.ReceiveStream().Publish().RefCount(); 
    } 

    public void Close() 
    { 
     //TODO: Not sure how to stop the process 
    } 

    public IDisposable Listen(Action<UdpReceiveResult> process) 
    { 
     return _receiveStream.Subscribe(async r => 
     { 
       process(r); 
     }); 
    } 
} 
+0

Could you please post the `ICommunicationListener` code?

Answers

1

There was a defect in the UdpServer component, which I have fixed, and this now works when hosted in Service Fabric.

The problem was with this line of code:

_udpClient = new UdpClient(ipAddress, port); 

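This is the wrong overload for listening for traffic: UdpClient(string, int) sets up a default remote host to send to rather than binding to a local port. What it needs to be is: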

_udpClient = new UdpClient(port); 

I had tried:

_udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port) 

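But this does not work, because the line in the communication listener that retrieves the host (IPAddressOrFQDN, as its name suggests) returns the host name rather than an IP address. I think you could change this behaviour with some changes to the manifest, but for now binding with just the port is sufficient.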
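To illustrate the difference between the overloads, here is a minimal sketch, not the service code itself. It binds a listener with `UdpClient(int)` and sends it one datagram over loopback. The class name `UdpBindDemo`, the method `RoundTrip`, the use of port 0 (any free port) and the two-second timeout are assumptions made for the demo; the real service would pass the port from its endpoint resource.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class UdpBindDemo
{
    // Binds a listener to a local port, sends one datagram to it over loopback,
    // and returns the text that the listener received.
    public static string RoundTrip(string message)
    {
        // UdpClient(int port) binds the socket to that local port.
        // Port 0 lets the OS pick a free port (demo assumption).
        using (var listener = new UdpClient(0))
        // The parameterless constructor does not bind; it is only used to send here.
        using (var sender = new UdpClient())
        {
            int port = ((IPEndPoint)listener.Client.LocalEndPoint).Port;
            listener.Client.ReceiveTimeout = 2000; // fail instead of hanging forever

            byte[] payload = Encoding.UTF8.GetBytes(message);
            sender.Send(payload, payload.Length, new IPEndPoint(IPAddress.Loopback, port));

            var remote = new IPEndPoint(IPAddress.Any, 0);
            byte[] received = listener.Receive(ref remote);
            return Encoding.UTF8.GetString(received);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("ping"));
    }
}
```

If you do need to bind to one specific local address, `UdpClient` also has a constructor that takes an `IPEndPoint` for the local endpoint, but that requires an actual IP address rather than a host name.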

+0

So the problem was that your UdpServer was not actually binding/listening?

+0

Please post the solution itself, not just the fact that you found one. Otherwise nobody else can make use of this question/answer.

+1

Fair point - I have added more detail about the actual solution.

-1

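Because only the HTTP/HTTPS and TCP protocols are supported. I guess you cannot do anything with the UDP protocol. UDP is not reliable. We were able to use SignalR, but I guess UDP does not work.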

Edit: As you can see in my other post, UDP is working now.

+0

That is not correct. All protocols are supported. See https://azure.microsoft.com/en-us/documentation/articles/service-fabric-reliable-services-communication/

+1

As I said above, I got it working.

2

I have UDP working as a stateless service. Here is the code:

UdpService.cs

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Fabric; 
using System.Linq; 
using System.Net; 
using System.Threading; 
using System.Threading.Tasks; 
using Microsoft.ServiceFabric.Services.Communication.Runtime; 
using Microsoft.ServiceFabric.Services.Runtime; 

namespace UdpService 
{ 
    /// <summary> 
    /// An instance of this class is created for each service instance by the Service Fabric runtime. 
    /// </summary> 
    internal sealed class UdpService : StatelessService 
    { 
     private UdpCommunicationListener listener; 

     public UdpService(StatelessServiceContext context) 
      : base(context) 
     { } 

     protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners() 
     { 
      yield return new ServiceInstanceListener(initParams => 
      { 
       this.listener = new UdpCommunicationListener(); 
       this.listener.Initialize(initParams.CodePackageActivationContext); 

       return this.listener; 
      }); 
     } 
    } 
} 

UdpCommunicationListener

using System; 
using System.Diagnostics; 
using System.Fabric; 
using System.Fabric.Description; 
using System.Globalization; 
using System.Net; 
using System.Net.Sockets; 
using System.Reflection; 
using System.Runtime.CompilerServices; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

using Microsoft.ServiceFabric.Services.Communication.Runtime; 

namespace UdpService 
{ 
    public class UdpCommunicationListener : ICommunicationListener, IDisposable 
    { 
     private readonly CancellationTokenSource processRequestsCancellation = new CancellationTokenSource(); 

     public int Port { get; set; } 

     private UdpClient server; 

     /// <summary> 
     /// Stops the Server Ungracefully 
     /// </summary> 
     public void Abort() 
     { 
      this.StopWebServer(); 
     } 

     /// <summary> 
     /// Stops the Server Gracefully 
     /// </summary> 
     /// <param name="cancellationToken">Cancellation Token</param> 
     /// <returns>Task for asynchronous usage</returns> 
     public Task CloseAsync(CancellationToken cancellationToken) 
     { 
      this.StopWebServer(); 

      return Task.FromResult(true); 
     } 

     /// <summary> 
     /// Free Resources 
     /// </summary> 
     public void Dispose() 
     { 
      this.Dispose(true); 
      GC.SuppressFinalize(this); 
     } 

     /// <summary> 
     /// Initializes Configuration 
     /// </summary> 
     /// <param name="context">Code Package Activation Context</param> 
     public void Initialize(ICodePackageActivationContext context) 
     { 
      EndpointResourceDescription serviceEndpoint = context.GetEndpoint("ServiceEndpoint"); 
      this.Port = serviceEndpoint.Port; 
     } 

     /// <summary> 
     /// Starts the Server 
     /// </summary> 
     /// <param name="cancellationToken">Cancellation Token</param> 
     /// <returns>Task for asynchronous usage</returns> 
     public Task<string> OpenAsync(CancellationToken cancellationToken) 
     { 
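      try 
      { 
       // UdpClient(int) binds the socket to the local port. 
       this.server = new UdpClient(this.Port); 
      } 
      catch (Exception ex) 
      { 
       // Do not swallow bind failures (for example, the port is already in use): 
       // log and rethrow so the failure to open the listener is visible. 
       ServiceEventSource.Current.Message(ex.Message); 
       throw; 
      } 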

      ThreadPool.QueueUserWorkItem((state) => 
      { 
       this.MessageHandling(this.processRequestsCancellation.Token); 
      }); 

      return Task.FromResult("udp://" + FabricRuntime.GetNodeContext().IPAddressOrFQDN + ":" + this.Port); 
     } 

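     protected void MessageHandling(CancellationToken cancellationToken) 
     { 
      try 
      { 
       while (!cancellationToken.IsCancellationRequested) 
       { 
        IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, this.Port); 
        // Blocks until a datagram arrives; ipEndPoint is set to the sender. 
        byte[] receivedBytes = this.server.Receive(ref ipEndPoint); 
        // Echo the datagram back to the sender. 
        this.server.Send(receivedBytes, receivedBytes.Length, ipEndPoint); 
        Debug.WriteLine("Received bytes: " + receivedBytes.Length.ToString()); 
       } 
      } 
      catch (Exception) when (cancellationToken.IsCancellationRequested) 
      { 
       // Closing the socket during shutdown makes the blocked Receive throw; this is expected. 
      } 
     } 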

     /// <summary> 
     /// Receives the specified endpoint. 
     /// </summary> 
     /// <param name="endpoint">The endpoint.</param> 
     /// <returns></returns> 
     public Task<byte[]> Receive(ref IPEndPoint endpoint) 
     { 
      return Task.FromResult(this.server.Receive(ref endpoint)); 
     } 

     /// <summary> 
     /// Free Resources and Stop Server 
     /// </summary> 
     /// <param name="disposing">Disposing .NET Resources?</param> 
     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (this.server != null) 
       { 
        try 
        { 
         this.server.Close(); 
         this.server = null; 
        } 
        catch (Exception ex) 
        { 
         ServiceEventSource.Current.Message(ex.Message); 
        } 
       } 
      } 
     } 

     /// <summary> 
     /// Stops Server and Free Handles 
     /// </summary> 
     private void StopWebServer() 
     { 
      this.processRequestsCancellation.Cancel(); 
      this.Dispose(); 
     } 
    } 
} 
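The listener above stops its loop by cancelling a token and then closing the socket, which is what unblocks the blocking `Receive` call. As an alternative, here is a minimal sketch of the same echo loop written with `ReceiveAsync`, which ends when the client is closed. The names `UdpEchoLoopDemo` and `EchoUntilClosedAsync`, the use of port 0 and the timeout are assumptions for illustration and are not part of the service above.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class UdpEchoLoopDemo
{
    // Echoes every datagram back to its sender until the client is closed.
    // Returns the number of datagrams that were echoed.
    public static async Task<int> EchoUntilClosedAsync(UdpClient server)
    {
        int handled = 0;
        try
        {
            while (true)
            {
                UdpReceiveResult result = await server.ReceiveAsync();
                await server.SendAsync(result.Buffer, result.Buffer.Length, result.RemoteEndPoint);
                handled++;
            }
        }
        catch (ObjectDisposedException)
        {
            // The client was closed; treat this as a normal shutdown.
        }
        catch (SocketException)
        {
            // The pending receive was aborted by the close. Note that this sketch
            // also ends the loop on any other socket error.
        }
        return handled;
    }

    public static void Main()
    {
        var server = new UdpClient(0); // port 0: let the OS pick a free port (demo assumption)
        int port = ((IPEndPoint)server.Client.LocalEndPoint).Port;
        Task<int> loop = EchoUntilClosedAsync(server);

        using (var client = new UdpClient())
        {
            client.Client.ReceiveTimeout = 2000;
            client.Send(new byte[] { 1, 2, 3 }, 3, new IPEndPoint(IPAddress.Loopback, port));
            var from = new IPEndPoint(IPAddress.Any, 0);
            client.Receive(ref from); // wait for the echo
        }

        server.Close(); // ends the pending ReceiveAsync and therefore the loop
        Console.WriteLine("Datagrams echoed: " + loop.GetAwaiter().GetResult());
    }
}
```

In a listener, `OpenAsync` could start such a loop and `CloseAsync`/`Abort` could simply close the client, so no separate worker thread is blocked while waiting for data.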

Last but not least, the ServiceManifest.xml:

<?xml version="1.0" encoding="utf-8"?> 
<ServiceManifest Name="UdpServicePkg" 
       Version="1.0.0" 
       xmlns="http://schemas.microsoft.com/2011/01/fabric" 
       xmlns:xsd="http://www.w3.org/2001/XMLSchema" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
    <ServiceTypes> 
    <!-- This is the name of your ServiceType. 
     This name must match the string used in RegisterServiceType call in Program.cs. --> 
    <StatelessServiceType ServiceTypeName="UdpServiceType" /> 
    </ServiceTypes> 

    <!-- Code package is your service executable. --> 
    <CodePackage Name="Code" Version="1.0.0"> 
    <EntryPoint> 
     <ExeHost> 
     <Program>UdpService.exe</Program> 
     </ExeHost> 
    </EntryPoint> 
    </CodePackage> 

    <!-- Config package is the contents of the Config directory under PackageRoot that contains an 
     independently-updateable and versioned set of custom configuration settings for your service. --> 
    <ConfigPackage Name="Config" Version="1.0.0" /> 

    <Resources> 
    <Endpoints> 
     <!-- This endpoint is used by the communication listener to obtain the port on which to 
      listen. Please note that if your service is partitioned, this port is shared with 
      replicas of different partitions that are placed in your code. --> 
     <Endpoint Name="ServiceEndpoint" Port="5555" /> 
    </Endpoints> 
    </Resources> 
</ServiceManifest>