我有一個flink項目將作爲批處理作業插入cassandra表中的數據。我已經有一個flink流項目,它正在向同一個cassandra表寫入一個pojo,但是cassandraOutputFormat需要這個數據作爲一個Tuple(希望改變爲接受像CassandraSink一樣的pojos)。因此,這裏是我的POJO:Flink cassandraOutputFormat元組需要凍結值
@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {
@Column(name = "jobId")
private String jobId;
@Column(name = "datalist")
@Frozen("list<frozen<dataobj>")
private List<CustomDataObj> dataobjs;
@Column(name = "userid")
private String userid;
//Getters and Setters
}
而且元組的,我從這個POJO使得數據集:
DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = listOfAlphaGroupingObject.map(new AlphaGroupingObjectToTuple3Mapper());
這裏是觸發輸出,以及行:
outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt));
現在,我有這個問題是,當我嘗試運行此,我得到這個錯誤時,嘗試將其輸出到卡桑德拉表:
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException:
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <-> flink.custom.data.CustomDataObj]
所以我知道它是什麼時候pojo,我只需要將@Frozen註釋添加到該字段,但我不知道如何爲元組做。解決這個問題的最佳方式是什麼?或者我做了一些不必要的事情,因爲實際上有一種方法可以通過cassandraOutputFormat發送pojos,我只是沒有找到?
感謝您提前提供任何幫助!
編輯:
下面是CustomDataObj類太代碼:
@UDT(name="dataobj", keyspace = "mykeyspace")
public class CustomDataObj implements Serializable {
@Field(name = "userid")
private String userId;
@Field(name = "groupid")
private String groupId;
@Field(name = "valuetext")
private String valueText;
@Field(name = "comments")
private String comments;
//Getters and setters
}
EDIT 2
包括卡桑德拉表架構的CustomDataObj綁定到和mytablename模式。
CREATE TYPE mykeyspace.dataobj (
userid text,
groupid text,
valuetext text,
comments text
);
CREATE TABLE mykeyspace.mytablename (
jobid text,
datalist list<frozen<dataobj>>,
userid text,
PRIMARY KEY (jobid, userid)
);
是不是'名單<凍結'?有一個缺失的''' –
是的,它仍然運行良好(這真的很奇怪,它沒有問題)。我也加了一個缺失的「>」來確保。 – Jicaar
添加您的表格和類型架構 –