我想聽聽使用阿卡流SQS阿卡流的狀態,我得到的消息從它的Q 使用此代碼段:如何管理使用TTL
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUtils.dtsConfiguration.ioThreadPoolSize))
val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new ClasspathPropertiesFileCredentialsProvider())
.withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, ConfigUtils.dtsConfiguration.regionName))
.build()
val future = SqsSource(sqsEndpoint)(awsSqsClient)
.takeWhile(_ => true)
.mapAsync(parallelism = 2)(m => {
val msgBody = SqsMessage.deserializeJson(m.getBody)
msgBody match {
case Right(body) => //for each stream add (body.ID,body.Record.FileContent) or concatenate the new fileContent
// with current map (of same id)
// that for each key in the map - if the filecontent size > 100 kb remove the relevant tuple from
the map and perform an operation on it
}
Future(m, Ack())
})
.to(SqsAckSink(sqsEndpoint)(awsSqsClient))
.run()
我已經在評論代碼中的特定點需要操作流。
我需要基本上是每個記錄獲取從SQS要做到這一點:
我希望把它的內容到地圖[詮釋,字符串]這個int代表鍵,該字符串是記錄的內容。 (對於其他鍵我將連接它的內容,直到它的大小大於1kb)
(就像保存每個鍵的狀態一樣)。
然後我想要執行以下操作:
我想爲每一個元組(不斷地流) 執行操作時,它的內容大小> 1KB,然後從地圖中移除。
我還需要一個ttl的記錄,沒有更新太空peiod說30秒的地圖。
可以使用阿卡流嗎?
謝謝。