2

我是Amazon DynamoDB的新手。我目前有20000行需要添加到表中。但是,根據我讀過的內容,似乎我最多隻能使用25個WriteRequests的BatchWriteItem類一次寫入25行。可以增加這個嗎?我怎樣才能一次寫超過25行?目前大約需要15分鐘來編寫所有20000行。謝謝。如何爲DynamoDB在表中寫入超過25個項目/行?

回答

4

只能發送最多25個項目在一個單一的BatchWriteItem請求,但是隻要你想一次就可以發送儘可能多的BatchWriteItem請求。假設你有provisioned enough write throughput,你應該能夠通過在多個線程/進程/主機之間分割這些20k行並將它們並行推送到數據庫來顯着加速。

這也許對小數據集的位重量級的,但你可以使用AWS Data Pipeline從S3提取數據。它基本上自動化了創建Hadoop集羣的過程,以便從S3中抽取數據並通過一堆並行的BatchWriteItem請求將其發送到DynamoDB。

+0

謝謝大衛。我會嘗試使用一些並行線程。吞吐量是多少? – codeshark

+0

如果我使用AWS Data Pipeline,是否意味着我應該將所有數據從我的應用程序輸出到S3中?是輸出到S3 ==>數據管道==> DynamoDB與對比的好處。直接寫入DynamoDB將文件寫入S3的速度? – codeshark

+0

我在鏈接中編輯了有關預配置吞吐量的更多信息,但簡短的故事是您在創建表時提前告訴DynamoDB每秒要對其執行的讀/寫數量。如果您發送的請求比這更快,超出的請求將被拒絕。 –

1

我一直在尋找一些代碼使用JavaScript SDK做到這一點。我找不到它,所以我把它放在一起。我希望這可以幫助別人!

function multiWrite(table, data, cb) { 
    var AWS = require('aws-sdk'); 
    var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); 

    // Build the batches 
    var batches = []; 
    var current_batch = []; 
    var item_count = 0; 
    for(var x in data) { 
     // Add the item to the current batch 
     item_count++; 
     current_batch.push({ 
      PutRequest: { 
       Item: data[x] 
      } 
     }); 
     // If we've added 25 items, add the current batch to the batches array 
     // and reset it 
     if(item_count%25 == 0) { 
      batches.push(current_batch); 
      current_batch = []; 
     } 
    } 
    // Add the last batch if it has records and is not equal to 25 
    if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch); 

    // Handler for the database operations 
    var completed_requests = 0; 
    var errors = false; 
    function handler(request) { 
     return function(err, data) { 
      // Increment the completed requests 
      completed_requests++; 

      // Set the errors flag 
      errors = (errors) ? true : err; 

      // Log the error if we got one 
      if(err) { 
       console.error(JSON.stringify(err, null, 2)); 
       console.error("Request that caused database error:"); 
       console.error(JSON.stringify(request, null, 2)); 
      } 

      // Make the callback if we've completed all the requests 
      if(completed_requests == batches.length) { 
       cb(errors); 
      } 
     } 
    } 

    // Make the requests 
    var params; 
    for(x in batches) { 
     // Items go in params.RequestItems.id array 
     // Format for the items is {PutRequest: {Item: ITEM_OBJECT}} 
     params = '{"RequestItems": {"' + table + '": []}}'; 
     params = JSON.parse(params); 
     params.RequestItems[table] = batches[x]; 

     // Perform the batchWrite operation 
     db.batchWrite(params, handler(params)); 
    } 
} 
0
function putInHistory(data,cb) { 
    var arrayOfArray25 = _.chunk(data, 25); 
    async.every(arrayOfArray25, function(arrayOf25, callback) { 
    var params = { 
    RequestItems: { 
    [TABLES.historyTable]: [] 
    } 
}; 
arrayOf25.forEach(function(item){ 
    params.RequestItems[TABLES.historyTable].push({ 
    PutRequest: { 
     Item: item 
    } 
    }) 
}); 
docClient.batchWrite(params, function(err, data) { 
    if (err){ 
    console.log(err); 
    callback(err); 
    } else { 
    console.log(data); 
    callback(null, true); 
    }; 
}); 
}, function(err, result) { 
if(err){ 
    cb(err); 
} else { 
    if(result){ 
    cb(null,{allWritten:true}); 
    } else { 
    cb(null,{allWritten:false}); 
    } 
} 
}); 
} 

您可以使用lodash使從陣列的數據塊,然後使用異步庫的每個/所有的方法都是做batchWrite上25種元素的大塊