我正在嘗試使用CassandraPojoSink類來編寫Flink的Cassandra SINK連接器。我沒有收到任何錯誤/異常,但是沒有記錄提交到Cassandra表中。CassandraPojoSink沒有錯誤,但數據沒有寫入cassandra
我正在使用以下代碼。
========= 水槽連接器代碼快照 ==================
DataStream<Event> stream = eventStream.flatMap(new EventTransformation());
try {
stream.addSink(new CassandraPojoSink<>(Event.class, new ClusterBuilder() {
private static final long serialVersionUID = -2485105213096858846L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("localhost").withPort(9042).build();
}
}));
} catch (Exception e) {
e.printStackTrace();
}
====== POJO類 ================
@Table(keyspace= "cloud", name = "event")
public class Event implements Serializable {
private static final long serialVersionUID = 3284839826384795926L;
@Column(name = "name")
private String name;
@Column(name = "msg")
private String msg;
public Event(){
}
//......
}