2017-08-15 89 views
0

我想在我的Akka Streams工作流程中將列表項目轉換爲單個地圖作爲舞臺。舉個例子,說我有下面的課。將轉換列表映射到Akka Stream中的地圖

case class MyClass(myString: String, myInt: Int) 

我想myStringMyClassList一個實例轉換爲Map該鍵他們。

所以,如果我有List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)),我希望地圖hello映射List(1, 3)world映射List(2)的。

以下是我到目前爲止的內容。

val flowIWant = { 
    Flow[MyClass].map { entry => 
     entry.myString -> entry.myInt 
    } ??? // How to combine tuples into a single map? 
} 

而且,這將是理想的流向最終產生各個地圖的實體,所以我可以用每一獨立工作,爲下一階段(我想每個地圖實體單獨再做一次手術)。

我不確定這是一個fold類型的操作還是什麼。謝謝你的幫助。

+0

每一個都是你寫的:'entry => entry.myString - > entry.myInt'。不確定你的意思/想要達到什麼目的。 –

+0

這隻會將它們轉換爲元組不會嗎?我想彙總數據。所以我想要「你好」 - >列表(1,3)而不是「你好」 - > 1和「你好」 - > 3 –

回答

0

真的不清楚你實際上想要得到什麼。從你說你的問題的方式,我認爲至少有以下變換你可能意味着:

Flow[List[MyClass], Map[String, Int], _] 
Flow[List[MyClass], Map[String, List[Int]], _] 
Flow[MyClass, (String, Int), _] 
Flow[MyClass, (String, List[Int]), _] 

從你的措辭我懷疑很可能是你想要的東西就像是最後一個,但它並沒有真正做感覺有這樣一個轉換,因爲它不能發射任何東西 - 爲了組合所有對應於一個鍵的值,你需要讀取整個輸入。

如果您有一個輸入流MyClass並想從中獲得Map[String, List[Int]],則沒有其他選擇,只能將其連接到摺疊接收器並執行流直到完成。例如:

val source: Source[MyClass, _] = ??? // your source of MyClass instances 

val collect: Sink[MyClass, Future[Map[String, List[Int]]] = 
    Sink.fold[Map[String, List[Int]], MyClass](Map.empty.withDefaultValue(List.empty)) { 
    (m, v) => m + (v.myString -> (v.myInt :: m(v.myString))) 
    } 

val result: Future[Map[String, List[Int]]] = source.toMat(collect)(Keep.right).run() 
0

我想你想scan它:

source.scan((Map.empty[String, Int], None: Option((String, Int))))((acc, next) => { val (map, _) 
    val newMap = map.updated(next._1 -> map.getOrElse(next._1, List())) 
    (newMap, Some(newMap.get(next._1)))}).map(_._2.get) 

這樣,您就可以檢查Map內容直到內存被耗盡。 (與最後一個元素的含量是在最初的元組包裹在一個Option的價值的一部分。)

0

這可能是你在找什麼:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 

import scala.util.{Failure, Success} 

object Stack { 

    def main(args: Array[String]): Unit = { 
    case class MyClass(myString: String, myInt: Int) 
    implicit val actorSystem = ActorSystem("app") 
    implicit val actorMaterializer = ActorMaterializer() 
    import scala.concurrent.ExecutionContext.Implicits.global 

    val list = List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)) 

    val eventualMap = Source(list).fold(Map[String, List[Int]]())((m, e) => { 
     val newValue = e.myInt :: m.get(e.myString).getOrElse(Nil) 
     m + (e.myString -> newValue) 
    }).runWith(Sink.head) 
    eventualMap.onComplete{ 
     case Success(m) => { 
     println(m) 
     actorSystem.terminate() 
     } 
     case Failure(e) => { 
     e.printStackTrace() 
     actorSystem.terminate() 
     } 
    } 
    } 
} 

有了這個代碼,你會得到下面的輸出:

Map(hello -> List(3, 1), world -> List(2)) 

如果你想有以下輸出:

Vector(Map(), Map(hello -> List(1)), Map(hello -> List(1), world -> List(2)), Map(hello -> List(3, 1), world -> List(2))) 

只需使用掃描而不是摺疊並使用Sink.seq運行。

摺疊和掃描之間的差異是摺疊等待上游完成之前完成按下,而掃描推動每個更新下游。

相關問題