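Replacing a thread executor pool in Scala

My application needs multiple threads that fetch data from various HDFS nodes. For this I use a thread executor pool and fork threads. The forking happens here: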
val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
pathSuffixList.foreach { block =>
  ConsumptionExecutor.execute(new Consumption(webHdfsUri, block))
}
My Consumption class:
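import java.net.URL
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

class Consumption(webHdfsUri: String, block: Map[String, Any]) extends Runnable {
  override def run(): Unit = {
    // Drop the query string from the listing URI, then build the OPEN URI for this file.
    val uriSplit = webHdfsUri.split("\\?")
    val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
    val inputStream = new URL(fileOpenUri).openStream()
    // The schema is read from the Avro file header, so the reader needs no explicit schema.
    val datumReader = new GenericDatumReader[GenericRecord]()
    val dataStreamReader = new DataFileStream[GenericRecord](inputStream, datumReader)
    // val schema = dataStreamReader.getSchema()
    try {
      // DataFileStream is itself an Iterator, so it can be iterated directly.
      while (dataStreamReader.hasNext) {
        println(" data : " + dataStreamReader.next())
      }
    } finally {
      dataStreamReader.close() // also closes the underlying input stream
    }
  }
}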
ConsumptionExecutor:
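import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicLong

object ConsumptionExecutor {
  val counter: AtomicLong = new AtomicLong()
  // Cached pool whose threads are named ConsumptionExecutor-1, ConsumptionExecutor-2, ...
  val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread: Thread = new Thread(r)
      thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
      thread
    }
  })
  // Cap the pool at 200 threads. A cached pool does not queue tasks, so once all
  // 200 threads are busy, further execute calls throw RejectedExecutionException.
  executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)

  def execute(trigger: Runnable): Unit = {
    executionContext.execute(trigger)
  }
}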
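However, I would like to use Akka Streams or Akka Actors instead, so that I don't have to provide a fixed thread pool size and Akka takes care of everything. I am very new to Akka and to the concepts of streams and actors. Can someone give me any leads, in the form of sample code, that fit my use case? Thanks in advance!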
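
For reference, one possible direction is sketched below. It is only a rough outline under several assumptions: Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream), the akka-stream dependency on the classpath, and a hypothetical helper `openUri` standing in for the body of `Consumption.run`. The URI and the block list are placeholders, not real values. The sketch replaces the hand-built pool with `mapAsyncUnordered`, which bounds the number of in-flight fetches instead of sizing a thread pool.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ConsumptionStream {
  // Hypothetical stand-in for Consumption.run: builds the OPEN URI for one block.
  // Real code would open this URI and read the Avro records from it.
  def openUri(webHdfsUri: String, block: Map[String, Any]): String =
    webHdfsUri.split("\\?")(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"

  def main(args: Array[String]): Unit = {
    // Assumes Akka 2.6+, where the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("consumption")
    import system.dispatcher // ExecutionContext used by the Futures below

    // Placeholder inputs; in the question these come from the LISTSTATUS response.
    val webHdfsUri = "http://namenode.example:50070/webhdfs/v1/data?op=LISTSTATUS"
    val blocks = List(
      Map[String, Any]("pathSuffix" -> "part-0.avro"),
      Map[String, Any]("pathSuffix" -> "part-1.avro")
    )

    val done = Source(blocks)
      // At most 8 fetches are in flight at once; results are emitted as they complete.
      .mapAsyncUnordered(parallelism = 8) { block =>
        Future(openUri(webHdfsUri, block))
      }
      .runWith(Sink.foreach(uri => println("would read: " + uri)))

    Await.result(done, 1.minute)
    system.terminate()
  }
}
```

The `parallelism` value of 8 is arbitrary. Since reading from a URL blocks its thread, real code would probably want those Futures to run on a dedicated dispatcher for blocking I/O rather than on the default one.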