1
我使用Dynamo DB流+ lamdba作爲觸發器來調用將Dynamo DB數據放入Redshift的kinesis。 有人可以建議一種方法來使用發電機流在不同地區加載Dynamo DB數據到Redshift。Dynamo DB數據到Redhisft
我使用Dynamo DB流+ lamdba作爲觸發器來調用將Dynamo DB數據放入Redshift的kinesis。 有人可以建議一種方法來使用發電機流在不同地區加載Dynamo DB數據到Redshift。Dynamo DB數據到Redhisft
我已經編寫了一個程序,可以將數據從Dynamo數據庫移動到Redshift,但沒有數據流就無法工作,您可以看看代碼並查看這是否有助於您的案例,或者您是否獲得與此相對應的任何Idea。
1.使用Redshift創建連接。 2.創建Prepeared語句插入Redshift。 3.使用分頁批處理從Dynamo獲取數據。 4.將批處理數據批量插入到Resdhift中。
public void createConnectionWithRedshift() {
final String DB_URL = "jdbc:redshift://ao.cepuhmobd.us-west-2.redshift.amazonaws.com:5439/events";
// final String DB_URL = args[0];
// Database credentials
final String USER = "abc";
final String PASS = "abc";
Connection conn = null;
try {
// STEP 3: Open a connection
System.out.println("Connecting to database...");
conn = DriverManager.getConnection(DB_URL, USER, PASS);
// createNewTable(conn);
// STEP 4: Execute a query
preparedStatement = conn.prepareStatement("insert into Events " + "(Vin,timestamp,eventtype,source,data)" + "VALUES (?,?,?,?,?)");
} catch (SQLException se) {
se.printStackTrace();
}
}// end main
public void replicateDynamoToRedshidt(int pages, int batchSize, int scanSize)
throws TableNeverTransitionedToStateException, InterruptedException {
createConnectionWithRedshift();//Redshift Connection
for (int i = 0; i < pages; i = i + 1) {
List<EventLogEntity> results = findAll(new PageRequest(i, batchSize));//Fetching the data from Dynamo in batches
List<HeadUnitData> headUnitDataList = headUnitEvents(results);
for (int j = 0; j < headUnitDataList.size(); j++) {
HeadUnitData headUnitData = headUnitDataList.get(j);
insertData(headUnitData.getVin(), headUnitData.getType(), headUnitData.getSource(), headUnitData.getData());//Inserting the data into Redshidt in batches
}
try {
preparedStatement.executeBatch();
System.out.println("Inserted in Database : " + results.size());
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
你使用Kinesis Firehose嗎? –