2016-10-29 46 views
2

插入在卡桑德拉表/更新行的最好方法這是如何通過得到卡桑德拉表數據的方式的Java +星火+ SparkSession什麼是通過Java +星火+ SparkSession

SparkSession spark = SparkSession 
      .builder() 
      .appName("JavaDemoDataSet") 
      .config("spark.sql.warehouse.dir", "/file:C:/temp") 
      .config("spark.cassandra.connection.host", "127.0.0.1") 
      .config("spark.cassandra.connection.port", "9042") 
      .master("local[2]") 
      .getOrCreate(); 

Dataset<Row> dataset = spark.read() 
     .format("org.apache.spark.sql.cassandra") 
     .options(new HashMap<String, String>() { 
      { 
       put("keyspace", "chat"); 
       put("table", "dictionary"); 
      } 
     }) 
     .load() 
     .filter("value_id BETWEEN 1 AND 5"); 

但當我正在研究如何在此表中添加或修改行時(至少有一行) - 我找不到最好的方法。 例如,我正在用GUI開發簡單的應用程序,我需要爲「Dictionary」表添加一個新值。所以,在這種情況下,從我的角度來看 - 我不需要DataSet來做到這一點。

當我正在研究如何通過SparkSession添加單個行時 - 我無法找到Java + Spark + Sparksession示例如何做到這一點。 我絕對可以通過聲明與CQL語句做到這一點,但是哪種方法可以最好地更新或添加1行或2行?特別是當我使用SparkSession來閱讀它們時。如果可能的話,我會非常欣賞這樣的例子(甚至超鏈接,我研究了很多,但可能是我錯過了一些重要的東西),因爲我對這些都很陌生。

謝謝!

回答

1

這裏是保存和使用Java + SparkSession + CassandraConnector讀取樣品。

public class SparkCassandraDatasetApplication { 
public static void main(String[] args) { 
    SparkSession spark = SparkSession 
     .builder() 
     .appName("SparkCassandraDatasetApplication") 
     .config("spark.sql.warehouse.dir", "/file:C:/temp") 
     .config("spark.cassandra.connection.host", "127.0.0.1") 
     .config("spark.cassandra.connection.port", "9042") 
     .master("local") 
     .getOrCreate(); 

    //Data 
    MyData data = new MyData(); 
    data.setId("111"); 
    data.setUsername("userOne"); 
    List<MyData> users = Arrays.asList(data); 
    Dataset<MyData> datasetWrite = spark.createDataset(users, Encoders.bean(MyData.class)); 

    //Save data to Cassandra 
    datasetWrite.write().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() { 
     { 
      put("keyspace", "mykeyspace"); 
      put("table", "mytable"); 
     } 
    }).mode(SaveMode.Append).save(); 

    //Read data back 
    Dataset<Row> datasetRead = spark.read().format("org.apache.spark.sql.cassandra") 
      .options(new HashMap<String, String>() { 
       { 
        put("keyspace", "mykeyspace"); 
        put("table", "mytable"); 
       } 
      }).load(); 

    datasetRead.show(); 
    spark.stop(); 
    } 
} 
+0

感謝你的回答,只是一件事 - 當我在DataSet中獲得6時,我的數據庫中只有最後一行,我該怎麼處理它? '的System.out.println( 「數據集項目金額:」 + datasetWrite.count()); //我在控制檯中得到了6這裏 \t \t //數據保存到卡桑德拉 \t datasetWrite.write()格式( 「org.apache.spark.sql.cassandra」) \t可供選項(新的HashMap <字符串,字符串>(){ \t { \t放( 「密鑰空間」, 「聊天」); \t放( 「表」 ,「dictionary」); \t} \t})。mode(SaveMode.Append).save();' –

+1

檢查您的數據,否則代碼應該正常工作。如果數據已經存在於數據庫中,SaveMode.Append將追加數據。 – abaghel

+0

固定,謝謝!我認爲我應該使用foreach來解決我的問題 –