2016-02-05 78 views
4

我正在做Scala與Java Reactive Spec實現之間的比較,分別使用akka-streamRxJava。我的用例是一個簡單的grep:給定一個目錄,一個文件過濾器和一個搜索文本,我在該目錄中查找所有包含文本的匹配文件。然後流(文件名 - >匹配行)對。 這適用於Java,但對於Scala,沒有任何打印。沒有例外,但沒有輸出。 測試數據是從互聯網下載的,但正如您所看到的,代碼也可以很容易地用任何本地目錄進行測試。Scala vs Java Streaming:Scala不打印任何東西,Java可以工作

斯卡拉

object Transformer { 
    implicit val system = ActorSystem("transformer") 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext: ExecutionContext = { 
    implicitly 
    } 

    import collection.JavaConverters._ 

    def run(path: String, text: String, fileFilter: String) = { 
    Source.fromIterator {() => 
     Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala 
    }.map(p => { 
     val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList] 
     (p, lines) 
    }) 
     .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}"))) 
    } 
} 

的Java:使用Scala的測試

public class Transformer { 
    public static void run(String path, String text, String fileFilter) { 
     Observable.from(files(path, fileFilter)).flatMap(p -> { 
      try { 
       return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p) 
         .filter(line -> line.contains(text)) 
         .map(String::trim) 
         .collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet))); 
      } catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     }).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue())); 
    } 

    private static Iterable<Path> files(String path, String fileFilter) { 
     try { 
      return Files.newDirectoryStream(Paths.get(path), fileFilter); 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    } 
} 

單元測試

class TransformerSpec extends FlatSpec with Matchers { 
    "Transformer" should "extract temperature" in { 
    Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml") 
    } 

    "Java Transformer" should "extract temperature" in { 
    JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml") 
    } 
} 
+1

我不知道阿卡流,但在你的Java版本你將可觀測'toBlocking()'所以你的執行流將無法完成,直到鏈完成,但它看起來像阿卡版本全部運行異步。這可能僅僅是你的VM /測試在Akka版本有機會完成之前終止的事實嗎? – tddmonkey

+0

您是否嘗試過在scala版本中包含打印語句以及它是否實際運行? –

+0

@MrWiggles你很近,請看下面的答案。謝謝。 –

回答

1

蕩,我忘了Source返回Future,這意味着流程從未運行。 @MrWiggles的評論給了我一個提示。以下Scala代碼與Java版本產生相同的結果。

注意:在我的問題的代碼並沒有關閉DirectoryStream其中,對於具有大量文件的目錄,引起了java.io.IOException: Too many open files in system。下面的代碼正確地關閉了資源。

def run(path: String, text: String, fileFilter: String) = { 
    val files = Files.newDirectoryStream(Paths.get(path), fileFilter) 

    val future = Source(files.asScala.toList).map(p => { 
    val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList] 
    (p, lines) 
    }) 
    .filter(!_._2.isEmpty) 
    .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}"))) 

    Await.result(future, 10.seconds) 

    files.close 

    true // for testing 
}