
Spark Cassandra connector not working in a standalone Spark cluster

I have a Maven Scala application that submits a Spark job to a standalone single-node Spark cluster. When the job is submitted, the Spark application tries to access a Cassandra database hosted on an Amazon EC2 instance via spark-cassandra-connector. The connection is established, but no results are ever returned; after a while the connector disconnects. If I run Spark in local mode, it works fine. I tried to create a simple application, and my code looks like this:

import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

val sc = SparkContextLoader.getSC

def runSparkJob(): Unit = {
    val table = sc.cassandraTable("prosolo_logs_zj", "logevents")
    println(table.collect().mkString("\n")) // collect() pulls every row to the driver
}

SparkContext.scala

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextLoader {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("spark://127.0.1.1:7077")

    sparkConf.set("spark.cores.max", "2") // the key is spark.cores.max, not spark.cores_max
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.setAppName("Test application")
    sparkConf.set("spark.cassandra.connection.host", "xxx.xxx.xxx.xxx")
    sparkConf.set("spark.cassandra.connection.port", "9042")
    sparkConf.set("spark.ui.port", "4041")

    // Jar shipped to the executors; the leading "/" makes this an absolute path
    val oneJar = "/samplesparkmaven/target/samplesparkmaven-jar.jar"
    sparkConf.setJars(List(oneJar))

    @transient val sc = new SparkContext(sparkConf)

    def getSC: SparkContext = sc // accessor used by the job code above
}
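For context, a minimal sketch of how this loader might be driven from the application's entry point, assuming runSparkJob from the first snippet is in scope; the Main object and the stop() call are illustrative additions, not part of the original code:

import org.apache.spark.SparkContext

object Main {
    def main(args: Array[String]): Unit = {
        val sc: SparkContext = SparkContextLoader.getSC
        try {
            runSparkJob() // the Cassandra table scan shown above
        } finally {
            sc.stop() // release the cores reserved on the standalone master
        }
    }
}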

The console output is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
17/02/14 23:11:25 INFO SparkContext: Running Spark version 2.1.0 
17/02/14 23:11:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
17/02/14 23:11:27 WARN Utils: Your hostname, zoran-Latitude-E5420 resolves to a loopback address: 127.0.1.1; using 192.168.2.68 instead (on interface wlp2s0) 
17/02/14 23:11:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
17/02/14 23:11:27 INFO SecurityManager: Changing view acls to: zoran 
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls to: zoran 
17/02/14 23:11:27 INFO SecurityManager: Changing view acls groups to: 
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls groups to: 
17/02/14 23:11:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(zoran); groups with view permissions: Set(); users with modify permissions: Set(zoran); groups with modify permissions: Set() 
17/02/14 23:11:28 INFO Utils: Successfully started service 'sparkDriver' on port 33995. 
17/02/14 23:11:28 INFO SparkEnv: Registering MapOutputTracker 
17/02/14 23:11:28 INFO SparkEnv: Registering BlockManagerMaster 
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
17/02/14 23:11:28 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7b25a4cc-cb37-4332-a59b-e36fa45511cd 
17/02/14 23:11:28 INFO MemoryStore: MemoryStore started with capacity 870.9 MB 
17/02/14 23:11:28 INFO SparkEnv: Registering OutputCommitCoordinator 
17/02/14 23:11:28 INFO Utils: Successfully started service 'SparkUI' on port 4041. 
17/02/14 23:11:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.2.68:4041 
17/02/14 23:11:28 INFO SparkContext: Added JAR /samplesparkmaven/target/samplesparkmaven-jar.jar at spark://192.168.2.68:33995/jars/samplesparkmaven-jar.jar with timestamp 1487142688817 
17/02/14 23:11:28 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://127.0.1.1:7077... 
17/02/14 23:11:28 INFO TransportClientFactory: Successfully created connection to /127.0.1.1:7077 after 62 ms (0 ms spent in bootstraps) 
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170214231129-0016 
17/02/14 23:11:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36901. 
17/02/14 23:11:29 INFO NettyBlockTransferService: Server created on 192.168.2.68:36901 
17/02/14 23:11:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
17/02/14 23:11:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.68:36901 with 870.9 MB RAM, BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.2.68, 36901, None) 
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 
17/02/14 23:11:29 INFO NettyUtil: Found Netty's native epoll transport in the classpath, using it 
17/02/14 23:11:31 INFO Cluster: New Cassandra host /xxx.xxx.xxx.xxx:9042 added 
17/02/14 23:11:31 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster 
17/02/14 23:11:32 INFO SparkContext: Starting job: collect at SparkConnector.scala:28 
17/02/14 23:11:32 INFO DAGScheduler: Got job 0 (collect at SparkConnector.scala:28) with 6 output partitions 
17/02/14 23:11:32 INFO DAGScheduler: Final stage: ResultStage 0 (collect at SparkConnector.scala:28) 
17/02/14 23:11:32 INFO DAGScheduler: Parents of final stage: List() 
17/02/14 23:11:32 INFO DAGScheduler: Missing parents: List() 
17/02/14 23:11:32 INFO DAGScheduler: Submitting ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18), which has no missing parents 
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.4 KB, free 870.9 MB) 
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 870.9 MB) 
17/02/14 23:11:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.68:36901 (size: 4.4 KB, free: 870.9 MB) 
17/02/14 23:11:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996 
17/02/14 23:11:32 INFO DAGScheduler: Submitting 6 missing tasks from ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18) 
17/02/14 23:11:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 6 tasks 
17/02/14 23:11:39 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 

I am using:

  • Scala 2.11.6
  • Spark 2.1.0 (both for the standalone Spark cluster and as the application dependency)
  • Spark Cassandra Connector 2.0.0-M3
  • Cassandra Java driver 3.0.0
  • Apache Cassandra 3.9

The version compatibility table for the Cassandra connector does not indicate any problem with this combination, and I cannot figure out what else might be wrong.
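For reference, the versions above correspond to the following dependency coordinates, shown here in sbt notation for brevity; the project itself uses Maven, and the _2.11 artifact suffix and the "provided" scope for spark-core are assumptions based on the Scala version listed and on the job running against an external standalone cluster:

libraryDependencies ++= Seq(
    "org.apache.spark"       %% "spark-core"                % "2.1.0" % "provided",
    "com.datastax.spark"     %% "spark-cassandra-connector" % "2.0.0-M3",
    "com.datastax.cassandra" %  "cassandra-driver-core"     % "3.0.0"
)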

Answer


I finally solved my problem. It turned out to be a path issue. I was using a local path to the jar but missed the leading ".", so the path was treated as absolute. Unfortunately, the application throws no exception indicating that the file does not exist at the given path; the only symptom is that the workers in the Spark cluster cannot find the jar file.
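A minimal sketch of the corrected jar path, assuming the jar lives under the driver's working directory; the existence check is an illustrative addition that makes a bad path fail fast instead of silently starving the workers:

import java.io.File

// The leading "." anchors the path to the driver's working directory;
// without it, "/samplesparkmaven/..." is resolved as an absolute path.
val oneJar = "./samplesparkmaven/target/samplesparkmaven-jar.jar"

// Spark does not validate this path itself, so check it up front.
require(new File(oneJar).exists(), s"Jar not found at $oneJar")

sparkConf.setJars(List(oneJar))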
