2017-02-07 27 views
5

Biorąc pod uwagę funkcję z tym podpisem:Komponowanie BodyParser w zagraj 2,5

def parser[A](otherParser: BodyParser[A]): BodyParser[A] 

Jak mogę napisać funkcję w taki sposób, że ciało żądanie jest zbadane i zweryfikowane, zanim zostanie przekazany do otherParser?

Dla uproszczenia załóżmy, że chcę sprawdzić, czy nagłówek ("Some-Header", być może) ma wartość, która dokładnie pasuje do ciała. Więc jeśli mam tę akcję:

def post(): Action(parser(parse.tolerantText)) { request => 
    Ok(request.body) 
} 

Kiedy złożyć wniosek jak curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post powinien wrócić „cześć” w organizmie reakcji ze statusem 200. Jeśli moja prośba jest curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post należy zwrócić 400 bez ciała .

Oto, co próbowałem.

Ten nie kompiluje się, ponieważ otherParser(request).through(flow) oczekuje, że flow wyśle ​​ByteString. Pomysł polegał na tym, że przepływ mógł powiadomić akumulator, czy kontynuować przetwarzanie za pomocą wyjścia Either. Nie wiem, jak pozwolić akumulatorowi poznać status poprzedniego kroku.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request => 
    val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes => 
    if (request.headers.get("Some-Header").contains(bytes.utf8String)) { 
     Right(bytes) 
    } else { 
     Left(BadRequest) 
    } 
    } 

    val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request) 

    // This fails to compile because flow needs to output a ByteString 
    acc.through(flow) 
} 

Próbowałem również użyć filtra. Ten kompiluje się, a treść odpowiedzi jest poprawna. Jednak zawsze zwraca status odpowiedzi 200 Ok.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request => 
    val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes => 
    request.headers.get("Some-Header").contains(bytes.utf8String) 
    } 

    val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request) 

    acc.through(flow) 
} 

Odpowiedz

3

Wymyśliłem rozwiązanie, używając GraphStageWithMaterializedValue. Ta koncepcja została zapożyczona od Play's maxLength body parser. Kluczową różnicą między moją pierwszą próbą w moim pytaniu (która nie kompiluje) jest to, że zamiast próbować zmutować strumień, powinienem użyć zmaterializowanej wartości do przekazania informacji o stanie przetwarzania. Podczas gdy ja stworzyłem Flow[ByteString, Either[Result, ByteString], NotUsed], okazało się, że potrzebuję Flow[ByteString, ByteString, Future[Boolean]].

Więc z tym, moja parser funkcja kończy się wyglądać jak ten:

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request => 
    val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header"))) 

    val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink 

    Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) => 
    statusFuture.flatMap { success => 
     if (success) { 
     resultFuture.map { 
      case Left(result) => Left(result) 
      case Right(a) => Right(a) 
     } 
     } else { 
     Future.successful(Left(BadRequest)) 
     } 
    } 
    }) 
} 

Kluczem jest to, to jedno:

val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header"))) 

Reszta niby wraca na swoje miejsce, gdy jesteś w stanie utwórz ten przepływ. Niestety, BodyValidator jest dość gadatliwy i czuje się trochę jak na kotle. W każdym razie jest to w większości łatwe do odczytania. GraphStageWithMaterializedValue oczekuje, że zaimplementujesz tutaj def shape: S (S is FlowShape[ByteString, ByteString]) w celu określenia typu wejścia i typu wyjścia tego wykresu. Oczekuje również, że użytkownik zostanie przeniesiony tutaj), aby określić, co powinien faktycznie zrobić wykres. Poniżej znajduje się pełny kod BodyValidator (wytłumaczę bardziej szczegółowo poniżej):

class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] { 
    val in = Inlet[ByteString]("BodyValidator.in") 
    val out = Outlet[ByteString]("BodyValidator.out") 

    override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out) 

    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = { 
    val status = Promise[Boolean]() 
    val bodyBuffer = new ByteStringBuilder() 

    val logic = new GraphStageLogic(shape) { 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = pull(in) 
     }) 

     setHandler(in, new InHandler { 
     def onPush(): Unit = { 
      val chunk = grab(in) 
      bodyBuffer.append(chunk) 
      push(out, chunk) 
     } 

     override def onUpstreamFinish(): Unit = { 
      val fullBody = bodyBuffer.result() 
      status.success(expected.map(ByteString(_)).contains(fullBody)) 
      completeStage() 
     } 

     override def onUpstreamFailure(e: Throwable): Unit = { 
      status.failure(e) 
      failStage(e) 
     } 
     }) 
    } 

    (logic, status.future) 
    } 
} 

najpierw chcą stworzyć Inlet i Outlet skonfigurować wejścia i wyjścia na wykresie

val in = Inlet[ByteString]("BodyValidator.in") 
val out = Outlet[ByteString]("BodyValidator.out") 

Następnie użyj ich do zdefiniowania shape.

def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out) 

Wewnątrz createLogicAndMaterializedValue trzeba zainicjować wartość, którą zamierzają materialze. Tutaj wykorzystałem obietnicę, którą można rozwiązać, gdy mam pełne dane ze strumienia. Tworzę również ByteStringBuilder, aby śledzić dane między iteracjami.

val status = Promise[Boolean]() 
val bodyBuffer = new ByteStringBuilder() 

Potem utworzyć GraphStageLogic faktycznie ustawić ten wykres co robi w każdym punkcie przetwarzania. Ustawiono dwa programy obsługi. Jednym z nich jest InHandler do obsługi danych, ponieważ pochodzi z źródła poprzedzającego. Drugi to OutHandler do obsługi danych do wysłania w dół. Nie ma nic naprawdę interesującego w OutHandler, więc zignoruję to tutaj, poza tym, że muszę koniecznie mieć płytę kotła, aby uniknąć IllegalStateException. Trzy metody są nadpisywane w InHandler:i onUpstreamFailure. onPush jest wywoływane, gdy nowe dane są gotowe od źródła. W tej metodzie pobieram następny fragment danych, zapisuję go na bodyBuffer i przesyła dane w dół.

def onPush(): Unit = { 
    val chunk = grab(in) 
    bodyBuffer.append(chunk) 
    push(out, chunk) 
} 

onUpstreamFinish jest wywoływana, gdy kończy się proces upstream (niespodzianka). Tu dochodzi do logiki biznesowej porównywania ciała z nagłówkiem.

override def onUpstreamFinish(): Unit = { 
    val fullBody = bodyBuffer.result() 
    status.success(expected.map(ByteString(_)).contains(fullBody)) 
    completeStage() 
} 

onUpstreamFailure realizowany jest tak, że jeśli coś pójdzie nie tak, mogę oznaczyć zmaterializowanej przyszłość jako zawiodły również.

override def onUpstreamFailure(e: Throwable): Unit = { 
    status.failure(e) 
    failStage(e) 
} 

Wtedy właśnie zwracają GraphStageLogic Utworzyłem i status.future jak krotki.

Powiązane problemy