2015-11-14 86 views
1

我想簡單的文件使用Sparkstreaming流例如(火花streaming_2.10,版本:1.5.1)星火FileStreaming問題

public class DStreamExample { 

    public static void main(final String[] args) { 

     final SparkConf sparkConf = new SparkConf(); 
     sparkConf.setAppName("SparkJob"); 
     sparkConf.setMaster("local[4]"); // for local 

     final JavaSparkContext sc = new JavaSparkContext(sparkConf); 

     final JavaStreamingContext ssc = new JavaStreamingContext(sc, 
      new Duration(2000)); 

     final JavaDStream<String> lines = ssc.textFileStream("/opt/test/"); 
     lines.print(); 

     ssc.start(); 
     ssc.awaitTermination(); 
    } 
} 

當我運行單個文件或主管該代碼它不打印任何東西從文件中,我在日誌中看到它不斷輪詢,但沒有打印任何內容。這個程序運行時,我嘗試將文件移動到目錄。

有什麼我失蹤?我嘗試在線路RDD上應用地圖功能,這也不起作用。

回答

1

的API textFileStream不應該讀取現有目錄中的內容,相反,它的目的是監控更改給定的Hadoop兼容的文件系統路徑,文件必須通過從其他位置的「移動」它們寫進監視位置在相同的文件系統中。 簡而言之,您正在訂閱目錄更改,並且將在受監視位置內收到新出現的文件的內容 - 在監視快照時出現該文件的狀態(該狀態是2000毫秒的持續時間你的情況),並且任何進一步的文件更新都不會到達該流,只有目錄更新(新文件)將會執行。

可以模擬更新的方式是你的監視會話過程中創建新的文件:

import org.apache.commons.io.FileUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

import java.io.File; 
import java.io.IOException; 
import java.util.List; 

public class DStreamExample { 

public static void main(final String[] args) throws IOException { 

    final SparkConf sparkConf = new SparkConf(); 
    sparkConf.setAppName("SparkJob"); 
    sparkConf.setMaster("local[4]"); // for local 

    final JavaSparkContext sc = new JavaSparkContext(sparkConf); 

    final JavaStreamingContext ssc = new JavaStreamingContext(sc, 
      new Duration(2000)); 

    final JavaDStream<String> lines = ssc.textFileStream("/opt/test/"); 

    // spawn the thread which will create new file within the monitored directory soon 
    Runnable r =() -> { 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     try { 
      FileUtils.write(new File("/opt/test/newfile1"), "whatever"); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    }; 

    new Thread(r).start(); 


    lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> { 
     List<String> lines1 = rdd.collect(); 
     lines1.stream().forEach(l -> System.out.println(l)); 
     return null; 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 
} 

}

+0

感謝您迴應!現在,當我修改文件內容並移至監控目錄時,Sparkstreaming正在選擇要處理的文件。 – Ravi