我有一個應用程序需要從MemSQL讀取數據並加載到DataFrame。我正在使用memsql-spark-connector來連接該通信。但是,不幸的是,我被困在了我試圖連接到我的memSQL主節點的地步。我連接到memsql主節點的方式有什麼問題? 其實我試圖在我的本地機器上使用mySQL客戶端來登錄到memsql主節點,它工作。所以我想這個問題與服務器端沒有關係。使用MemSQL的錯誤Spark連接器
這裏是我的異常堆棧跟蹤:
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294)
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533)
at com.memsql.spark.connector.MemSQLConnectionPool$.connect(MemSQLConnectionPool.scala:34)
at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26)
at org.apache.spark.sql.memsql.MemSQLCatalog.getDBTablePairs(MemSQLCatalog.scala:64)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupTable(MemSQLCatalog.scala:58)
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupRelation(MemSQLCatalog.scala:24)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:48)
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:39)
at MemsqlSparkLoader.load(MemsqlSparkLoader.scala:19)
at MemsqlSparkLoaderTest$$anonfun$1.apply$mcV$sp(MemsqlSparkLoaderTest.scala:20)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39)
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:256)
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2304)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290)
... 126 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297)
... 142 more
的代碼段的最後一行下面是哪裏此異常被拋出:
val masterHost:String = "XXX"
val masterPort:Int = 3307
val defaultDBName:String = "mydbtest"
val user:String = "user"
val passwd:String = "passwd"
val query:String = "select * from transactions where block_height <= 1000"
val conf:SparkConf = new SparkConf().setAppName("MemsqlSparkLoaderTest").setMaster("local")
conf.set("memsql.host", masterHost)
conf.set("memsql.port", masterPort.toString)
conf.set("memsql.defaultDatabase", defaultDBName)
conf.set("memsql.user", user)
conf.set("memsql.password", passwd)
val sc:SparkContext = new SparkContext(conf)
val msc:MemSQLContext = new MemSQLContext(sc)
val df = msc.sql(query)
我的主人的memsql.cnf
配置文件節點寫成如下:
[server]
basedir = .
bind_address = 0.0.0.0
core_file
durability = on
lc_messages_dir = ./share
lock_wait_timeout = 60
max_connections = 100000
plan_expiration_minutes = 180
redundancy_level = 2
skip_name_resolve = on
snapshot_trigger_size = 256m
socket = memsql.sock
ssl_cert = /var/lib/memsql/certs/server-cert.pem
ssl_key = /var/lib/memsql/certs/server-key.pem
tmpdir = .
transaction_buffer = 64m
; ------------------------------------------------------------------------
; MEMSQL OPS VARIABLES
;
; Variables below this header are controlled by MemSQL Ops.
; Please do not edit any of these values directly.
; ------------------------------------------------------------------------
master_aggregator
port = 3307
您可以發佈MemSQL節點設置(memsql.cnf配置) –
@張貼在我的問題的身體萊特 - 獵鷹強制。這是我memsql集羣主節點上的memsql.cnf文件。 –
檢查:http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql –