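Spark Streaming - Java - Insert JSON from Kafka into Cassandra
I'm writing a simple data pipeline in Spark Streaming, using Java, to pull JSON data from Kafka, parse the JSON into a custom class (Transaction), and then insert that data into a Cassandra table, but I can't get the mapToRow() function to work.
I've seen tons of examples that say all you have to do is something along these lines: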
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    streamingContext,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);
JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);
javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();
However, when I do this, I get the following error:
The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions
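I think all I'm lacking is some sort of decoration on my class, but I haven't been able to figure out which one. I've tried going bare bones, basically making the class a property bag: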
public class Transaction implements java.io.Serializable{
    public int TransactionId;
    ...
    public Transaction(){}
}
I've tried all of the DataStax mapping annotations:
@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{
    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...
    public Transaction(){}
}
I've also tried making each property private and adding public get/set methods for it:
public class Transaction implements java.io.Serializable{
    private int transactionId;
    ...
    public Transaction(){}
    public int getTransactionId() {
        return transactionId;
    }
    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
I've been able to parse the DStream into an RDD of Transaction objects using the class below:
public class Transaction implements java.io.Serializable{
    ...
    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped:" + e);
                }
            }
            return transactions;
        }
    }
}
I use it together with the following code, which acts on the lines object from above:
JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());
However, once I have the data in this form, it still doesn't work with the writerBuilder().saveToCassandra() chain.
Any help here is greatly appreciated.
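For reference, the "is undefined for the type SaveTransactions" wording is what Eclipse's compiler reports when an unqualified call such as mapToRow(...) is looked up on the enclosing class and no static import supplies a method of that name. The sketch below reproduces the same situation with the standard library only: StaticImportDemo and largest are hypothetical names, and Math.max stands in for mapToRow. In the Spark Cassandra Connector's Java API, mapToRow is a static method on com.datastax.spark.connector.japi.CassandraJavaUtil, so a missing static import of it would be one possible cause here. That is only an assumption, since the imports of SaveTransactions are not shown.

```java
// The static import is what makes the bare call to max(...) below resolve.
// Deleting this line makes Eclipse report:
//   "The method max(int, int) is undefined for the type StaticImportDemo"
import static java.lang.Math.max;

public class StaticImportDemo {

    // Unqualified call to a static method that lives in another class.
    public static int largest(int a, int b) {
        return max(a, b);
    }

    public static void main(String[] args) {
        System.out.println(largest(3, 7));
    }
}
```

If that is indeed the cause, none of the annotation or getter/setter variants of Transaction would affect the error, because the compiler fails before it ever inspects that class.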