2016-09-22

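Problem reading an external partitioned Hive table using Spark HiveContext

I have an external partitioned Hive table that I am trying to read from Spark using HiveContext, but I get null values.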

val maxClose = hiveContext.sql("select max(Close) from stock_partitioned_data where symbol = 'AAPL'")
maxClose.collect().foreach(println)

===== 


scala> import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.sql.hive.HiveContext 

scala> val hiveContext = new HiveContext(sc); 
16/09/22 00:12:47 INFO HiveContext: Initializing execution hive, version 1.1.0 
16/09/22 00:12:47 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0 
16/09/22 00:12:47 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0 
hiveContext: org.apache.spark.sql.hive.HiveContext = [email protected] 

scala> val maxClose = hiveContext.sql("select max(Close) from stock_data2") 
16/09/22 00:12:53 INFO ParseDriver: Parsing command: select max(Close) from stock_data2 
16/09/22 00:12:54 INFO ParseDriver: Parse Completed 
16/09/22 00:12:54 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0 
16/09/22 00:12:54 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0 
maxClose: org.apache.spark.sql.DataFrame = [_c0: double] 

scala> maxClose.collect().foreach (println) 
16/09/22 00:13:04 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps 
16/09/22 00:13:04 INFO MemoryStore: ensureFreeSpace(425824) called with curMem=0, maxMem=556038881 
16/09/22 00:13:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 415.8 KB, free 529.9 MB) 
16/09/22 00:13:05 INFO MemoryStore: ensureFreeSpace(44793) called with curMem=425824, maxMem=556038881 
16/09/22 00:13:05 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 43.7 KB, free 529.8 MB) 
16/09/22 00:13:05 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:47553 (size: 43.7 KB, free: 530.2 MB) 
16/09/22 00:13:05 INFO SparkContext: Created broadcast 0 from collect at <console>:27 
16/09/22 00:13:05 INFO SparkContext: Starting job: collect at <console>:27 
16/09/22 00:13:06 INFO FileInputFormat: Total input paths to process : 1 
16/09/22 00:13:06 INFO DAGScheduler: Registering RDD 5 (collect at <console>:27) 
16/09/22 00:13:06 INFO DAGScheduler: Got job 0 (collect at <console>:27) with 1 output partitions 
16/09/22 00:13:06 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:27) 
16/09/22 00:13:06 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 
16/09/22 00:13:06 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 
16/09/22 00:13:06 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27), which has no missing parents 
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(18880) called with curMem=470617, maxMem=556038881 
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.4 KB, free 529.8 MB) 
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(8367) called with curMem=489497, maxMem=556038881 
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.2 KB, free 529.8 MB) 
16/09/22 00:13:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:47553 (size: 8.2 KB, free: 530.2 MB) 
16/09/22 00:13:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861 
16/09/22 00:13:06 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27) 
16/09/22 00:13:06 INFO YarnScheduler: Adding task set 0.0 with 2 tasks 
16/09/22 00:13:07 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1) 
16/09/22 00:13:08 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2) 
16/09/22 00:13:11 ERROR ErrorMonitor: AssociationError [akka.tcp://[email protected]:45637] <- [akka.tcp://[email protected]:33635]: Error [Shut down address: akka.tcp://[email protected]:33635] [ 
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://[email protected]:33635 
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. 
] 
akka.event.Logging$Error$NoCause$ 
16/09/22 00:13:12 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:49490/user/Executor#-842589632]) with ID 1 
16/09/22 00:13:12 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1) 
16/09/22 00:13:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, quickstart.cloudera, partition 0,NODE_LOCAL, 2291 bytes) 
16/09/22 00:13:13 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56958 with 530.3 MB RAM, BlockManagerId(1, quickstart.cloudera, 56958) 
16/09/22 00:13:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on quickstart.cloudera:56958 (size: 8.2 KB, free: 530.3 MB) 
16/09/22 00:13:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on quickstart.cloudera:56958 (size: 43.7 KB, free: 530.2 MB) 
16/09/22 00:13:31 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, quickstart.cloudera, partition 1,NODE_LOCAL, 2291 bytes) 
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 18583 ms on quickstart.cloudera (1/2) 
16/09/22 00:13:31 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 157 ms on quickstart.cloudera (2/2) 
16/09/22 00:13:31 INFO DAGScheduler: ShuffleMapStage 0 (collect at <console>:27) finished in 25.082 s 
16/09/22 00:13:31 INFO DAGScheduler: looking for newly runnable stages 
16/09/22 00:13:31 INFO DAGScheduler: running: Set() 
16/09/22 00:13:31 INFO DAGScheduler: waiting: Set(ResultStage 1) 
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/09/22 00:13:31 INFO DAGScheduler: failed: Set() 
16/09/22 00:13:31 INFO DAGScheduler: Missing parents for ResultStage 1: List() 
16/09/22 00:13:31 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27), which is now runnable 
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(16544) called with curMem=497864, maxMem=556038881 
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.2 KB, free 529.8 MB) 
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(7375) called with curMem=514408, maxMem=556038881 
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.2 KB, free 529.8 MB) 
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:47553 (size: 7.2 KB, free: 530.2 MB) 
16/09/22 00:13:31 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861 
16/09/22 00:13:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27) 
16/09/22 00:13:31 INFO YarnScheduler: Adding task set 1.0 with 1 tasks 
16/09/22 00:13:31 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, quickstart.cloudera, partition 0,PROCESS_LOCAL, 1914 bytes) 
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on quickstart.cloudera:56958 (size: 7.2 KB, free: 530.2 MB) 
16/09/22 00:13:31 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to quickstart.cloudera:49490 
16/09/22 00:13:31 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes 
16/09/22 00:13:31 INFO DAGScheduler: ResultStage 1 (collect at <console>:27) finished in 0.245 s 
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 245 ms on quickstart.cloudera (1/1) 
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/09/22 00:13:31 INFO DAGScheduler: Job 0 finished: collect at <console>:27, took 26.194947 s 
[null] 

=== 

But if I run the same query directly from the Hive console, I get a result.

hive> select max(Close) from stock_data2 
    > ; 
Query ID = cloudera_20160922001414_4b684522-3e42-4957-8260-ff6b4da67c8f 
Total jobs = 1 
Launching Job 1 out of 1 
Number of reduce tasks determined at compile time: 1 
In order to change the average load for a reducer (in bytes): 
    set hive.exec.reducers.bytes.per.reducer=<number> 
In order to limit the maximum number of reducers: 
    set hive.exec.reducers.max=<number> 
In order to set a constant number of reducers: 
    set mapreduce.job.reduces=<number> 
Starting Job = job_1474445009419_0005, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1474445009419_0005/ 
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1474445009419_0005 
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 
2016-09-22 00:14:45,000 Stage-1 map = 0%, reduce = 0% 
2016-09-22 00:14:55,165 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.28 sec 
2016-09-22 00:15:03,707 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.68 sec 
MapReduce Total cumulative CPU time: 2 seconds 680 msec 
Ended Job = job_1474445009419_0005 
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.68 sec HDFS Read: 43379 HDFS Write: 10 SUCCESS 
Total MapReduce CPU Time Spent: 2 seconds 680 msec 
OK 
52.369999 
Time taken: 42.57 seconds, Fetched: 1 row(s) 

count(*) works fine, but querying column values and max(Close) returns null.

I am able to read from a normal (non-partitioned, internal) table without any problem. If the table is external or partitioned, I get an empty RDD.

Answer

This issue has been resolved in Spark version 1.6.
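To tell whether a given shell is already on a release that includes the fix, one option is to compare the running version against 1.6. The following is only a minimal sketch: the object `SparkVersionCheck` and its method `atLeast16` are hypothetical names introduced here for illustration, not part of Spark, and the parsing assumes a version string that starts with `major.minor` (for example `1.5.0-cdh5.5.0`).

```scala
// Hypothetical helper, not part of Spark: checks whether a version string
// such as "1.5.0-cdh5.5.0" denotes Spark 1.6 or later.
object SparkVersionCheck {
  def atLeast16(version: String): Boolean = {
    // Keep only the leading digits of the first two dot-separated fields.
    val parts = version.split("\\.").take(2).map(_.takeWhile(_.isDigit).toInt)
    val major = parts(0)
    val minor = parts(1)
    major > 1 || (major == 1 && minor >= 6)
  }
}

// Usage inside spark-shell, where `sc` is the SparkContext the shell provides
// and `sc.version` returns the version string:
//   SparkVersionCheck.atLeast16(sc.version)
```

If the check returns false, the null results described in the question may be caused by the issue this answer refers to, and upgrading to Spark 1.6 or later would be the thing to try.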