
I am trying to use AWS Data Pipeline to transfer CSV data from an S3 bucket to DynamoDB. Below is my pipeline script; it is not working correctly.

CSV file structure:

Name, Designation,Company 
A,TL,C1 
B,Prog, C2 

DynamoDB table: N_Table, with Name as the hash key.

{ 
"objects": [ 
    { 
     "id": "Default", 
     "scheduleType": "cron", 
     "name": "Default", 
     "role": "DataPipelineDefaultRole", 
     "resourceRole": "DataPipelineDefaultResourceRole" 
    }, 
    { 
     "id": "DynamoDBDataNodeId635", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "tableName": "N_Table", 
     "name": "MyDynamoDBData", 
     "type": "DynamoDBDataNode" 
    }, 
    { 
     "emrLogUri": "s3://onlycsv/error", 
     "id": "EmrClusterId636", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "masterInstanceType": "m1.small", 
     "coreInstanceType": "m1.xlarge", 
     "enableDebugging": "true", 
     "installHive": "latest", 
     "name": "ImportCluster", 
     "coreInstanceCount": "1", 
     "logUri": "s3://onlycsv/error1", 
     "type": "EmrCluster" 
    }, 
    { 
     "id": "S3DataNodeId643", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "directoryPath": "s3://onlycsv/data.csv", 
     "name": "MyS3Data", 
     "dataFormat": { 
      "ref": "DataFormatId1" 
     }, 
     "type": "S3DataNode" 
    }, 
    { 
     "id": "ScheduleId639", 
     "startDateTime": "2013-08-03T00:00:00", 
     "name": "ImportSchedule", 
     "period": "1 Hours", 
     "type": "Schedule", 
     "endDateTime": "2013-08-04T00:00:00" 
    }, 
    { 
     "id": "EmrActivityId637", 
     "input": { 
      "ref": "S3DataNodeId643" 
     }, 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "name": "MyImportJob", 
     "runsOn": { 
      "ref": "EmrClusterId636" 
     }, 
     "maximumRetries": "0", 
     "myDynamoDBWriteThroughputRatio": "0.25", 
     "attemptTimeout": "24 hours", 
     "type": "EmrActivity", 
     "output": { 
      "ref": "DynamoDBDataNodeId635" 
     }, 
     "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com" 
    }, 
    { 
     "id": "DataFormatId1", 
     "name": "DefaultDataFormat1", 
     "column": [ 
      "Name", 
      "Designation", 
      "Company" 
     ], 
     "columnSeparator": ",", 
     "recordSeparator": "\n", 
     "type": "Custom" 
    } 
 ] 
}

When the pipeline runs, two of its four steps complete, but it never finishes executing fully.

Answers

Answer 1 (score 0):

I would recommend using the CSV data format provided by Data Pipeline instead of a custom one.
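
For reference, a built-in CSV data format object for the pipeline above might look like the sketch below. The name is made up, and the "column" entries with type annotations follow the pattern in the Data Pipeline DataFormat documentation, so verify them against the current docs:

{ 
 "id": "DataFormatId1", 
 "name": "CSVDataFormat", 
 "type": "CSV", 
 "column": [ 
  "Name STRING", 
  "Designation STRING", 
  "Company STRING" 
 ] 
}

Referencing this object from the S3 node's "dataFormat" field would replace the "Custom" format object at the end of the pipeline definition.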

To debug errors on the cluster, you can look up the job flow in the EMR console and inspect the log files of the tasks that failed.

Answer 2 (score 5):

Currently (2015-04) the default import pipeline template does not support importing CSV files.

If your CSV file is not too big (under 1 GB or so), you can create a ShellCommandActivity that first converts the CSV to DynamoDB JSON format, and then feed that to an EmrActivity that imports the resulting file into your table; a sketch of such an activity follows.
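
A minimal sketch of such a conversion activity. The node and resource ids (MyS3CsvNode, MyS3ConvertedNode, MyEc2Resource) and the convert.awk script are hypothetical; the script would need to be delivered to the instance, for example via scriptUri. With "stage": "true", Data Pipeline exposes the staged input and output directories to the command as ${INPUT1_STAGING_DIR} and ${OUTPUT1_STAGING_DIR}:

{ 
 "id": "CsvToDynamoJsonActivity", 
 "name": "CsvToDynamoJson", 
 "type": "ShellCommandActivity", 
 "stage": "true", 
 "input": { "ref": "MyS3CsvNode" }, 
 "output": { "ref": "MyS3ConvertedNode" }, 
 "runsOn": { "ref": "MyEc2Resource" }, 
 "command": "awk -f convert.awk ${INPUT1_STAGING_DIR}/data.csv > ${OUTPUT1_STAGING_DIR}/data.dynamodb" 
}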

As a first step, you can create a sample DynamoDB table that includes all the field types you need, populate it with dummy values, and then export its records with a pipeline (the Export/Import buttons in the DynamoDB console). This will give you an idea of the format the import pipeline expects. The type names are not obvious, and the import activity is very sensitive to correct casing (for example, you should use bool for a boolean field).

After that, it should be easy to create an awk script (or any other text converter; at least with awk you can use the default AMI image for your shell activity) that you feed to the ShellCommandActivity. Don't forget to enable the "staging" flag, so your output is uploaded back to S3 for the import activity to pick up.
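
As a rough illustration (not the poster's actual script), here is what such an awk converter could look like for the three-column CSV in the question. The \002/\003 attribute delimiters and the lowercase typed-value form {"s":"..."} are assumptions about the export file format; check them against the sample export from the previous step before relying on them:

#!/usr/bin/awk -f 
# Sketch of a CSV -> DynamoDB-import-format converter. 
# The \002 / \003 delimiters are an assumption about the export format; 
# verify them against a real sample export before use. 
BEGIN { 
    FS = ","            # input fields are comma-separated 
    attr_sep = "\002"   # assumed separator between attributes 
    kv_sep   = "\003"   # assumed separator between attribute name and typed value 
} 
NR == 1 { next }        # skip the header row (Name, Designation,Company) 
{ 
    # Trim the stray spaces present in the sample data (e.g. "Prog, C2"). 
    for (i = 1; i <= NF; i++) gsub(/^ +| +$/, "", $i) 
    printf "Name%s{\"s\":\"%s\"}%s",        kv_sep, $1, attr_sep 
    printf "Designation%s{\"s\":\"%s\"}%s", kv_sep, $2, attr_sep 
    printf "Company%s{\"s\":\"%s\"}\n",     kv_sep, $3 
}

Run inside the ShellCommandActivity, e.g. awk -f convert.awk ${INPUT1_STAGING_DIR}/data.csv > ${OUTPUT1_STAGING_DIR}/data.dynamodb (file names hypothetical); the lowercase type codes such as s and bool follow the casing note above.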