2015-01-21

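Replacing a thread executor pool in Scala

My application needs multiple threads running in parallel to fetch data from various HDFS nodes. For this, I am using a thread executor pool and forking one task per file block. Forking: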

val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
// Submit one Consumption task per file block
pathSuffixList.foreach { block =>
  ConsumptionExecutor.execute(new Consumption(webHdfsUri, block))
}

My Consumption class:

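import java.net.URL
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

class Consumption(webHdfsUri: String, block: Map[String, Any]) extends Runnable {

  override def run(): Unit = {
    // Drop the query string and build a WebHDFS OPEN URL for this block's file
    val baseUri = webHdfsUri.split("\\?")(0)
    val fileOpenUri = baseUri + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
    val inputStream = new URL(fileOpenUri).openStream()
    // Read the Avro container file as generic records
    val dataStreamReader =
      new DataFileStream[GenericRecord](inputStream, new GenericDatumReader[GenericRecord]())
    try {
      // val schema = dataStreamReader.getSchema()
      while (dataStreamReader.hasNext) {
        println(" data : " + dataStreamReader.next())
      }
    } finally {
      dataStreamReader.close() // releases the reader and its input stream
    }
  }
}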

ConsumptionExecutor:

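import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicLong

object ConsumptionExecutor {

  // Gives each pool thread a unique name
  val counter: AtomicLong = new AtomicLong()

  val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread: Thread = new Thread(r)
      thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
      thread
    }
  })
  // Cap the otherwise unbounded cached pool at 200 threads
  executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)

  def execute(trigger: Runnable): Unit = {
    executionContext.execute(trigger)
  }
}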

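However, I would like to use Akka Streams / Akka actors instead, so that I don't need to specify a fixed thread pool size and Akka takes care of everything. I'm very new to Akka and to the concepts of streams and actors. Could someone give me some pointers, in the form of sample code, for adapting this to my use case? Thanks in advance!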

Answer


The idea is to create one instance of (a subclass of) ActorPublisher for each HDFS node you want to read from, and then Merge them as multiple Sources in a FlowGraph.
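For a fixed handful of per-node sources, the same "one Source per node, then Merge" idea can be sketched with Source.combine. This sketch assumes a current Akka 2.6 dependency (akka-stream) rather than the early API used in this answer; ActorPublisher was deprecated in later Akka releases, so the per-node sources here are hypothetical in-memory stand-ins built with Source(List(...)).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeSourcesSketch {
  def mergedElements(): Seq[String] = {
    // The implicit ActorSystem also provides the stream Materializer (Akka 2.6+)
    implicit val system: ActorSystem = ActorSystem("merge-sketch")
    try {
      // Hypothetical stand-ins for "one source per HDFS node"
      val node1 = Source(List("a1", "a2"))
      val node2 = Source(List("b1", "b2"))
      val node3 = Source(List("c1"))

      // Fan all three sources into one stream through a Merge junction
      val merged = Source.combine(node1, node2, node3)(Merge(_))

      // Collect everything; element order across nodes is not deterministic
      Await.result(merged.runWith(Sink.seq), 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(mergedElements().sorted)
}
```

Akka schedules the sources on its own dispatcher, so there is no thread pool size to configure here; Merge simply emits elements as they arrive from any input.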

Something like this pseudocode, where the details of the ActorPublisher sources are left out:

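// Pseudocode against the early (pre-1.0) akka-stream-experimental API
// (PartialFlowGraph, FlowGraphImplicits, UndefinedSink); T is the element type
val g = PartialFlowGraph { implicit b =>
  import FlowGraphImplicits._
  val in1 = actorSource1   // Source backed by an ActorPublisher for node 1
  val in2 = actorSource2   // Source backed by an ActorPublisher for node 2
  // etc.

  val out = UndefinedSink[T]
  val merge = Merge[T]

  in1 ~> merge ~> out
  in2 ~> merge
  // etc.
}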

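For a collection of actor sources, you can improve on this by iterating over them and adding an edge into merge for each one, but this gives the idea.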
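Iterating over a collection of sources and adding one edge into merge per source can be sketched with the GraphDSL of current Akka Streams. This is a sketch assuming Akka 2.6, not the answer's original API; mergeAll and demo are hypothetical helper names, and the per-node sources are in-memory stand-ins for real HDFS readers.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeManySketch {
  // Hypothetical helper: merge any number of sources into a single Source
  def mergeAll[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
    if (sources.isEmpty) Source.empty[T] // Merge needs at least one input port
    else
      Source.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        val merge = b.add(Merge[T](sources.size))
        // One edge into the merge per source; each takes the next free inlet
        sources.foreach(s => s ~> merge)
        SourceShape(merge.out)
      })

  def demo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("merge-many")
    try {
      // Hypothetical stand-ins: one small source per "node"
      val perNode = (1 to 4).map(n => Source(List(n * 10, n * 10 + 1)))
      Await.result(mergeAll(perNode).runWith(Sink.seq), 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo().sorted)
}
```

Because the Merge is sized from the collection, the same graph works whether there are two nodes or two hundred, with no pool size to tune.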
