2015-10-29 49 views
3

我使用DSE 4.7 datastax-enterprise,C * 2.1.5,spark 1.2.1,並需要將數據從大表遷移到新的空表具有不同的模式和額外的列,需要從大表中的一個切除列生成。C *遷移 - 將1B錶行數據移動到新的模式表中

我知道,表中的數據與新架構中的另一個表遷移可以通過火花或複製命令在cqlsh進行到CSV文件,但我感興趣的是工具,可以給我長爲未來遷移提供長期解決方案,以及管理和規劃遷移的更多選擇。

我認爲這是一個常見的問題,我沒有找到任何固體解決方案。

任何想法?

+1

數據框直到1.4纔可用,因此您必須升級。我將分享一個可以從頭開始並調整用例的示例工作。 – phact

+1

查看https://github.com/rssvihla/spark_commons/blob/master/examples/spark_bulk_operations/src/main/scala/pro/foundev/scala/SchemaMigration.scala – phact

+1

https://github.com/rssvihla https://github.com/rssvihla /spark_commons/blob/master/examples/spark_bulk_operations/src/main/scala/pro/foundev/scala/Cas​​sandraCapable.scala#L69 – phact

回答

1

我一直堅信Spark是這份工作的最佳工具。 我測試了下面的代碼,結果很好。

import java.sql.Date 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.{Row, SQLContext} 
import com.datastax.spark.connector._ 
import com.datastax.spark.connector.cql.CassandraConnector 
import java.sql._ 
import com.github.nscala_time.time.Imports._ 


object Migration { 
    def main(args: scala.Array[String]) { 

    def changeDate(created: java.util.Date) : String = { 
     var sDate = new DateTime(created) 
     var sDay = sDate.getDayOfMonth() 
     var sMonth = sDate.getMonthOfYear() 
     var sYear = sDate.getYear() 
     var created_date = "" + sYear + "-" + sMonth + "-" + sDay 
     created_date //return 
    } 

    //spark configuration 
    val conf = new SparkConf().setAppName("migration") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    val connector = CassandraConnector(conf) 

    val rdd = sc.cassandraTable("keyspace", "table_a") 

    println("Starting migration...") 

    rdd.map(row => { 
     val x = new java.util.Date(row.getLong("x")) 
     val y = new java.util.Date(row.getLong("y")) 
     val z = row.getString("z") 
     val t = row.getString("t") 
     val k = changeDate(x) 

     connector.withSessionDo(session => { 
      val statement = session.prepare(s"INSERT INTO keyspace.table_b (k, y, z, x, t) " + "values (?, ?, ?, ?, ?)") 
      val bound = statement.bind(k, y, z, x, t) 
      session.executeAsync(bound) 
     }) 
    }).foreach(x => x.getUninterruptibly()) 

    println("Done.") 



} }