2017-03-16 137 views
0

我正在使用AWS SDK，從一個Java應用程序將數據發佈到Kinesis流。使用下面的代碼，一次批量發送10條記錄；最終目標是將Kinesis中的數據寫入S3。

// Convert to JSON object, and then to bytes...
// NOTE(review): this snippet runs once per transaction inside a larger loop;
// `transaction`, `batch`, `counter`, `batchLimit`, `putRecordsRequest` and
// `amazonKinesisClient` are declared outside the visible code.
       ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); 
       String json = ow.writeValueAsString(transaction); 

       // Add byte array to PutRecordsRequestEntry 
       PutRecordsRequestEntry record = new PutRecordsRequestEntry(); 
       // A random UUID partition key spreads records evenly across shards.
       record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID())); 
       // NOTE(review): getBytes() uses the platform default charset; prefer
       // getBytes(StandardCharsets.UTF_8) so the consumer can reliably decode
       // the payload as UTF-8.
       record.setData(ByteBuffer.wrap(json.getBytes())); 

       // Add to list... 
       batch.add(record); 

       // Check and send batches 
       if(counter>=batchLimit){ 

        logger.info("Sending batch of " + batchLimit + " rows."); 

        putRecordsRequest.setRecords(batch); 
        PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest); 
        // NOTE(review): result.getFailedRecordCount() is never checked, so a
        // partially failed batch is silently dropped — verify upstream.
        batch = new ArrayList<>(); 
        counter=0; 

       }else{ 
        counter++; 
       } 

然後我有一個由Kinesis觸發的NodeJS Lambda函數，每筆交易觸發一次。它的目的是讀取來自Kinesis的交易數據，並把它們寫入Firehose傳輸流，以便保存到S3。

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose(); 

// Lambda entry point, triggered by the Kinesis stream; forwards the event
// payload to the "transaction-postings" Firehose delivery stream.
exports.handler = function(event, context) { 

    console.log(event); 

    var params = { 
     DeliveryStreamName: "transaction-postings", 
     Record: { 
      // BUG (the subject of this question): `event` is a plain object, so
      // decodeURIComponent coerces it via String(event), producing the
      // literal text "[object Object]" — exactly what appears in S3.
      // The real payload lives base64-encoded in
      // event.Records[i].kinesis.data (see the accepted answer below).
      Data: decodeURIComponent(event) 
     } 
    }; 
    firehose.putRecord(params, function(err, data) { 
     if (err) console.log(err, err.stack); // an error occurred 
     else { 
      console.log(data);   // successful response 
     } 

     context.done(); 
    }); 
}; 

然而，查看S3上的數據時，我看到的是下面的內容，而不是我所期望的JSON對象列表……

[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object] 

有人能指出我遺漏了什麼，才能把Kinesis中的數據以JSON對象的形式流向S3嗎？

回答

1
Data: decodeURIComponent(event) 

您需要序列化事件,因爲Lambda會自動對參數進行反序列化。即:

Data: JSON.stringify(decodeURIComponent(event)) 
+0

剛做了這個改變，結果仍然是「[object Object][object Object][object Object]…」這樣的內容 – Mez

+0

實際上,當我將這行代碼更改爲「Data:event」時,我在S3中得到了例子:{「Records」:[{「kinesis」:{「kinesisSchemaVersion」:「1.0」,「partitionKey 「:」2a4bb9d9-a023-4c03-8616-ef3e7c567459「,」sequenceNumber「:」49571132156681255058105982946244422009241197082071531522「,」data「:」ewogICJyb3dJZCIgOiA3MjEzMTU0NSwKICAi ......爲什麼我沒有得到上面發送的實際JSON對象? – Mez

+0

我認爲這是因爲當設置數據record.setData(ByteBuffer.wrap(json.getBytes()))...我需要轉換回utf8。 – Mez

0

對於那些想知道所需代碼更改的人……要把生產者發送的實際信息寫入S3，需要對PutRecordsRequestEntry的data屬性進行解碼。換句話說，下面的代碼塊展示了Lambda如何解析來自Kinesis流的數據……

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose(); 
var firehoseStreamName = "transaction-postings"; 

exports.handler = function(event, context) { 

    // This is the actual transaction, encapsulated with Kinesis Put properties 
    var transaction = event; 

    // Convert data object because this is all that we need 
    var buf = new Buffer(transaction.data, "base64"); 

    // Convert to actual string which is readable 
    var jsonString = buf.toString("utf8"); 

    // Prepare storage to postings firehose stream... 
    var params = { 
     DeliveryStreamName: firehoseStreamName, 
     Record: { 
      Data: jsonString 
     } 
    }; 

    // Store data! 
    firehose.putRecord(params, function(err, data) { 
     if (err) { 

      // This needs to be fired to Kinesis in the future... 
      console.log(err, err.stack); 
     } 
     else{ 
      console.log(data);    
     } 

     context.done(); 
    }); 
}; 

這是因爲，使用下面這個AWS Kinesis Producer（生產者庫）依賴項寫入的記錄

<!-- Producer-side Maven dependency: Amazon Kinesis Producer Library (KPL).
     NOTE(review): 0.12.3 is an old KPL release — check for a newer version. -->
<dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>amazon-kinesis-producer</artifactId> 
     <version>0.12.3</version> 
    </dependency> 

看起來會像這樣：

{ 
    "kinesisSchemaVersion": "1.0", 
    "partitionKey": "cb3ff3cd-769e-4d48-969d-918b5378e81b", 
    "sequenceNumber": "49571132156681255058105982949134963643939775644952428546", 
    "data": "[base64 string]", 
    "approximateArrivalTimestamp": 1490191017.614 
} 
相關問題