2015-06-05 125 views
2

TCP報文幀是否有任何阿卡的方式實現數據包幀就像二郎神與{包,4}? 包看起來是這樣的:斯卡拉 - 用阿卡

4 bytes length in big endian | body... 

例如:

00 00 00 05 H E L L O 0 0 0 5 W O R L D 

將兩包 「HELLO」 和 「世界」,但他們收到一個。

或者

00 00 00 05 H E L L 

現在阿卡接收這8個字節,但是一個更多的是人仍下落不明,它會在下次呼叫接受,以「接受」

問題是我演員的接收總是被稱爲部分或全部請求,但我只想在收到「body」部分並且只有在完全收到時才能獲得。

所以所有將需要的是,它首先讀取那些4個字節,然後等待,直到它讀其他N字節(N =長度在4字節報頭),然後把它發送消息給我的演員。以某種方式可能嗎?

我的服務器代碼:

import java.net.InetSocketAddress 

import akka.actor.{Props, Actor} 
import akka.io.Tcp.Bind 
import akka.io.{IO, Tcp} 

class Server extends Actor{ 
    import context.system 
    import Tcp._ 
    IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234)) 

    def receive ={ 
     case bound @ Bound(localAddr) => 
      println("Server is bound to "+localAddr.toString()) 
     case failed @ CommandFailed(_ : Bind) => 
      context stop self 
     case connected @ Connected(remote, local) => 
      val handler = context.actorOf(Props[ClientHandler]) 
      val connection = sender() 
      println(remote.toString + "connected to "+local.toString()) 

      connection ! Register(handler) 
    } 
} 

回答

3

據我所知,有在阿卡或斯卡拉對此沒有庫函數。 Akka交易ByteString進行閱讀和寫作,所以我把一個特質完全按照你的要求做了。您將它傳遞給您的演員ByteString。然後根據報頭中的數據包長度打斷數據流。它是無狀態的,因此它返回一個包含提取數據包列表的元組以及來自TCP流的任何未使用的數據作爲ByteString。您將新的TCP數據連接到此字節字符串中返回的未使用的流部分,如下例所示。從一個演員

trait Buffering { 

    val MAX_PACKET_LEN: Short = 10000 

    /** 
    * Extracts complete packets of the specified length, preserving remainder 
    * data. If there is no complete packet, then we return an empty list. If 
    * there are multiple packets available, all packets are extracted, Any remaining data 
    * is returned to the caller for later submission 
    * @param data A list of the packets extracted from the raw data in order of receipt 
    * @return A list of ByteStrings containing extracted packets as well as any remaining buffer data not consumed 
    */ 
    def getPacket(data: ByteString): (List[ByteString], ByteString) = { 

    val headerSize = 2 

    @tailrec 
    def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = { 
     if (current.length < headerSize) { 
     (packets.reverse, current) 
     } else { 
     val len = current.iterator.getShort 
     if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len") 
     if (current.length < len + headerSize) { 
      (packets.reverse, current) 
     } else { 
      val rem = current drop headerSize // Pop off header 
      val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data 
      // Pull of the packet and recurse to see if there is another packet available 
      multiPacket(front :: packets, back) 
     } 
     } 
    } 
    multiPacket(List[ByteString](), data) 
    } 

用法是folllows:

def receive = buffer(CompactByteString()) 

def buffer(buf: ByteString): Receive = { 
    // Messages inbound from the network 
    case Received(data) => 
    val (pkt, remainder) = getPacket(buf ++ data) 
    // Do something with your packet 
    context become buffer(remainder) 
    case Other Stuff => // Etc 
}