2016-04-04 5 views
7

W przykładach korzystania ze strumieni reaktywnych przedstawiono tylko dane do odczytu w charakterze DatabasePublisher. Ale co się dzieje, gdy chcesz używać bazy danych jako Zlewu i zwrotu wstecznego na podstawie współczynnika wstawienia?W jaki sposób wykorzystywane są strumienie reaktywne używane w programie Slick do wstawiania danych?

Szukałem odpowiednika DatabaseSubscriber, ale on nie istnieje. Więc pytanie brzmi, czy mam źródła, mówią:

val source = Source(0 to 100)

jak mogę kreta zlewu z Slick że pisze te wartości do tabeli ze schematem:

create table NumberTable (value INT)

Odpowiedz

7

Serial Inserts

Najprostszym sposobem byłoby zrobienie inserts w ramach Sink.foreach.

Zakładając, że użył „NumberTable” schema code generation i dodatkowo zakładając tabela o nazwie

//Tables file was auto-generated by the schema code generation 
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig" 

możemy napisać funkcję, która robi wstawiania

def insertIntoDb(num : Int) = 
    numberTableDB run (Numbertable += NumbertableRow(num)) 

I że funkcja może być umieszczony w zlewie

val insertSink = Sink[Int] foreach insertIntoDb 

Source(0 to 100) runWith insertSink 

Ba tched Wstawki

można dodatkowo rozszerzyć metodologię Sink wkładkami dozowania N naraz:

def batchInsertIntoDb(nums : Seq[Int]) = 
    numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply)) 

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb 

Ten batched Umywalka może być podawany przez Flow który dokłada grupowania partii:

val batchSize = 10 

Source(0 to 100).via(Flow[Int].grouped(batchSize)) 
       .runWith(batchInsertSink) 
2

Mimo że można do tego celu użyć Sink.foreach (o czym wspomniał Ramon), bezpieczniej i prawdopodobnie szybciej (uruchamiając przekładki równolegle), można użyć mapAsyncFlow. Problemem, który napotkasz przy użyciu Sink.foreach jest to, że nie ma on wartości zwracanej. Wstawienie do bazy danych za pomocą slicks db.run Metoda zwraca Future, która następnie ucieknie z oparów zwrócony Future[Done], który kończy się, gdy kończy się Sink.foreach.

implicit val system = ActorSystem("system") 
implicit val materializer = ActorMaterializer() 

class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") { 
    def value = column[Int]("value") 
    def * = value 
} 

val numbers = TableQuery[Numbers] 

val db = Database.forConfig("postgres") 
Await.result(db.run(numbers.schema.create), Duration.Inf) 

val streamFuture: Future[Done] = Source(0 to 100) 
    .runWith(Sink.foreach[Int] { (i: Int) => 
    db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done")) 
    }) 
Await.result(streamFuture, Duration.Inf) 
println("stream 1 done") 

//// sample 1 output: //// 
// stream 1 insert 1 done 
// ... 
// stream 1 insert 99 done 
// stream 1 done <-- stream Future[Done] returned before inserts finished 
// stream 1 insert 100 done 

Z drugiej strony def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])Flow pozwala na uruchamianie wkładki równolegle poprzez paramerter równoległości i przyjmuje funkcję z wartością upstream się do przyszłości jakiegoś typu. To pasuje do naszej funkcji i => db.run(numbers += i). Wielką zaletą tego Flow jest to, że następnie podaje wynik tych Futures w dół.

val streamFuture2: Future[Done] = Source(0 to 100) 
    .mapAsync(1) { (i: Int) => 
    db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r } 
    } 
    .runWith(Sink.ignore) 
Await.result(streamFuture2, Duration.Inf) 
println("stream 2 done") 

//// sample 2 output: //// 
// stream 2 insert 1 done 
// ... 
// stream 2 insert 100 done 
// stream 1 done <-- stream Future[Done] returned after inserts finished 

celu udowodnienia tej tezy można jeszcze powrócić prawdziwy wynik ze strumienia zamiast Future[Done] (Z Gotowe reprezentujących jednostki).Strumień ten doda również wyższą wartość paralelizmu i grupowania dla dodatkowej wydajności. *

val streamFuture3: Future[Int] = Source(0 to 100) 
    .via(Flow[Int].grouped(10)) // Batch in size 10 
    .mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count 
    .runWith(Sink.fold(0)(_+_)) // count all inserts and return total 
val rowsInserted = Await.result(streamFuture3, Duration.Inf) 
println(s"stream 3 done, inserted $rowsInserted rows") 

// sample 3 output: 
// stream 3 done, inserted 101 rows 
  • Uwaga: Prawdopodobnie nie będzie widać lepszą wydajność dla takiego małego zbioru danych, ale kiedy mam do czynienia z 1,7 wstawić udało mi się uzyskać najlepszą wydajność na moim komputerze z wielkość partii 1000 i wartość równoległości 8, lokalnie z postgresql. Było to około dwa razy tyle, co nie równolegle. Jak zawsze, gdy masz do czynienia z wynikami, wyniki mogą się różnić i powinieneś mierzyć dla siebie.
Powiązane problemy