2015-12-22 53 views
0

我用akka-streams api編寫了一個簡單的流,假設它會處理我的源,但不幸的是它不。我確信我在源頭上做錯了什麼。我只是簡單地創建了一個迭代器,它產生了大量的元素,假設它不重要,因爲akka流api將負責背壓。我做錯了什麼,這是我的迭代器。在阿卡流中沒有正確處理背壓

def createData(args: Array[String]): Iterator[TimeSeriesValue] = { 
var data = new ListBuffer[TimeSeriesValue]() 
for (i <- 1 to range) { 
    sessionId = UUID.randomUUID() 
    for (j <- 1 to countersPerSession) { 
    time = DateTime.now() 
    keyName = s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j" 
    for (k <- 1 to snapShotCount) { 
     time = time.plusSeconds(2) 
     fValue = new Random().nextLong() 
     data += TimeSeriesValue(sessionId, keyName, time, fValue) 
     totalRows += 1 
    } 
    } 
} 
data.iterator 

}

回答

2

的問題主要是在該行

data += TimeSeriesValue(sessionId, keyName, time, fValue) 

您不斷添加到ListBuffer用 「非常大量的元素」。這是咀嚼所有的RAM。 data.iterator行只是簡單地將大量的ListBuffer blob包裝在迭代器中,每次只提供一個元素,它基本上只是一個表演。

您的假設「因爲背壓而無關緊要」,部分事實是,阿卡流將會以被動方式處理TimeSeriesValue值,但即使在到達源之前您仍然在創建大量的值構造函數。

如果你想要這個迭代器是「懶」,需要時即只能產生價值,而不是消耗內存,然後進行以下修改(注意:我解體了代碼,使其更具可讀性):

def createTimeSeries(startTime: Time, snapShotCount : Int, sessionId : UUID, keyName : String) = 
    Iterator.range(1, snapShotCount) 
      .map(_ * 2) 
      .map(startTime plusSeconds _) 
      .map(t => TimeSeriesValue(sessionId, keyName, t, ThreadLocalRandom.current().nextLong())) 

def sessionGenerator(countersPerSession : Int, sessionID : UUID) = 
    Iterator.range(1, countersPerSession) 
      .map(j => s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j") 
      .flatMap { keyName => 
    createTimeSeries(DateTime.now(), snapShotCount, sessionID, keyName) 
    } 

object UUIDIterator extends Iterator[UUID] { 
    def hasNext : Boolean = true 
    def next() : UUID = UUID.randomUUID() 
} 

def iterateOverIDs(range : Int) = 
    UUIDIterator.take(range)    
       .flatMap(sessionID => sessionGenerator(countersPerSession, sessionID)) 

上述每個函數都返回一個Iterator。因此,調用iterateOverIDs應該是瞬時的,因爲沒有立即完成工作並且正在消耗內存。這個迭代器然後可以傳入你的流...

+0

嗨拉蒙,非常感謝迴應。非常有幫助。我完全不瞭解您的代碼,但非常優雅的東西,併爲我工作。我如何成爲斯卡拉風格思維方面的專家,我很難在不可變的異步和功能方面進行思考。任何首選的閱讀或練習? – Lahiru

+1

@Lahiru歡迎您。對於scala/fp的介紹,我絕對推薦coursera課程:https://www.coursera.org/course/progfun。該會話現在不活躍,但我相信你仍然可以觀看視頻。快樂的黑客! –

+2

@Lahiru「scala中的函數式編程」書籍非常好讓你在FP風格中思考,並且具有良好的學習曲線。 – jlr