I am learning Apache Kafka-Storm-Cassandra integration. I use a Kafka spout to read JSON strings from a Kafka cluster, pass them to a bolt that parses the JSON, and emit the needed values to a second bolt that writes them to a Cassandra DB. The errors occur when inserting the data into Cassandra.

But I get the following errors.

java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.execute(WordCounter.java:103)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
    ... 6 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:101)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more

The TableAlreadyExists error:

com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.prepare(WordCounter.java:77)
    at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43)
    at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:105)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:70)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:38)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:168)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
    ... 21 more

My main topology:

public class TopologyQueryCounterMain {

    static final Logger logger = Logger.getLogger(TopologyQueryCounterMain.class);

    private static final String SPOUT_ID = "QueryCounter";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        int numSpoutExecutors = 1;
        logger.debug("This is SpoutConfig");
        KafkaSpout kspout = QueryCounter();
        TopologyBuilder builder = new TopologyBuilder();
        logger.debug("This is Set Spout");
        builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
        logger.debug("This is Set bolt");
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping(SPOUT_ID);
        builder.setBolt("word-counter", new WordCounter(), 1)
                .shuffleGrouping("word-normalizer", "stream1");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        logger.debug("This is Submit cluster");
        conf.put(Config.NIMBUS_HOST, "192.168.1.229");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        System.setProperty("storm.jar", "/home/ubuntu/workspace/QueryCounter/target/QueryCounter-0.0.1-SNAPSHOT.jar");
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);

        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            cluster.submitTopology("QueryCounter", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("QueryCounter");
            logger.debug("This is ShutDown cluster");
            cluster.shutdown();
        }
    }

    private static KafkaSpout QueryCounter() {
        String zkHostPort = "localhost:2181";
        String topic = "RandomQuery";
        String zkRoot = "/QueryCounter";
        String zkSpoutId = "QueryCounter-spout";
        ZkHosts zkHosts = new ZkHosts(zkHostPort);

        logger.debug("This is Inside kafka spout cluster");
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
        return kafkaSpout;
    }

}

The normalizer bolt:

public class WordNormalizer extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordNormalizer.class);

    public void cleanup() {}

    /**
     * The bolt receives a JSON string from the spout, parses it,
     * lower-cases the search term, and emits the fields needed downstream.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String feed = input.getString(0);

        String searchTerm = null;
        String pageNo = null;
        boolean sortOrder = true;
        boolean category = true;
        boolean field = true;
        boolean filter = true;
        String pc = null;
        int ProductCount = 0;
        String timestamp = null;

        // NOTE: obj stays null if the feed is not valid JSON.
        JSONObject obj = null;
        try {
            obj = new JSONObject(feed);
        } catch (JSONException e1) {
            // TODO Auto-generated catch block
            //e1.printStackTrace();
        }

        try {
            searchTerm = obj.getJSONObject("body").getString("correctedWord");
            pageNo = obj.getJSONObject("body").getString("pageNo");
            sortOrder = obj.getJSONObject("body").isNull("sortOrder");
            category = obj.getJSONObject("body").isNull("category");
            field = obj.getJSONObject("body").isNull("field");
            filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
            pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
            ProductCount = Integer.parseInt(pc);
            timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().replaceAll("[^\\d]", "");
        } catch (JSONException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        searchTerm = searchTerm.trim();

        // Condition to eliminate pagination
        if (!searchTerm.isEmpty()) {
            if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)) {
                searchTerm = searchTerm.toLowerCase();

                System.out.println("In Normalizer term : " + searchTerm + "," + timestamp + "," + ProductCount);
                System.out.println("Entire Json : " + feed);

                collector.emit("stream1", new Values(searchTerm, timestamp, ProductCount));
            }
        }
    }

    /**
     * The bolt emits the fields "searchTerm", "timestamp" and "ProductCount" on stream1.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream1", new Fields("searchTerm", "timestamp", "ProductCount"));
    }
}

The Cassandra writer bolt:

public class WordCounter extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordCounter.class);
    Integer id;
    String name;
    Map<String, Integer> counters;
    Cluster cluster;
    Session session;

    /**
     * Called when the topology is shut down.
     */
    @Override
    public void cleanup() {
    }

    public static Session getSessionWithRetry(Cluster cluster, String keyspace) {
        while (true) {
            try {
                return cluster.connect(keyspace);
            } catch (NoHostAvailableException e) {
                Utils.sleep(1000);
            }
        }
    }

    public static Cluster setupCassandraClient() {
        return Cluster.builder().addContactPoint("192.168.1.229").build();
    }

    /**
     * On create: set up the Cassandra client and make sure the target table exists.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        cluster = setupCassandraClient();
        session = WordCounter.getSessionWithRetry(cluster, "query");

        String query = "CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
                + "term text, "
                + "ProductCount varint, "
                + "timestamp text);";

        session.executeAsync(query);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String term = input.getString(0);
        String timestamp = input.getString(1);
        int ProductCount = input.getInteger(2);

        System.out.println("In Counter : " + term + "," + ProductCount + "," + timestamp);

        // Build the INSERT by concatenating the raw values into the CQL string.
        String insertIntoTable = "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"
                + " VALUES(" + UUID.randomUUID() + ",'" + term + "'," + ProductCount + ",'" + timestamp + "');";
        session.executeAsync(insertIntoTable);
    }
}
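(A side note on prepare() above: the CREATE TABLE is submitted with executeAsync, so in principle the first tuples can reach execute() before the table exists. A synchronous variant, which I assume is safer here, would be:

session.execute("CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
        + "term text, "
        + "ProductCount varint, "
        + "timestamp text);");

The IF NOT EXISTS clause should also avoid the AlreadyExistsException when the topology is resubmitted; the trace above reports a synchronous execute at WordCounter.java:77, so the run that produced it presumably used an older CREATE TABLE without IF NOT EXISTS.)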

Please suggest the changes I need to make.

Thanks in advance!!

Answer

The error points to a missing ' in one of your queries. If you simply concatenate strings into the CQL, that is usually the problem (here, any apostrophe inside the concatenated term ends the string literal early). To avoid this kind of issue you should use prepared statements, or at least Session.execute(query, params) (javadoc).
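A minimal sketch of what that change looks like for the insert in WordCounter, assuming the ProductCount table from the question and the DataStax Java driver 2.x, where a CQL varint column binds to java.math.BigInteger (imports needed: com.datastax.driver.core.PreparedStatement, java.math.BigInteger, java.util.UUID):

// Prepare once, e.g. at the end of prepare(), and keep the statement in a field.
PreparedStatement insertStmt = session.prepare(
        "INSERT INTO ProductCount (uid, term, ProductCount, timestamp) VALUES (?, ?, ?, ?)");

// In execute(): bind the values instead of concatenating them into the CQL string.
// The driver escapes the bound values, so a ' inside term no longer breaks the CQL.
session.executeAsync(insertStmt.bind(
        UUID.randomUUID(), term, BigInteger.valueOf(ProductCount), timestamp));

The parameterized one-off form, session.execute(cql, UUID.randomUUID(), term, BigInteger.valueOf(ProductCount), timestamp), also avoids the quoting problem (driver 2.0+ against Cassandra 2.0+), but since this bolt runs the same statement for every tuple, preparing it once is the better fit.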

Thanks, I got it after thinking about it for a while. –