見工作示例:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import slick.backend.DatabasePublisher
import slick.driver.H2Driver.api._
import scala.concurrent.Await
import scala.concurrent.duration._
case class Emp(id: Int, name: String)
object Demo extends App {
implicit val system = ActorSystem("Sys")
val db = Database.forConfig("h2mem1")
val empTableQuery = TableQuery[EmployeeTable]
val insertQuery = empTableQuery ++= Seq(Emp(1, "emp1"), Emp(2, "emp2"), Emp(3, "emp3"), Emp(4, "emp4"))
val action = DBIO.seq(empTableQuery.schema.create, insertQuery)
//create schema and insert record
Await.result(db.run(action), 1000 second)
// print db record
Await.result(db.run(empTableQuery.result), 1000 second).foreach(println)
val publisher: DatabasePublisher[Emp] = db.stream(empTableQuery.result)
import system.dispatcher
implicit val materializer = ActorMaterializer()
//consume using stream
println("Steaming data::::::::")
val source = Source.fromPublisher(publisher).map(emp => emp.id + " : " + emp.name).runForeach(println)
class EmployeeTable(tag: Tag) extends Table[Emp](tag, "emp") {
val id = column[Int]("id", O.PrimaryKey)
val name = column[String]("name")
def * = (id, name) <>(Emp.tupled, Emp.unapply)
}
source.onComplete(_ => system.terminate())
}
build.sbt
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"mysql" % "mysql-connector-java" % "5.1.36",
"com.typesafe.slick" %% "slick-hikaricp" % "3.1.1",
"ch.qos.logback" % "logback-classic" % "1.1.3",
"com.typesafe.slick" %% "slick" % "3.1.1",
"com.typesafe.akka" %% "akka-stream" % "2.4.6",
"org.scalatest" %% "scalatest" % "2.2.5" % "test",
"com.h2database" % "h2" % "1.4.187"
)
application.conf
h2mem1 = {
url = "jdbc:h2:mem:test1"
driver = org.h2.Driver
connectionPool = disabled
keepAliveConnection = true
}
來源
2016-06-02 18:41:14
Sky
你能顯示完整的代碼片段嗎 – Sky