2017-09-06 47 views
0

我想聽聽使用阿卡流SQS阿卡流的狀態,我得到的消息從它的Q 使用此代碼段:如何管理使用TTL

implicit val system = ActorSystem() 
implicit val mat = ActorMaterializer() 
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUtils.dtsConfiguration.ioThreadPoolSize)) 
val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder 
    .standard() 
    .withCredentials(new ClasspathPropertiesFileCredentialsProvider()) 
    .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, ConfigUtils.dtsConfiguration.regionName)) 
    .build() 

val future = SqsSource(sqsEndpoint)(awsSqsClient) 
    .takeWhile(_ => true) 
    .mapAsync(parallelism = 2)(m => { 
    val msgBody = SqsMessage.deserializeJson(m.getBody) 
    msgBody match { 
     case Right(body) => //for each stream add (body.ID,body.Record.FileContent) or concatenate the new fileContent 
          // with current map (of same id) 
          // that for each key in the map - if the filecontent size > 100 kb remove the relevant tuple from 
         the map and perform an operation on it 
    } 
    Future(m, Ack()) 
    }) 
    .to(SqsAckSink(sqsEndpoint)(awsSqsClient)) 
    .run() 

我已經在評論代碼中的特定點需要操作流。

我需要基本上是每個記錄獲取從SQS要做到這一點:

我希望把它的內容到地圖[詮釋,字符串]這個int代表鍵,該字符串是記錄的內容。 (對於其他鍵我將連接它的內容,直到它的大小大於1kb)

(就像保存每個鍵的狀態一樣)。

然後我想要執行以下操作:

我想爲每一個元組(不斷地流) 執行操作時,它的內容大小> 1KB,然後從地圖中移除。

我還需要一個ttl的記錄,沒有更新太空peiod說30秒的地圖。

可以使用阿卡流嗎?

謝謝。

回答

0

Flow.statefulMapConcat是一種swizz-army-knife,它可以讓你保持狀態並有條件地向下遊排放價值。如果您只是在元素到達時才高興地丟棄TTL,那麼這也是可行的,但是觸發驅逐的勾號有點棘手,並且可以通過實現自定義GraphStage來代替。

這裏是一個簡化的例子,它將累積每個鍵的值,直到達到限制值,然後向下遊發射。

import scala.collection.immutable.Iterable 
val theThing: Flow[(String, Int), (String, Int), NotUsed] = 
    Flow[(String, Int)] 
    .statefulMapConcat {() => 
     // state kept in factory function scope 
     var state = Map[String, Int]() 

     // for each incoming tuple 
     { 
     case (key, value) => 
      val newValueForKey = state.getOrElse(key, 0) 

      // ... evicting old elements could go here ... 

      if (newValueForKey > 10) { 
      // max size, emit something downstream 
      state = state - key 
      Iterable(key -> newValueForKey) 
      } else { 
      // just update state, don't emit anything 
      state = state + (key -> newValueForKey) 
      Iterable.empty 
      } 
     } 
    } 

要做到這個,你會以某種方式引入蜱元素剔,但這可能還回壓的影響,因此,導致回自定義GraphStage。 GraphStages有一個計時器API,允許他們將滴答作爲不受背壓影響的側通道。您可以在Akka文檔的這一部分找到有關如何實現此類功能的詳細信息:http://doc.akka.io/docs/akka/current/scala/stream/stream-customize.html#custom-linear-processing-stages-using-graphstage