2016-09-13 92 views
0

您好我是很新的火花和Scala,在這裏,我面臨着將數據保存到Cassandra的一些問題,下面是我的方案的用戶定義的對象保存JavaList卡桑德拉表使用火花背景

1)我得到的名單(說用戶對象,其中包含firstName,lastName等..)從我的java類到scala類,到這裏它的罰款我能夠訪問用戶對象,並能夠打印其內容

2)現在我想保存usersList進入卡桑德拉表使用火花上下文,我已經經歷了很多例子,但我看到創建的每個地方Seq與我們的caseClass和硬編碼值,然後保存到卡珊德拉,我已經試過了,對我工作的罰款如下

import scala.collection.JavaConversions._ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

import com.datastax.spark.connector._ 
import java.util.ArrayList 

object SparkCassandra extends App { 
    val conf = new SparkConf() 
     .setMaster("local[*]") 
     .setAppName("SparkCassandra") 
     //set Cassandra host address as your local address 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
    val sc = new SparkContext(conf) 
    val usersList = Test.getUsers 
    usersList.foreach(x => print(x.getFirstName)) 
    val collection = sc.parallelize(Seq(userTable("testName1"), userTable("testName1"))) 
    collection.saveToCassandra("demo", "user", SomeColumns("name")) 
    sc.stop() 
} 

case class userTable(name: String) 

但在這裏我的要求是從我usersList而不是hardcoaded值,或任何其他方式使用動態值做到這一點。

+0

有多少用戶?這些值在哪裏存儲? –

+0

將有高達20k的用戶,actullay我從其他一些javaClass獲得該列表,並需要存儲在cassandra表 –

+0

只要你並行,它應該工作。如何創建一個包含「userList」中的「userTable」的所有案例類對象並並保存並保存的Seq? – Sreekar

回答

0

最後我得到了我的要求,測試,如下做工精細的解決方案:

我的Scala代碼:

import scala.collection.JavaConversions.asScalaBuffer 
import scala.reflect.runtime.universe 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import com.datastax.spark.connector.toNamedColumnRef 
import com.datastax.spark.connector.toRDDFunctions 

object JavaListInsert { 
    def randomStores(sc: SparkContext, users: List[User]): RDD[(String, String, String)] = { 
     sc.parallelize(users).map { x => 
     val fistName = x.getFirstName 
     val lastName = x.getLastName 
     val city = x.getCity 
     (fistName, lastName, city) 
    } 
    } 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("cassandraInsert") 
    val sc = new SparkContext(conf) 
    val usersList = Test.getUsers.toList 
    randomStores(sc, usersList). 
     saveToCassandra("test", "stores", SomeColumns("first_name", "last_name", "city")) 
    sc.stop 
    } 
} 

的Java POJO的對象:

import java.io.Serializable; 
    public class User implements Serializable{ 
     private static final long serialVersionUID = -187292417543564400L; 
     private String firstName; 
     private String lastName; 
     private String city; 

     public String getFirstName() { 
      return firstName; 
     } 

     public void setFirstName(String firstName) { 
      this.firstName = firstName; 
     } 

     public String getLastName() { 
      return lastName; 
     } 

     public void setLastName(String lastName) { 
      this.lastName = lastName; 
     } 

     public String getCity() { 
      return city; 
     } 

     public void setCity(String city) { 
      this.city = city; 
     } 
} 

的Java類返回列表的用戶:

import java.util.ArrayList; 
import java.util.List; 


public class Test { 
    public static List<User> getUsers() { 
     ArrayList<User> usersList = new ArrayList<User>(); 
     for(int i=1;i<=100;i++) { 
      User user = new User(); 
      user.setFirstName("firstName_+"+i); 
      user.setLastName("lastName_+"+i); 
      user.setCity("city_+"+i); 
      usersList.add(user); 
     } 
     return usersList; 
    } 
} 
0

如果創建RDDCassandraRow對象,則可以直接保存結果而不必指定列或大小寫類。另外,CassandraRow擁有非常方便的fromMap功能,因此您可以將行定義爲Map對象,將其轉換並保存結果。

例子:

val myData = sc.parallelize(
    Seq(
    Map("name" -> "spiffman", "address" -> "127.0.0.1"), 
    Map("name" -> "Shabarinath", "address" -> "127.0.0.1") 
) 
) 

val cassandraRowData = myData.map(rowMap => CassandraRow.fromMap(rowMap)) 

cassandraRowData.saveToCassandra("keyspace", "table") 
+0

thanx答覆,這裏我的要求是不使用硬編碼值來代替「spiffman」和「 shabarinath「我需要使用列表對象的值 –

+0

列表對象的類型是什麼?你可以將它轉換爲Map嗎? – spiffman

+0

包含firstname和getter和setter的簡單pojo用戶對象列表,我想存儲該列表 –