Próbuję utworzyć prosty serwer proxy dla połączeń Websocket, używając strumieni Play i akka. przepływ ruchu jest tak:Serwer proxy sieci Web korzystający z kanałów Play 2.6 i akka
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
wpadłem na następujący kod po wykonaniu kilka przykładów:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("The server has sent: " + message.text)
}
// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
Flow.fromSinkAndSource(sink, source)
}
finalFlow
Z tym kodem, ruch idzie od klienta do serwera proxy do serwera , z powrotem do Proxy i to wszystko. Nie dociera do klienta. Jak mogę to naprawić? myślę, że trzeba jakoś połączyć serverIncoming
opadają na source
w finalFlow
, ale nie mogę dowiedzieć się, jak to zrobić ...
Albo jestem całkowicie błędne z takim podejściem? Czy lepiej używać urządzenia Bidiflow
lub Graph
? Jestem nowy w strumieniach akka i wciąż próbuję to rozgryźć.
To nie jest proxy. Jest to bardzo prosty serwer, który przesuwa łańcuchy znaków i wysyła je do klienta za pomocą gniazd internetowych. –
@morganfreeman Miałem na myśli, że sam kontroler jest proxy. Górna usługa może być zastąpiona przez aktora, który wywołuje zewnętrzną usługę do rzeczywistego przetwarzania, coś w tym stylu: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services . Ale prawdopodobnie nie rozumiem dokładnie twojego problemu. – botkop
Prawda, usługa UpperService mogłaby teoretycznie wysłać ramki za pośrednictwem gniazda sieci Web do serwera. Próbowałem go na początku. To gniazdo sieciowe na serwerze to przepływ. Nie mogłem "połączyć" metody odbioru aktora z Źródłem Przepływu i Zlewem przepływu z powrotem do 'gniazda' (dane wracają do klienta). Mógłbym zainicjować przepływ Web Socket w metodzie receive, ale to otwierałoby połączenie z serwerem za każdym razem. –