2015-10-20 28 views

MemSQL Spark connector inserting NULL values from Spark into MemSQL

I have a program that reads a parquet file and writes it into a MemSQL table. I can confirm that Spark reads the file correctly, since

df.printSchema() 
df.show(5) 

prints the correct schema and data.

However, when I query the table, I get NULL values for all rows — everything in the table is NULL. I am not sure what is going wrong here.

Here is the code that writes the parquet file to MemSQL:

package com.rb.scala

import java.sql.{ Connection, DriverManager, ResultSet, Timestamp }

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.memsql.spark.connector._
import com.memsql.spark.connector.OnDupKeyBehavior._
import com.memsql.spark.connector.dataframe._
import com.memsql.spark.connector.rdd._

import org.apache.log4j.Logger

object MemSQLWriter {

  def main(arg: Array[String]) {

    val logger = Logger.getLogger(this.getClass())

    if (arg.length < 1) {
      logger.error("=> wrong parameters number")
      System.err.println("Usage: MainExample <directory containing the source files to be loaded to database>")
      System.exit(1)
    }

    val jobName = "MemSQLWriter"
    val conf = new SparkConf().setAppName(jobName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val pathToFiles = arg(0)
    logger.info("=> jobName \"" + jobName + "\"")
    logger.info("=> pathToFiles \"" + pathToFiles + "\"")

    val dbHost = "xx.xx.xx.xx"
    val dbPort = 3306
    val dbName = "memsqlrdd_db"
    val user = "root"
    val password = ""
    val tableName = "target_table"
    val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort

    val df = sqlContext.read.parquet("/projects/example/data/")

    val conn = DriverManager.getConnection(dbAddress, user, password)
    val stmt = conn.createStatement
    stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
    stmt.execute("USE " + dbName)
    stmt.execute("DROP TABLE IF EXISTS " + tableName)

    df.printSchema()
    df.show(5)

    val columnArr = df.columns
    var createQuery: String = " CREATE TABLE " + tableName + " ("
    logger.info("=> no of columns : " + columnArr.length)
    for (column <- columnArr) {
      createQuery += column
      createQuery += " VARCHAR(100),"
    }
    createQuery += " SHARD KEY (" + columnArr(0) + "))"
    logger.info("=> create table query " + createQuery)
    stmt.execute(createQuery)

    df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
    stmt.close()
  }
}

Answer


You are creating a table with a shard key and then setting useKeylessShardedOptimization = true, which gives undefined behavior. Set it to false and you should be fine.

Also, I am not sure what df.select().saveToMemSQL... does — df.select() with no arguments produces a DataFrame with no columns, so nothing meaningful gets written. Try df.saveToMemSQL... instead.
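As a minimal sketch of the corrected write, assuming the same connection values and `saveToMemSQL` parameters already used in the question's code:

```scala
// Sketch: write the full DataFrame, not an empty projection.
// dbName, tableName, dbHost, dbPort, user, password are the values
// defined in the question's code.
df.saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
  upsertBatchSize = 1000,
  // the table was created with an explicit SHARD KEY, so the
  // keyless-sharding optimization must stay off
  useKeylessShardedOptimization = false)
```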

When verifying, run something like SELECT * FROM table WHERE col IS NOT NULL LIMIT 10 to check whether you really have all NULLs.

PS: there is also df.createMemSQLTableAs, which does what you want.
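A hedged sketch of that approach — assuming this connector version exposes `createMemSQLTableAs` with a parameter list analogous to `saveToMemSQL` — which would let the connector derive the table schema from the DataFrame instead of forcing every column to VARCHAR(100):

```scala
// Sketch: create the MemSQL table from df's schema and load the rows
// in one step, replacing the hand-built CREATE TABLE statement.
// Parameter list is an assumption based on saveToMemSQL's shape.
df.createMemSQLTableAs(dbName, tableName, dbHost, dbPort, user, password)
```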