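Long query time with Spark + Cassandra
Ahoy, almighty developers. I am running some basic analysis in Spark, querying a multi-node Cassandra cluster. The code I am running (some of it admittedly not very logical) is: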
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

object cassSpark {
  // Runs f once, prints the elapsed wall-clock time, and returns f's result.
  def time[A](f: => A): A = {
    val s = System.nanoTime
    val ret = f
    println("time: " + (System.nanoTime - s) / 1e6 + "ms")
    ret
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Both contact points go into one comma-separated value;
      // a second .set on the same key would overwrite the first.
      .set("spark.cassandra.connection.host", "192.168.56.101,192.168.56.102")
      .set("spark.cassandra.connection.local_dc", "datacenter1")
      .setMaster("local[*]")
      .setAppName("cassSpark")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // The timed block covers loading the table definition, caching, and the count.
    time {
      val df = sqlContext
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
        .load()
        .cache()
      df.count()
    }
  }
}
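So, the Spark version is 1.6.0, Cassandra is v3.0.10, and the connector is also 1.6.0. The keyspace has replication_factor: 1, and the table has 5 columns and actually only 1 row. As you can see, there are two nodes (virtual machines created in Oracle VM).
My problem is that when I measure the time of the query from Spark to Cassandra, I get about 20 seconds. That does not look normal to me, because there is only one row in the table. Am I missing something, am I measuring the wrong thing, or is something wrong with my code? Can someone help me, or tell me how to do this efficiently?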
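One way to check whether the measurement is dominated by one-time costs is to time the same action several times and compare the first run with the later ones. The following is only a minimal sketch using plain Scala; `TimingSketch`, `timed`, and `timedRuns` are names made up for illustration, and the workload in `main` is a stand-in for `df.count()`.

```scala
object TimingSketch {
  // Runs f once and returns its result together with the elapsed
  // wall-clock time in milliseconds.
  def timed[A](f: => A): (A, Double) = {
    val start = System.nanoTime
    val result = f
    (result, (System.nanoTime - start) / 1e6)
  }

  // Runs f `runs` times and returns the elapsed time of each run, in order.
  def timedRuns[A](runs: Int)(f: => A): Seq[Double] =
    (1 to runs).map(_ => timed(f)._2)

  def main(args: Array[String]): Unit = {
    // Stand-in workload (hypothetical); in the program above this
    // would be df.count() on an already loaded DataFrame.
    val times = timedRuns(3) { (1 to 1000000).map(_.toLong).sum }
    times.zipWithIndex.foreach { case (ms, i) =>
      println(f"run ${i + 1}: $ms%.1f ms")
    }
  }
}
```

If the first run is much slower than the following ones, the 20 seconds probably reflect setup work (connecting, loading the schema, code generation) rather than the query itself.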
[EDIT]
As @Artem Aliev requested, here is the whole INFO log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/06 12:31:04 INFO SparkContext: Running Spark version 1.6.0
16/12/06 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/06 12:31:07 INFO SecurityManager: Changing view acls to: superbrainbug
16/12/06 12:31:07 INFO SecurityManager: Changing modify acls to: Ivan Majnaric
16/12/06 12:31:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(superbrainbug); users with modify permissions: Set(superbrainbug)
16/12/06 12:31:12 INFO Utils: Successfully started service 'sparkDriver' on port 62101.
16/12/06 12:31:14 INFO Slf4jLogger: Slf4jLogger started
16/12/06 12:31:14 INFO Remoting: Starting remoting
16/12/06 12:31:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:62114]
16/12/06 12:31:15 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62114.
16/12/06 12:31:15 INFO SparkEnv: Registering MapOutputTracker
16/12/06 12:31:15 INFO SparkEnv: Registering BlockManagerMaster
16/12/06 12:31:15 INFO DiskBlockManager: Created local directory at C:\Users\superbrainbug\AppData\Local\Temp\blockmgr-8b664e71-ead1-4462-b171-bf542a5eb444
16/12/06 12:31:15 INFO MemoryStore: MemoryStore started with capacity 1124.6 MB
16/12/06 12:31:18 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/06 12:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/06 12:31:20 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/12/06 12:31:20 INFO Executor: Starting executor ID driver on host localhost
16/12/06 12:31:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62136.
16/12/06 12:31:20 INFO NettyBlockTransferService: Server created on 62136
16/12/06 12:31:20 INFO BlockManagerMaster: Trying to register BlockManager
16/12/06 12:31:21 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62136 with 1124.6 MB RAM, BlockManagerId(driver, localhost, 62136)
16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager
16/12/06 12:31:23 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added
16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69
16/12/06 12:31:31 INFO DAGScheduler: Registering RDD 7 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Got job 0 (count at cassSpark.scala:69) with 1 output partitions
16/12/06 12:31:31 INFO DAGScheduler: Final stage: ResultStage 1 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 20.0 KB, free 20.0 KB)
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 29.2 KB)
16/12/06 12:31:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62136 (size: 9.2 KB, free: 1124.6 MB)
16/12/06 12:31:35 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69)
16/12/06 12:31:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/12/06 12:31:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 20008 bytes)
16/12/06 12:31:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/12/06 12:31:35 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/12/06 12:31:36 INFO GenerateUnsafeProjection: Code generated in 395.192925 ms
16/12/06 12:31:39 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 1168.0 B, free 30.3 KB)
16/12/06 12:31:39 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:62136 (size: 1168.0 B, free: 1124.6 MB)
16/12/06 12:31:39 INFO GeneratePredicate: Code generated in 35.193967 ms
16/12/06 12:31:39 INFO GenerateColumnAccessor: Code generated in 52.555004 ms
16/12/06 12:31:39 INFO GenerateMutableProjection: Code generated in 14.129896 ms
16/12/06 12:31:39 INFO GenerateUnsafeProjection: Code generated in 14.130749 ms
16/12/06 12:31:40 INFO GenerateMutableProjection: Code generated in 19.217034 ms
16/12/06 12:31:40 INFO GenerateUnsafeRowJoiner: Code generated in 72.736302 ms
16/12/06 12:31:40 INFO GenerateUnsafeProjection: Code generated in 133.407346 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3895 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5452 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/06 12:31:41 INFO DAGScheduler: ShuffleMapStage 0 (count at cassSpark.scala:69) finished in 5,542 s
16/12/06 12:31:41 INFO DAGScheduler: looking for newly runnable stages
16/12/06 12:31:41 INFO DAGScheduler: running: Set()
16/12/06 12:31:41 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/12/06 12:31:41 INFO DAGScheduler: failed: Set()
16/12/06 12:31:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.3 KB, free 39.6 KB)
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 44.1 KB)
16/12/06 12:31:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62136 (size: 4.6 KB, free: 1124.6 MB)
16/12/06 12:31:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/12/06 12:31:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1999 bytes)
16/12/06 12:31:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 92.969655 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 11.48414 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1830 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 602 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/12/06 12:31:41 INFO DAGScheduler: ResultStage 1 (count at cassSpark.scala:69) finished in 0,605 s
16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s
time: 19242.858679ms
16/12/06 12:31:48 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/12/06 12:31:48 INFO SparkContext: Invoking stop() from shutdown hook
16/12/06 12:31:48 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
16/12/06 12:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/12/06 12:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/06 12:31:48 INFO MemoryStore: MemoryStore cleared
16/12/06 12:31:48 INFO BlockManager: BlockManager stopped
16/12/06 12:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/06 12:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/06 12:31:48 INFO SparkContext: Successfully stopped SparkContext
16/12/06 12:31:48 INFO ShutdownHookManager: Shutdown hook called
16/12/06 12:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\superbrainbug\AppData\Local\Temp\spark-17606f05-6bb8-4144-acb5-015f15ea1ea9
Process finished with exit code 0
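The timestamps in this log give a rough picture of where the time goes: the job itself reports about 10.6 s, while the timed block reports about 19.2 s. As a minimal sketch (plain Scala, no Spark needed), the gaps between consecutive log lines can be computed like this. `LogGaps` and `gaps` are illustrative names, and the timestamp pattern `yy/MM/dd HH:mm:ss` is assumed from the lines above.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object LogGaps {
  // Timestamp layout assumed from the log above, e.g. "16/12/06 12:31:24".
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")
  private val Stamped = """^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (.*)$""".r

  // For every timestamped line after the first, returns the number of
  // seconds elapsed since the previous timestamped line, plus the message.
  def gaps(lines: Seq[String]): Seq[(Long, String)] = {
    val stamped = lines.collect {
      case Stamped(ts, msg) => (LocalDateTime.parse(ts, fmt), msg)
    }
    stamped.zip(stamped.drop(1)).map { case ((prev, _), (cur, msg)) =>
      (Duration.between(prev, cur).getSeconds, msg)
    }
  }

  def main(args: Array[String]): Unit = {
    // Three lines copied from the log above.
    val sample = Seq(
      "16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster",
      "16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []",
      "16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69"
    )
    gaps(sample).foreach { case (s, msg) => println(s"+${s}s  $msg") }
  }
}
```

Lines without a leading timestamp (such as the `time:` output) are skipped, so the gaps are only as precise as the one-second resolution of the log.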
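Also, @Yves DARMAILLAC, if I remove .cache() the run time is 12 s.
Well, switching from local mode to a standalone Spark cluster looks like a nice trick, I will try it. Do you have any more specific tips for improving my code?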