2017-05-17 56 views
0

我有一個flink項目將作爲批處理作業插入cassandra表中的數據。我已經有一個flink流項目,它正在向同一個cassandra表寫入一個pojo,但是cassandraOutputFormat需要這個數據作爲一個Tuple(希望改變爲接受像CassandraSink一樣的pojos)。因此,這裏是我的POJO:Flink cassandraOutputFormat元組需要凍結值

@Table(keyspace="mykeyspace", name="mytablename") 
public class AlphaGroupingObject implements Serializable { 

    @Column(name = "jobId") 
    private String jobId; 
    @Column(name = "datalist") 
    @Frozen("list<frozen<dataobj>") 
    private List<CustomDataObj> dataobjs; 
    @Column(name = "userid") 
    private String userid; 

    //Getters and Setters 
} 

而且元組的,我從這個POJO使得數據集:

DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = listOfAlphaGroupingObject.map(new AlphaGroupingObjectToTuple3Mapper()); 

這裏是觸發輸出,以及行:

outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt)); 

現在,我有這個問題是,當我嘗試運行此,我得到這個錯誤時,嘗試將其輸出到卡桑德拉表:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: 
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <-> flink.custom.data.CustomDataObj] 

所以我知道它是什麼時候pojo,我只需要將@Frozen註釋添加到該字段,但我不知道如何爲元組做。解決這個問題的最佳方式是什麼?或者我做了一些不必要的事情,因爲實際上有一種方法可以通過cassandraOutputFormat發送pojos,我只是沒有找到?

感謝您提前提供任何幫助!

編輯:

下面是CustomDataObj類太代碼:

@UDT(name="dataobj", keyspace = "mykeyspace") 
public class CustomDataObj implements Serializable { 


    @Field(name = "userid") 
    private String userId; 

    @Field(name = "groupid") 
    private String groupId; 

    @Field(name = "valuetext") 
    private String valueText; 

    @Field(name = "comments") 
    private String comments; 

    //Getters and setters 
} 

EDIT 2

包括卡桑德拉表架構的CustomDataObj綁定到和mytablename模式。

CREATE TYPE mykeyspace.dataobj (
    userid text, 
    groupid text, 
    valuetext text, 
    comments text 
); 

CREATE TABLE mykeyspace.mytablename (
    jobid text, 
    datalist list<frozen<dataobj>>, 
    userid text, 
    PRIMARY KEY (jobid, userid) 
); 
+0

是不是'名單<凍結'?有一個缺失的''' –

+0

是的,它仍然運行良好(這真的很奇怪,它沒有問題)。我也加了一個缺失的「>」來確保。 – Jicaar

+0

添加您的表格和類型架構 –

回答

0

我相信我已經找到不必提供一個元組到cassandraOutputFormat一個更好的辦法,但它在技術上仍然沒有回答這個問題,所以我不會紀念這個作爲回答。我最終使用了cassandra的對象映射器,所以我可以將pojo發送到表中。仍然需要驗證數據是否已成功上傳,並確保所有內容都能夠正確執行,但我認爲這有助於任何面臨類似問題的人。

這裏是概括溶液中DOC:http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/using/

0

添加UDT詮釋CustomDataObj

@UDT(name = "dataobj") 
public class CustomDataObj { 
    //...... 
} 

編輯

變化jobid詮釋@Column(name = "jobid")dataobjs冷凍詮釋@Frozen

@Table(keyspace="mykeyspace", name="mytablename") 
public class AlphaGroupingObject implements Serializable { 

    @Column(name = "jobid") 
    private String jobId; 

    @Column(name = "datalist") 
    @Frozen 
    private List<CustomDataObj> dataobjs; 
    @Column(name = "userid") 
    private String userid; 

    //Getters and Setters 
} 
+0

我已經。我會更新我的問題以顯示它。 – Jicaar

+0

在第一次編輯中進行了所做的更改,但仍引發同樣的錯誤。 – Jicaar

+0

檢查此http://docs.datastax.com/en/developer/java-driver/3.1/manual/custom_codecs/#creating-custom-codecs-for-user-defined-types-ud-ts –