2016-05-11

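Error connecting to Cassandra with Spark Streaming

I am trying to create a keyspace and a table in Cassandra, but I get an error. What I am actually trying to do is connect Spark to Cassandra.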

I have the following code:

public static void main(String[] args){ 

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]"); 
    // Substitute 127.0.0.1 with the address of a Cassandra node (this is the connector's contact point, not the Spark master)
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); 
    // Create the context with 2 seconds batch size 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

    Map<String, Integer> topicMap = new HashMap<>(); 
    String[] topics = KafkaProperties.TOPIC.split(","); 
    for (String topic: topics) { 
     topicMap.put(topic, KafkaProperties.NUM_THREADS); 
    } 
    /* connection to cassandra */ 
    CassandraConnector connector = CassandraConnector.apply(sparkConf); 
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++"); 

    /* Receive kafka inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap); 
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++"); 

    System.out.println(" ----- trying to create tables ------ "); 

    try (Session session = connector.openSession()) { 
     session.execute("DROP KEYSPACE IF EXISTS test"); 
     session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"); 
     session.execute("CREATE TABLE test.users (id TEXT PRIMARY KEY, name TEXT)"); 
    } 

    System.out.println("---- tables created ----"); 

But I get the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Connection has been closed))) 
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196) 
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80) 
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145) 
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313) 
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:182) 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161) 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161) 
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) 
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) 
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70) 
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 

At first I thought the problem was the host, but when I changed the connection host to "local" I got the next error. I don't know what I should set here:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot build a cluster without contact points 
at com.datastax.driver.core.Cluster.checkNotEmpty(Cluster.java:108) 
at com.datastax.driver.core.Cluster.<init>(Cluster.java:100) 
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:169) 
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1031) 
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:179) 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161) 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161) 
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) 
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) 
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70) 
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 

I should add that I started Cassandra beforehand by running 'bin/cassandra -f' from the command line.

Thanks!

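Can you connect to Cassandra from the command line? 'cqlsh localhost' – maasg

When I try it, I get the following error: Connection error: ('Unable to connect to any servers', {'127.0.0.1': error(111, "Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused")}) –

Then you need to configure your Cassandra server correctly. Probably 'cassandra.yaml' does not contain the right configuration. – maasg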
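
The check suggested in the comments can also be done without cqlsh. The following is a minimal sketch that only tests whether something accepts TCP connections on the native transport port. The class name PortCheck and the 2-second timeout are illustrative choices and are not from the original post; 127.0.0.1 and 9042 are the host and port shown in the stack trace.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Spark or the Cassandra driver):
// checks whether a TCP port accepts connections.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolvable host
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the contact point in the question; override via arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        boolean ok = isReachable(host, port, 2000);
        System.out.println(host + ":" + port
                + (ok ? " is accepting connections" : " is not reachable"));
    }
}
```

If the port is not reachable, the problem is most likely on the server side (Cassandra not running, or not listening on that address and port), which would match the "Connection refused" from cqlsh. If the port is reachable but the driver still fails during transport initialization, the cause is more likely on the client side.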

Answer

The problem was that I had the wrong dependencies.

For more information, see the solution here
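
The answer does not say which dependencies were wrong, so the following is only a generic diagnostic sketch, not the author's fix. It prints which jar each class was loaded from, which can help spot a missing or unexpected driver or connector version on the classpath. The class name ClasspathCheck is hypothetical; the three class names it looks up are taken from the code and stack traces in the question.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class is loaded from.
class ClasspathCheck {

    /** Returns the jar/directory a class comes from, or a note that it is missing. */
    static String locate(String className) {
        try {
            // 'false' avoids running the class's static initializers
            Class<?> cls = Class.forName(className, false,
                    ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "JDK/bootstrap class loader";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "com.datastax.driver.core.Cluster",
            "com.datastax.spark.connector.cql.CassandraConnector",
            "org.apache.spark.SparkConf"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it with the same classpath as the application. The jar file names usually include the version, which can then be compared against the versions the connector documentation lists as compatible.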