2
我有這個程序正在讀取parquet文件並將其寫入到MemSQL表中。我可以確認火花正確讀取文件爲MemSQL Spark連接器從Spark插入空值到MemSQL
df.printSchema()
df.show(5)
正確打印模式和數據。
當我查詢表格時,我得到了所有行的NULL值。表中的一切都是NULL。我不確定這裏發生了什麼問題。
其寫入文件拼花的代碼memsql
package com.rb.scala
import com.memsql.spark.context.MemSQLContext
import java.sql.{ DriverManager, ResultSet, Connection, Timestamp }
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import com.memsql.spark.connector._
import com.memsql.spark.connector.OnDupKeyBehavior._
import com.memsql.spark.connector.dataframe._
import com.memsql.spark.connector.rdd._
import scala.util.control.NonFatal
import org.apache.log4j.Logger
object MemSQLWriter {
def main(arg: Array[String]) {
var logger = Logger.getLogger(this.getClass())
if (arg.length < 1) {
logger.error("=> wrong parameters number")
System.err.println("Usage: MainExample <directory containing the source files to be loaded to database > ")
System.exit(1)
}
val jobName = "MemSQLWriter"
val conf = new SparkConf().setAppName(jobName)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val pathToFiles = arg(0)
logger.info("=> jobName \"" + jobName + "\"")
logger.info("=> pathToFiles \"" + pathToFiles + "\"")
val dbHost = "xx.xx.xx.xx"
val dbPort = 3306
val dbName = "memsqlrdd_db"
val user = "root"
val password = ""
val tableName = "target_table"
val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort
val df = sqlContext.read.parquet("/projects/example/data/")
val conn = DriverManager.getConnection(dbAddress, user, password)
val stmt = conn.createStatement
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
stmt.execute("USE " + dbName)
stmt.execute("DROP TABLE IF EXISTS " + tableName)
df.printSchema()
df.show(5)
var columnArr = df.columns
var createQuery:String = " CREATE TABLE "+tableName+" ("
logger.info("=> no of columns : "+columnArr.length)
for(column <- columnArr){
createQuery += column
createQuery += " VARCHAR(100),"
}
createQuery += " SHARD KEY ("+columnArr(0)+"))"
logger.info("=> create table query "+createQuery)
stmt.execute(createQuery)
df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
stmt.close()
}
}