2016-08-03 204 views
1

I want to fetch the rows that are inserted into an external MySQL database every 2 minutes, and I want to use Spark Streaming to do it. (Tags: spark-streaming, mysql)

But after running the program I get this error after a single pass: it gives me the data the first time, but after that I get the error below and the program terminates.

The error I get is:

16/08/02 11:15:44 INFO JdbcRDD: closed connection 
16/08/02 11:15:44 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver 
16/08/02 11:15:44 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 451 ms on localhost (1/1) 
16/08/02 11:15:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/08/02 11:15:44 INFO DAGScheduler: ResultStage 0 (foreach at databaseread.scala:33) finished in 0.458 s 
16/08/02 11:15:44 INFO DAGScheduler: Job 0 finished: foreach at databaseread.scala:33, took 0.664559 s 
16/08/02 11:15:44 ERROR StreamingContext: Error starting the context, marking it as stopped 
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:543) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:595) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594) 
    at org.test.spark.databaseread$.main(databaseread.scala:41) 
    at org.test.spark.databaseread.main(databaseread.scala) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 

I am posting my code here. Please help me.

package org.test.spark 

import org.xml.sax.helpers.NewInstance 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.JdbcRDD 
import java.sql.DriverManager 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 

object databaseread { 
     def main(args:Array[String]) 
     { 
       val url="jdbc:mysql://localhost:3306/dbname" 
       val uname="root" 
       val pwd="root" 
       var i=0 
       val driver="com.mysql.jdbc.Driver" 
       val conf=new SparkConf().setAppName("DBget").setMaster("local") 
       val sc=new SparkContext(conf) 
       val ssc = new StreamingContext(sc, Seconds(60)) 

     val RDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, uname, pwd),
       "select * from crimeweathercoords where ? = ?",
       1, 1, 1,
       r => r.getString("Borough") + "," + r.getString("Month"))



     ssc.checkpoint(".") 


     ssc.start() 
     ssc.awaitTermination() 


     } 
    } 

Answer

0

Spark Streaming is not intended for reading data from an RDBMS like MySQL. You could try writing your own custom receiver, but at this point it is probably easier to use the regular Spark API to retrieve the data in chunks.
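Retrieving in chunks usually means polling on a schedule and keeping a high-water mark (for example an auto-increment primary key) so that each poll only returns rows inserted since the previous one. A minimal sketch, assuming a hypothetical auto-increment `id` column on `crimeweathercoords` (the original schema is not shown), with the incremental-filter logic pulled out as a pure helper:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical row type standing in for one JDBC result-set row.
case class CrimeRow(id: Long, borough: String, month: String)

// Pure helper: given the rows returned by a poll and the last id already
// processed, keep only the new rows and advance the high-water mark.
def newRowsSince(rows: Seq[CrimeRow], lastId: Long): (Seq[CrimeRow], Long) = {
  val fresh  = rows.filter(_.id > lastId)
  val nextId = if (fresh.isEmpty) lastId else fresh.map(_.id).max
  (fresh, nextId)
}

// Sketch of the polling loop (not run here): every 2 minutes, issue
// "select * from crimeweathercoords where id > ?" with the stored
// high-water mark (e.g. via a JdbcRDD whose lower bound is lastId),
// process the fresh rows, then advance the mark.
def startPolling(fetch: Long => Seq[CrimeRow]): Unit = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  var lastId = 0L
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit = {
      val (fresh, next) = newRowsSince(fetch(lastId), lastId)
      fresh.foreach(r => println(s"${r.borough},${r.month}"))
      lastId = next
    }
  }, 0, 2, TimeUnit.MINUTES)
}
```

If the poller can be restarted, the high-water mark should be persisted somewhere durable (a file, ZooKeeper, or a bookkeeping table) rather than held in memory as above.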

+0

Thanks! Will try this..!!! –