Asked 2015-04-01 · 47 views

Spark Cassandra connector connection error, no more host to try

I have a problem related to the DataStax spark-cassandra-connector. To test our Spark–Cassandra connection I use the code below. My problem is that this code throws an exception after about half an hour of running. I think there is some connection problem; can anyone help? I am stuck.

SparkConf conf = new SparkConf(true)
    .setMaster("local")
    .set("spark.cassandra.connection.host", Config.CASSANDRA_CONTACT_POINT)
    .setAppName(Config.CASSANDRA_DB_NAME)
    .set("spark.executor.memory", Config.Spark_Executor_Memory);
SparkContext javaSparkContext = new SparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(javaSparkContext);

for (;;) {
    JavaRDD<ObjectHandler> obj = functions.cassandraTable(Config.CASSANDRA_DB_NAME,
        "my_users", ObjectHandler.class);
    System.out.println("#####" + obj.count() + "#####");
}

Error:

java.lang.OutOfMemoryError: Java heap space 
at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201) 
at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323) 
at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247) 
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395) 
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383) 
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201) 
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198) 
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182) 
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66) 
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) 
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) 
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) 
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) 
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) 
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) 
at java.lang.Thread.run(Thread.java:722) 
19:11:12.311 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] Defuncting connection to /192.168.1.26:9042 
com.datastax.driver.core.TransportException: [/192.168.1.26:9042] Unexpected exception triggered (java.lang.OutOfMemoryError: Java heap space) 
    at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:614) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48) 
    at org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:566) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) 
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) 
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201) 
    at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323) 
    at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247) 
    at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395) 
    at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383) 
    at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201) 
    at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198) 
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182) 
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) 
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
    ... 3 more 
19:11:13.549 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] [/192.168.1.26:9042-1] closing connection 
19:11:12.311 DEBUG [main][com.datastax.driver.core.ControlConnection] [Control connection] error on /192.168.1.26:9042 connection, no more host to try 
com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out 
    at com.datastax.driver.core.DefaultResultSetFuture.onTimeout(DefaultResultSetFuture.java:138) 
    at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:763) 
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:546) 
    at org.jboss.netty.util.HashedWheelTimer$Worker.notifyExpiredTimeouts(HashedWheelTimer.java:446) 
    at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:395) 
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) 
    at java.lang.Thread.run(Thread.java:722) 
19:11:13.551 DEBUG [main][com.datastax.driver.core.Cluster] Shutting down 
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.26:9042 (com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out)) 
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:195) 
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79) 
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1143) 
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313) 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:166) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151) 
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) 
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) 
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:72) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97) 
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108) 
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:131) 
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:206) 
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:205) 
    at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:212) 
    at com.datastax.spark.connector.SparkContextFunctions.cassandraTable(SparkContextFunctions.scala:48) 
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:47) 
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:89) 
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:140) 
    at com.shephertz.app42.paas.spark.SegmentationWorker.main(SegmentationWorker.java:52) 

Answers

Answer 1 (score 1):

It looks like you ran out of heap space:

java.lang.OutOfMemoryError: Java heap space 

The Java driver (which the Spark connector uses to interact with Cassandra) defuncted the connection because an OutOfMemoryError was thrown while processing a request. When a connection is defunct, its host is marked down.

The NoHostAvailableException was probably raised because all hosts were marked down, as their connections had been defuncted, likely as a result of the OutOfMemoryError.

Do you know why you are getting an OutOfMemoryError? What is your heap size? Are you doing anything that would cause many objects to pile up on your JVM's heap? Could you have a memory leak?
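A quick first step when answering those questions (a minimal illustrative sketch, not part of the original answer) is to print the heap limits the driver JVM is actually running with, since the defaults are often far smaller than expected:

```java
public class HeapInfo {
    public static void main(String[] args) {
        // Report the heap limits of the running JVM; if maxMemory() is small
        // (e.g. the default of a few hundred MB), an OutOfMemoryError while
        // decoding large Cassandra result sets is not surprising.
        Runtime rt = Runtime.getRuntime();
        System.out.println("max heap:   " + rt.maxMemory() / (1024 * 1024) + " MB");
        System.out.println("total heap: " + rt.totalMemory() / (1024 * 1024) + " MB");
        System.out.println("free heap:  " + rt.freeMemory() / (1024 * 1024) + " MB");
    }
}
```

Running the application with `-XX:+HeapDumpOnOutOfMemoryError` additionally writes a heap dump on failure, which can then be opened in a heap analyzer such as Eclipse MAT.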

Comment: I don't know why I am getting the OutOfMemoryError; I am using the same code I have posted in this thread, and I don't know which objects are causing the error. – 2015-04-01 16:08:27

Comment: This may help you understand why you are getting OOM errors: http://stackoverflow.com/questions/4512147/how-to-debug-java-outofmemory-exceptions – 2015-04-01 16:21:15

Comment: The class loader/component "sun.misc.Launcher$AppClassLoader @ 0x29780338" occupies 175,467,272 (96.10%) bytes. The memory is accumulated in one instance of "scala.collection.concurrent.BasicNode[]" loaded by "sun.misc.Launcher$AppClassLoader @ 0x29780338". Keywords: scala.collection.concurrent.BasicNode[], sun.misc.Launcher$AppClassLoader @ 0x29780338 – 2015-05-21 10:34:36

Answer 2 (score 0):

Your error may lie in how your JVM is configured. If the settings are not tuned correctly, garbage collection can cause problems. If you are using Cassandra > 2.0, look at DataStax's "Tuning Java Resources".

How Cassandra uses memory, from the documentation:

Using a java-based system like Cassandra, you can typically allocate about 8GB of memory on the heap before garbage collection pause time starts to become a problem. Modern machines have much more memory than that and Cassandra can make use of additional memory as page cache when files on disk are accessed. Allocating more than 8GB of memory on the heap poses a problem due to the amount of Cassandra metadata about data on disk. The Cassandra metadata resides in memory and is proportional to total data. Some of the components grow proportionally to the size of total memory.

In Cassandra 1.2 and later, the Bloom filter and compression offset map that store this metadata reside off-heap, greatly increasing the capacity per node of data that Cassandra can handle efficiently. In Cassandra 2.0, the partition summary also resides off-heap.

Please post your JVM options for more help.
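One way to collect those options (an illustrative sketch, not from the original answer) is to print the arguments the JVM was actually started with, rather than the ones you believe were configured:

```java
import java.lang.management.ManagementFactory;

public class PrintJvmOptions {
    public static void main(String[] args) {
        // Lists the startup flags of the current JVM (-Xmx, -Xms, GC flags, ...),
        // which is exactly the information requested above.
        for (String arg : ManagementFactory.getRuntimeMXBean().getInputArguments()) {
            System.out.println(arg);
        }
    }
}
```

Embedding this (or just the loop body) in the Spark driver shows the effective heap and GC settings of the process that is actually failing.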

Comment: My answer should really be a comment, but it is a bit too long. I will update it once the OP provides the JVM options for his Cassandra instance. – Nathan 2015-09-18 14:54:51
