2015-02-09 44 views
5

我們能夠成功地整合Drools的火花,當我們嘗試應用從Drools的規則,我們能夠爲批處理文件HICH做的是目前在HDFS,但我們試圖用滴料的流媒體文件這樣我們就可以做出決定瞬間,但我們無法弄清楚如何做it.Below是我們正在努力實現的代碼snipet。Drools的火花的流文件

SparkConf conf = new SparkConf().setAppName("sample"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat"); 
    List<String> store = new ArrayList<String>(); 
    store = javaRDD.collect(); 

第二種情況,當我們使用流上下文

SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming"); 
    JavaStreamingContext ssc = 
       new JavaStreamingContext(sparkconf, new Duration(1)); 

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx); 

在第一種情況下,我們能夠適用於可變商店我們的規則,而在第二種情況下,我們無法對應用規則dstream線。如果有人有一些想法,我們如何能得到很大的幫助。提前致謝。

+0

'/ user/root'?呃 – fge 2015-02-09 10:38:12

+0

例如我已經給用戶/根/,原來的將是用戶/ vish /火花/ sample.dat – beginner 2015-02-09 10:40:52

+0

難道@奎師那gajula的回答工作? – gijswijs 2015-05-22 11:55:01

回答

1

下面是完成它的一種方式。

  1. 創建您的知識會議與業務規則第一。

    //Create knowledge and session here 
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(); 
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); 
    kbuilder.add(ResourceFactory.newFileResource("rulefile.drl"), 
         ResourceType.DRL); 
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages(); 
    kbase.addKnowledgePackages(pkgs); 
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession(); 
    
  2. 使用StreamingContext創建JavaDStream。

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming"); 
    JavaStreamingContext ssc = 
          new JavaStreamingContext(sparkconf, new Duration(1)); 
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx); 
    
  3. 調用DStream的foreachRDD來創建事實並激發您的規則。

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() { 
        @Override 
        public Void call(JavaRDD<String> rdd) throws Exception { 
        List<String> facts = rdd.collect(); 
        //Apply rules on facts here 
        ksession.execute(facts); 
        return null; 
        } 
    });