2016-10-16 113 views
0

我想模擬一個系統,它是一個網站的負載生成器。該網站的頁面現在是Strings。同時運行斯卡拉FS2流

type Page = String 

val pages: Vector[Page] = Stream.eval(Task.delay { 
    new Random().alphanumeric.take(10).mkString 
}).repeat.take(100).runLog.unsafeRun 

該網站有被建模作爲開始將瀏覽的網站在隨機時間用戶:

case class User(userName: String) 

def newUser: Task[User] = Task.delay { 
    val random = new Random 
    val userName = random.alphanumeric.take(5).mkString 
    val sleepTime = random.nextInt(10000) 
    println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms") 
    Thread.sleep(sleepTime) 
    User(userName) 
} 

每個用戶過渡一頁一頁,那就是建模爲:

case class Transition(from: Page, to: Page, thinkTime: Int) 

def newTransitions(user: User): Stream[Task, Transition] = { 
    val init = Transition(pages(0), pages(1), 100) 
    Stream.iterateEval(init) { t => nextTransition(user)(t) } 
} 

def nextTransition(user: User)(curr: Transition): Task[Transition] = Task.delay { 
    val random = new Random 
    val from: Page = curr.to 
    val to: Page = pages(random.nextInt(pages.size)) 
    val thinkTime: Int = random.nextInt(10000) 
    Thread.sleep(thinkTime) 
    println(s"${new Date}: ${Thread.currentThread.getName} ${user} transitioning from ${from} to ${to} after thinking for ${thinkTime}ms") 
    Transition(from, to, thinkTime) 
} 

我試圖運行這個模擬,4個用戶在隨機間隔後開始瀏覽本網站,每個用戶隨機前往三頁:

implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker") 
val users: Stream[Task, User] = Stream.eval(newUser).repeat 
val transitions: Stream[Task, Stream[Task, Transition]] = users.map(newTransitions(_).take(3)) 

transitions.take(4).runLog.async.unsafeRun.map { _.runLog.async.unsafeRun } 

輸出看起來像:

Sun Oct 16 12:58:04 PDT 2016: worker-1 Generating user 97qkE after 3700ms 
Sun Oct 16 12:58:08 PDT 2016: worker-1 Generating user sMIJs after 2074ms 
Sun Oct 16 12:58:10 PDT 2016: worker-1 Generating user CpWWf after 1334ms 
Sun Oct 16 12:58:11 PDT 2016: worker-1 Generating user xlQmj after 8825ms 
Sun Oct 16 12:58:28 PDT 2016: worker-7 User(97qkE) transitioning from sLBbTbXKhO to 7ROMneDkzl after thinking for 8040ms 
Sun Oct 16 12:58:34 PDT 2016: worker-7 User(97qkE) transitioning from 7ROMneDkzl to AK66SuDENi after thinking for 6227ms 
Sun Oct 16 12:58:41 PDT 2016: worker-6 User(sMIJs) transitioning from sLBbTbXKhO to U2Qu8Th1oH after thinking for 6599ms 
Sun Oct 16 12:58:46 PDT 2016: worker-6 User(sMIJs) transitioning from U2Qu8Th1oH to 3bRBWYUeS0 after thinking for 4920ms 
Sun Oct 16 12:58:52 PDT 2016: worker-4 User(CpWWf) transitioning from sLBbTbXKhO to ZVwRydhPe9 after thinking for 6395ms 
Sun Oct 16 12:58:54 PDT 2016: worker-4 User(CpWWf) transitioning from ZVwRydhPe9 to yHtly5IrMC after thinking for 1963ms 
Sun Oct 16 12:58:57 PDT 2016: worker-2 User(xlQmj) transitioning from sLBbTbXKhO to 6jn7TmxW0O after thinking for 2565ms 
Sun Oct 16 12:59:05 PDT 2016: worker-2 User(xlQmj) transitioning from 6jn7TmxW0O to FFROVBvpHJ after thinking for 8535ms 

即沒有貨幣,所有的隨機睡在嚴格的順序發生。

如何更改此程序,使這些不同的流(新用戶開始瀏覽網站,以及由每個用戶進行的轉換)同時發生?


這裏是我完整的程序:

import fs2._ 
import scala.util.Random 
import java.util.Date 

object Question { 
    type Page = String 

    val pages: Vector[Page] = Stream.eval(Task.delay { 
    new Random().alphanumeric.take(10).mkString 
    }).repeat.take(100).runLog.unsafeRun 

    case class User(userName: String) 

    case class Transition(from: Page, to: Page, thinkTime: Int) 

    def newUser: Task[User] = Task.delay { 
    val random = new Random 
    val userName = random.alphanumeric.take(5).mkString 
    val sleepTime = random.nextInt(10000) 
    println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms") 
    Thread.sleep(sleepTime) 
    User(userName) 
    } 

    def nextTransition(user: User)(curr: Transition): Task[Transition] = Task.delay { 
    val random = new Random 
    val from: Page = curr.to 
    val to: Page = pages(random.nextInt(pages.size)) 
    val thinkTime: Int = random.nextInt(10000) 
    Thread.sleep(thinkTime) 
    println(s"${new Date}: ${Thread.currentThread.getName} ${user} transitioning from ${from} to ${to} after thinking for ${thinkTime}ms") 
    Transition(from, to, thinkTime) 
    } 

    def newTransitions(user: User): Stream[Task, Transition] = { 
    val init = Transition(pages(0), pages(1), 100) 
    Stream.iterateEval(init) { t => nextTransition(user)(t) } 
    } 

    def main(args: Array[String]) { 
    implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker") 
    val users: Stream[Task, User] = Stream.eval(newUser).repeat 
    val transitions: Stream[Task, Stream[Task, Transition]] = users.map(newTransitions(_).take(3)) 

    transitions.take(4).runLog.async.unsafeRun.map { _.runLog.async.unsafeRun } 
    } 
} 

回答

3

首先,在你的object Question,貌似有一個錯字。記得切換:

Thread.sleep(sleepTime) 
println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms") 

接下來,你想使用Task { ... },而不是Task.delay,因爲你想在工作線程中運行它。

最後,在主函數的最後一行應該是這樣的:

fs2.concurrent.join(maxOpen = 3)(transitions).run.unsafeRun 

而且,我猜的Thread.sleep只是佔位符 - 你不會希望有那些實際的代碼。 println對於調試很好,但最好是將日誌記錄作爲單獨的流構造一旦完成。

+0

謝謝,這工作! –