2017-01-26 52 views
0

㈣看到了這一點common-aws on github如何使用它,這就是他們的榜樣(僅發件人因爲這是我所需要的):如何使用scala將消息正確發送到amazon sqs隊列?

import com.amazonaws.services.sqs.AmazonSQSAsyncClient 
import com.pellucid.wrap.sqs.AmazonSQSScalaClient 
import com.mfglabs.commons.aws.sqs._ 

val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), ec) 
val builder = SQSStreamBuilder(sqs) 

val sender: Flow[String, SendMessageResult, Unit] = 
    Flow[String].map { body => 
    val req = new SendMessageRequest() 
    req.setMessageBody(body) 
    req.setQueueUrl(queueUrl) 
    req 
    } 
    .via(builder.sendMessageAsStream()) 

,但我得到了一些錯誤,我真的不理解這個例子,我需要的是創建一個func,它需要一個case類的列表,將每個列表序列化爲json並將其發送到sqs隊列...就是這樣,所以這是我到目前爲止所嘗試的:

val queueUrl = "the url to my queue" 

//the objects here are of case class type ObjectUploadRequest 
val listOfObjects = List(Obj1, Obj2, Obj3, Obj4, Obj5) 

def pushListToSQS(listOfObjectsRequests: List[ObjectUploadRequest]): Future[SendMessageRequest] = { 
    listOfObjectsRequests.map(objReq => { 
    val ser = swrite(objReq) 
    val sender: Flow[String, SendMessageResult, Unit] = 
     Flow[String].map { body => 
     val req = new SendMessageRequest() 
     req.setMessageBody(body) 
     req.setQueueUrl(queueUrl) 
     req 
     }.via(builder.sendMessageAsStream()) 
    }) 
} 

而我得到這個錯誤:

enter image description here

希望如果有人能幫助,謝謝

回答

0

如果你不介意使用好老AWS的Java SDK和它同步SQS客戶端,那麼這個工作對我來說:

import com.amazonaws.services.sqs.AmazonSQSClient 
import com.amazonaws.services.sqs.model.SendMessageRequest 

val sqs = new AmazonSQSClient() 

listOfObjects.foreach { obj => 
    val json = // convert obj to json 

    sqs.sendMessage(new SendMessageRequest() 
     .withQueueUrl("your queue url") 
     .withMessageBody(json)) 
} 
相關問題