0

我想知道如何創建一個AWS數據管道,它可以從S3獲取json文件並將其導入到DynamoDB表中。我能夠創建一些實現這一點的Java代碼,但我想通過數據管道來完成。我可以看到有從DynamoDB導出到S3並導入備份的模板,但我正在努力弄清楚如何導入普通的json文件。通過AWS數據管道將json文件導入到DynamoDB

回答

1

在文檔中,你會發現從DynamoDb導入和導出數據的例子(http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html

下面是它是如何與文檔中描述:

要創建管道

開放AWS數據管道控制檯

您看到的第一個屏幕取決於您是否創建了一個 pi在當前區域的皮膚。

如果您尚未在此區域創建管道,控制檯將顯示一個介紹性屏幕 。立即選擇立即開始。

如果您已在此區域中創建管道,則控制檯 將顯示一個頁面,其中列出了該區域的管道。選擇 創建新的管道。

在名稱中,輸入管道的名稱。

(可選)在描述中,爲管道輸入描述。

對於Source,請選擇使用模板構建,然後選擇以下模板 :從S3導入DynamoDB備份數據。

在參數下,輸入S3文件夾 S3:// elasticmapreduce /樣品/商店/ ProductCatalog,這是一個樣本 數據源,並設置DynamoDB表名到表格的名稱。

在Schedule下,選擇管道激活。

在流水線配置下,啓用記錄日誌。在日誌的S3位置選擇文件夾 圖標,選擇您的一個存儲桶或 文件夾,然後選擇選擇。

如果您願意,可以改爲禁用日誌記錄。

在Security/Access下,將IAM角色設置爲Default。

點擊在建築師中編輯。

接下來,根據活動的結果配置AWS Data 管道執行的Amazon SNS通知操作。

配置成功和失敗操作

在右側窗格中單擊活動。

從添加一個可選字段,選擇成功。

從新添加的成功,選擇新建:操作。

從添加一個可選字段,選擇On Fail。

從新添加的失敗中選擇新建:操作。

在右側窗格中,單擊其他。

對於DefaultAction1,請執行下列操作:

將名稱更改爲SuccessSnsAlarm。

從類型中選擇SnsAlarm。

在主題Arn中,輸入您創建的主題的ARN。

輸入主題和消息。

對於DefaultAction2,請執行下列操作:

將名稱更改爲FailureSnsAlarm。

從類型中選擇SnsAlarm。

在主題Arn中,輸入您創建的主題的ARN。

輸入主題和消息。

公共github網站有一些與DynamoDB合作的示例(https://github.com/awslabs/data-pipeline-samples)。以下是管道定義的一個示例:

{ 
    "objects": [ 
     { 
      "occurrences": "1", 
      "period": "1 Day", 
      "name": "RunOnce", 
      "id": "DefaultSchedule", 
      "type": "Schedule", 
      "startAt": "FIRST_ACTIVATION_DATE_TIME", 
     "maxActiveInstances" : "1" 
     }, 
     { 
      "failureAndRerunMode": "CASCADE", 
      "schedule": { 
       "ref": "DefaultSchedule" 
      }, 
      "resourceRole": "DataPipelineDefaultResourceRole", 
      "role": "DataPipelineDefaultRole", 
      "pipelineLogUri": "s3://", 
      "scheduleType": "cron", 
      "name": "Default", 
      "id": "Default" 
     }, 
     { 
      "maximumRetries": "2", 
      "name": "TableBackupActivity", 
      "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}", 
      "id": "TableBackupActivity", 
      "runsOn": { 
       "ref": "EmrClusterForBackup" 
      }, 
      "type": "EmrActivity" 
     }, 
     { 
      "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false", 
      "name": "EmrClusterForBackup", 
      "amiVersion": "3.8.0", 
      "id": "EmrClusterForBackup", 
      "type": "EmrCluster", 
      "masterInstanceType": "m1.medium", 
      "coreInstanceType": "#{myInstanceType}", 
      "coreInstanceCount": "#{myInstanceCount}", 
     "terminateAfter" : "12 hours" 
     } 
    ], 
    "parameters": [ 
     { 
      "description": "OutputS3folder", 
      "id": "myOutputS3Loc", 
      "type": "AWS::S3::ObjectKey" 
     }, 
     { 
      "default": "0.2", 
      "watermark": "Valuebetween0.1-1.0", 
      "description": "DynamoDB Read Throughput Ratio", 
      "id": "myDDBReadThroughputRatio", 
      "type": "Double" 
     }, 
     { 
      "description": "DynamoDB Table Name", 
      "id": "myDDBTableName", 
      "type": "String" 
     }, 
     { 
      "description": "Instance Type", 
      "id": "myInstanceType", 
      "watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge", 
      "type": "String", 
      "default": "m3.xlarge" 
     }, 
     { 
      "description": "Instance Count", 
     "watermark" : "(Read Capacity Units/300) for m1.medium if RCU <= 900. Else (RCU/1500) for m3.xlarge", 
      "id": "myInstanceCount", 
      "type": "Integer", 
      "default": "1" 
     }, 
    { 
     "description" : "Burst IOPs", 
     "watermark" : "Add IOPS to the DDB table by this percent for the duration of the export job", 
      "id"   : "myBurstIOPS", 
      "type"  : "Double", 
      "default"  : "0.0" 
    } 
    ] 
} 
相關問題