Wymyśliłem rozwiązanie, używając GraphStageWithMaterializedValue
. Ta koncepcja została zapożyczona od Play's maxLength
body parser. Kluczową różnicą między moją pierwszą próbą w moim pytaniu (która nie kompiluje) jest to, że zamiast próbować zmutować strumień, powinienem użyć zmaterializowanej wartości do przekazania informacji o stanie przetwarzania. Podczas gdy ja stworzyłem Flow[ByteString, Either[Result, ByteString], NotUsed]
, okazało się, że potrzebuję Flow[ByteString, ByteString, Future[Boolean]]
.
Więc z tym, moja parser
funkcja kończy się wyglądać jak ten:
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
statusFuture.flatMap { success =>
if (success) {
resultFuture.map {
case Left(result) => Left(result)
case Right(a) => Right(a)
}
} else {
Future.successful(Left(BadRequest))
}
}
})
}
Kluczem jest to, to jedno:
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
Reszta niby wraca na swoje miejsce, gdy jesteś w stanie utwórz ten przepływ. Niestety, BodyValidator
jest dość gadatliwy i czuje się trochę jak na kotle. W każdym razie jest to w większości łatwe do odczytania. GraphStageWithMaterializedValue
oczekuje, że zaimplementujesz tutaj def shape: S
(S
is FlowShape[ByteString, ByteString]
) w celu określenia typu wejścia i typu wyjścia tego wykresu. Oczekuje również, że użytkownik zostanie przeniesiony tutaj), aby określić, co powinien faktycznie zrobić wykres. Poniżej znajduje się pełny kod BodyValidator
(wytłumaczę bardziej szczegółowo poniżej):
class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic, status.future)
}
}
najpierw chcą stworzyć Inlet
i Outlet
skonfigurować wejścia i wyjścia na wykresie
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
Następnie użyj ich do zdefiniowania shape
.
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
Wewnątrz createLogicAndMaterializedValue
trzeba zainicjować wartość, którą zamierzają materialze. Tutaj wykorzystałem obietnicę, którą można rozwiązać, gdy mam pełne dane ze strumienia. Tworzę również ByteStringBuilder
, aby śledzić dane między iteracjami.
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
Potem utworzyć GraphStageLogic
faktycznie ustawić ten wykres co robi w każdym punkcie przetwarzania. Ustawiono dwa programy obsługi. Jednym z nich jest InHandler
do obsługi danych, ponieważ pochodzi z źródła poprzedzającego. Drugi to OutHandler
do obsługi danych do wysłania w dół. Nie ma nic naprawdę interesującego w OutHandler
, więc zignoruję to tutaj, poza tym, że muszę koniecznie mieć płytę kotła, aby uniknąć IllegalStateException
. Trzy metody są nadpisywane w InHandler
:i onUpstreamFailure
. onPush
jest wywoływane, gdy nowe dane są gotowe od źródła. W tej metodzie pobieram następny fragment danych, zapisuję go na bodyBuffer
i przesyła dane w dół.
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
onUpstreamFinish
jest wywoływana, gdy kończy się proces upstream (niespodzianka). Tu dochodzi do logiki biznesowej porównywania ciała z nagłówkiem.
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
onUpstreamFailure
realizowany jest tak, że jeśli coś pójdzie nie tak, mogę oznaczyć zmaterializowanej przyszłość jako zawiodły również.
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
Wtedy właśnie zwracają GraphStageLogic
Utworzyłem i status.future
jak krotki.