2017-07-03 71 views
0

我正在嘗試使用CassandraPojoSink類來編寫Flink的Cassandra SINK連接器。我沒有收到任何錯誤/異常,但是沒有記錄提交到Cassandra表中。CassandraPojoSink沒有錯誤,但數據沒有寫入cassandra

我正在使用以下代碼。

========= 水槽連接器代碼快照 ==================

DataStream<Event> stream = eventStream.flatMap(new EventTransformation()); 

    try { 
     stream.addSink(new CassandraPojoSink<>(Event.class, new ClusterBuilder() { 

      private static final long serialVersionUID = -2485105213096858846L; 

      @Override 
      public Cluster buildCluster(Cluster.Builder builder) { 
      return builder.addContactPoint("localhost").withPort(9042).build(); 
     } 
     })); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

====== POJO類 ================

@Table(keyspace= "cloud", name = "event") 
public class Event implements Serializable { 

    private static final long serialVersionUID = 3284839826384795926L; 

    @Column(name = "name") 
    private String name; 

    @Column(name = "msg") 
     private String msg; 

    public Event(){ 

    } 

    //...... 

} 

回答

0

有許多原因弗林克工作可能無法產生任何輸出。一些常見的原因包括:

  • 應用程序不調用env.execute()
  • 應用程序設置爲使用事件時間,但沒有水印的邏輯是某種困惑水印產生
  • ,和正在生成沒有水印(例如,應用程序是基於所述CPU時鐘,而不是事件時間戳水印,造成每一個事件要晚)
0

在改變POJO到元組,添加時間戳水印碼被正常使用。 我能看到我的數據被寫入Cassandra數據庫。

的數據流中>事件= event_stream.flatMap(新EventTransformation())。assignTimestampsAndWatermarks( 新AssignerWithPeriodicWatermarks>(){

 private static final long serialVersionUID = 1L; 
     private final long maxOutOfOrderness = 1_000L; // 1 
     // second 
     private long currentMaxTimestamp = 0; 

     @Override 
     public long extractTimestamp(Tuple3<String, String, Long> arg0, 
              long arg1) { 
       long timestamp = arg0.f3; // get 
       currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 
       return timestamp; 
     } 

     @Override 
     public Watermark getCurrentWatermark() { 
      return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
     } 
    }); 

       event_stream.addSink(new CassandraTupleSink<Tuple3<String, String, Long>("INSERT INTO cloud.condition (name, msg, time) VALUES (?,?,?);", new ClusterBuilder() { 

          /** 
          * 
          */ 
          private static final long serialVersionUID = 1L; 

          @Override 
          protected Cluster buildCluster(Builder builder) { 
           return builder.addContactPoint("localhost").withPort(9042).build(); 
          } 
         })); 

       env.setParallelism(2); 

       env.execute(); 
相關問題