2016-06-10 57 views
0

我有一個SPARK應用程序,它使用TwitterUtils來讀取Twitter流,並使用流上的map和foreachRDD將Twitter消息放入數據庫。這一切都很好。我如何分離/關閉JavaDStream和TwitterUtils.createStream(...)

我的問題:一旦一切正在運行,什麼是從Twitter流中分離出最合適的方式。假設我只想收集1000條消息或運行集合60秒。

的代碼如下:

SparkConf sparkConf = new SparkConf().setAppName("Java spark twitter stream"); 
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); 
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, filters); 

JavaDStream<String> statuses = tweets.map(
     new Function<Status, String>() { 
      public String call(Status status) { 
       //combine the strings here. 
       GeoLocation geoLocation = status.getGeoLocation(); 
       if (geoLocation != null) { 
        String text = status.getText().replaceAll("[\r\n]", " "); 
        String line = geoLocation.getLongitude() + ",,,," 
          + geoLocation.getLatitude() + ",,,," 
          + status.getCreatedAt().getTime() 
          + ",,,," + status.getUser().getId() 
          + ",,,," + text; 
        return line; 
       } else { 
        return null; 
       } 
      } 
     } 
     ).filter(new Function<String, Boolean>() { 
      public Boolean call(String input) { 
       return input != null; 
      } 
     }); 
     statuses.print(); 


statuses.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() { 
     @Override 
     public Void call(JavaRDD<String> rdd, Time time) { 
      SQLContext sqlContext 
        = JavaSQLContextSingleton 
          .getInstance(rdd.context()); 
      sqlContext.setConf("spark.sql.tungsten.enabled", "false"); 

      JavaRDD<Row> tweetRowRDD 
        = rdd.map(new TweetMapLoadFunction()); 

      DataFrame statusesDataFrame 
        = sqlContext 
          .createDataFrame(
           tweetRowRDD, 
           tweetSchema.createTweetStructType()); 
      return null; 
     } 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 

回答

0

這是直接從the documentation

的處理可以手動使用streamingContext.stop()停止。

要記住的要點:

  • 一旦上下文已經開始,沒有新的流計算可以設置或添加到它。
  • 上下文一旦停止,就無法重新啓動。
  • 只有一個StreamingContext可以同時在JVM中處於活動狀態。
  • StreamingContext上的stop()也會停止SparkContext。要僅停止StreamingContext,請將stop()的可選參數stopSparkContext設置爲false。
  • 只要先前的StreamingContext在創建下一個StreamingContext之前停止(不停止SparkContext),就可以重新使用SparkContext來創建多個StreamingContext。