2016-10-31 57 views
1

我使用Dynamo DB流+ lamdba作爲觸發器來調用將Dynamo DB數據放入Redshift的kinesis。 有人可以建議一種方法來使用發電機流在不同地區加載Dynamo DB數據到Redshift。Dynamo DB數據到Redhisft

+1

你使用Kinesis Firehose嗎? –

回答

0

我已經編寫了一個程序,可以將數據從Dynamo數據庫移動到Redshift,但沒有數據流就無法工作,您可以看看代碼並查看這是否有助於您的案例,或者您是否獲得與此相對應的任何Idea。

1.使用Redshift創建連接。 2.創建Prepeared語句插入Redshift。 3.使用分頁批處理從Dynamo獲取數據。 4.將批處理數據批量插入到Resdhift中。

public void createConnectionWithRedshift() { 
     final String DB_URL = "jdbc:redshift://ao.cepuhmobd.us-west-2.redshift.amazonaws.com:5439/events"; 
     // final String DB_URL = args[0]; 
     // Database credentials 
     final String USER = "abc"; 
     final String PASS = "abc"; 
     Connection conn = null; 
     try { 
      // STEP 3: Open a connection 
      System.out.println("Connecting to database..."); 
      conn = DriverManager.getConnection(DB_URL, USER, PASS); 
      // createNewTable(conn); 
      // STEP 4: Execute a query 
      preparedStatement = conn.prepareStatement("insert into Events " + "(Vin,timestamp,eventtype,source,data)" + "VALUES (?,?,?,?,?)"); 
     } catch (SQLException se) { 
      se.printStackTrace(); 
     } 
    }// end main 


    public void replicateDynamoToRedshidt(int pages, int batchSize, int scanSize) 
      throws TableNeverTransitionedToStateException, InterruptedException { 
createConnectionWithRedshift();//Redshift Connection 
for (int i = 0; i < pages; i = i + 1) { 
      List<EventLogEntity> results = findAll(new PageRequest(i, batchSize));//Fetching the data from Dynamo in batches 
      List<HeadUnitData> headUnitDataList = headUnitEvents(results); 
      for (int j = 0; j < headUnitDataList.size(); j++) { 
       HeadUnitData headUnitData = headUnitDataList.get(j); 
       insertData(headUnitData.getVin(), headUnitData.getType(), headUnitData.getSource(), headUnitData.getData());//Inserting the data into Redshidt in batches 
      } 
      try { 
       preparedStatement.executeBatch(); 
       System.out.println("Inserted in Database : " + results.size()); 
      } catch (SQLException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     }