
Spark Streaming - Java - Insert JSON from Kafka into Cassandra

I am writing a simple data pipeline in Java with Spark Streaming that pulls JSON data from Kafka, parses the JSON into a custom class (Transaction), and then inserts the converted data into a Cassandra table, but I cannot get the mapToRow() function to work.

I have seen tons of examples that say all you have to do is something along these lines:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
     streamingContext, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     kafkaParams, 
     topicsSet 
); 

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){ 
     @Override 
     public String call(Tuple2<String,String> tuple2) { 
      return tuple2._2(); 
     } 
    } 
); 

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra(); 

However, when I do this, I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions 

I think what I am missing is some sort of decoration on my class, but I have not been able to figure out which one. I have tried going bare bones, essentially making the class a bag of properties:

public class Transaction implements java.io.Serializable{ 

    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

I have also tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName", 
     readConsistency = "QUORUM", 
     writeConsistency = "QUORUM", 
     caseSensitiveKeyspace = false, 
     caseSensitiveTable = false) 
public class Transaction implements java.io.Serializable{ 

    @PartitionKey(0) 
    @Column(name="transaction_id") 
    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

I have also tried making the properties private and setting up public get/set methods for each of them:

public class Transaction implements java.io.Serializable{ 

    private int transactionId; 
    ... 

    public Transaction(){} 

    public int getTransactionId() { 
     return transactionId; 
    } 

    public void setTransactionId(int transactionId) { 
     this.transactionId = transactionId; 
    } 
} 

I have been able to parse the DStream into Transaction objects using the following class:

public class Transaction implements java.io.Serializable{ 

    ... 

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> { 
     public Iterable<Transaction> call(Iterator<String> lines) throws Exception { 
      ArrayList<Transaction> transactions = new ArrayList<Transaction>(); 
       ObjectMapper mapper = new ObjectMapper(); 
       while (lines.hasNext()) { 
        String line = lines.next(); 
        try { 
         transactions.add(mapper.readValue(line, Transaction.class)); 
        } catch (Exception e) { 
         System.out.println("Skipped:" + e); 
        } 
       } 

       return transactions; 
     } 
    } 
} 

in conjunction with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON()); 

However, once I have the data in this form, it still does not work with the writerBuilder().saveToCassandra() chain.
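For reference, the write I am attempting on the parsed stream is roughly the same pattern as the example above, just applied to events (keyspace and table names are placeholders):

// Same write pattern as above, but on the JavaDStream<Transaction> produced by mapPartitions
javaFunctions(events)
    .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class))
    .saveToCassandra();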

Any help here would be greatly appreciated.

Answer

It turned out the problem was just an import issue. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*, thinking it would give me everything I needed, but I also had to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
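In other words, both static imports are needed; a minimal sketch of the import block (the rest of the program is unchanged):

// javaFunctions(JavaDStream) used for the streaming write comes from CassandraStreamingJavaUtil
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
// mapToRow() lives in CassandraJavaUtil, not in the streaming util
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;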

Once I resolved that, I started getting the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$ 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5) 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala) 
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38) 
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10) 
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93) 
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222) 
    at globalTransactions.Process.main(Process.java:77) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$ 
    at java.net.URLClassLoader.findClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    ... 9 more 

This was resolved by pulling in the Spark SQL project:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

Hope this helps the next person who runs into it.
