2012-02-07 12 views
9

Używam biblioteki Netty (wersja 4 z GitHub). Działa świetnie w Scali, ale mam nadzieję, że moja biblioteka będzie mogła używać stylu przekazywania kontynuacji dla asynchronicznego oczekiwania.Używanie kontynuacji scala ze słuchaczami netty/NIO

Tradycyjnie z Netty byś zrobił coś takiego (przykład asynchroniczny połączyć pracy):

//client is a ClientBootstrap 
val future:ChannelFuture = client.connect(remoteAddr); 
future.addListener(new ChannelFutureListener { 
    def operationComplete (f:ChannelFuture) = { 
     //here goes the code that happens when the connection is made 
    } 
}) 

Jeśli wdrożenie biblioteki (którym jestem), to w zasadzie są trzy proste opcje, aby umożliwić użytkownikowi biblioteki do robienia rzeczy po nawiązaniu połączenia:

  1. Po prostu zwróć ChannelFuture ze swojej metody connect i pozwól użytkownikowi poradzić sobie z tym - to nie daje wiele abstrakcji z netty.
  2. Pobierz ChannelFutureListener jako parametr metody connect i dodaj go jako detektor do ChannelFuture.
  3. Take obiektu funkcja zwrotna jako parametr swojej metody connect i nazywają to od wewnątrz ChannelFutureListener tworzonej (byłoby to zrobić za pomocą zwrotnej napędzane stylu trochę jak node.js)

Co Ja próbowanie zrobić to czwarta opcja; Nie uwzględniłem tego w powyższym liczniku, ponieważ nie jest to proste.

chcę użyć Scala rozdzielany kontynuacje, aby korzystanie z biblioteki być trochę jak blokowanie biblioteki, ale będzie to powodują blokowania za kulisami:

class MyLibraryClient { 
    def connect(remoteAddr:SocketAddress) = { 
     shift { retrn: (Unit => Unit) => { 
       val future:ChannelFuture = client.connect(remoteAddr); 
       future.addListener(new ChannelFutureListener { 
        def operationComplete(f:ChannelFuture) = { 
         retrn(); 
        } 
       }); 
      } 
     } 
    } 
} 

Wyobraź inne operacje odczytu/zapisu są realizowane w taki sam sposób. Celem tej istoty, że kod użytkownika może wyglądać mniej więcej tak:

reset { 
    val conn = new MyLibraryClient(); 
    conn.connect(new InetSocketAddress("127.0.0.1", 1337)); 
    println("This will happen after the connection is finished"); 
} 

Innymi słowy, program będzie wyglądać prosty program blokujący stylu, ale za kulisami nie będzie żadnego blokowania lub gwintowania .

Kłopot, na który natrafiłem, polega na tym, że nie w pełni rozumiem, jak działa pisanie ciągów z ograniczeniami. Kiedy próbuję zaimplementować go w powyższy sposób, kompilator narzeka, że ​​moja implementacja operationComplete faktycznie zwraca Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] zamiast Unit. Rozumiem, że istnieje coś takiego jak "Css" w scala CPS, w którym musisz opisać metodę zwracania metody shift za pomocą @suspendable, która jest przekazywana do stosu wywołań aż do reset, ale nie ma żadnego sposobu na pogodzenie się to z wcześniej istniejącą biblioteką Java, która nie ma pojęcia rozdzielonych kontynuacji.

Wydaje mi się, że tak naprawdę musi być jakiś sposób obejścia tego problemu - jeśli Swarm może serializować kontynuacje i zacinać je w sieci, aby można je było obliczyć w innym miejscu, musi być możliwe po prostu wywołanie kontynuacji z wcześniej istniejącej klasy Java. Ale nie mogę zrozumieć, jak można to zrobić. Czy musiałbym przerobić całe części netty w Scali, aby tak się stało?

+0

Nie wiem howto rozwiązać rzeczy scala ale proponuję wobec swojego pomysłu. Pozwól mi powiedzieć, dlaczego. Ale sprawiając, że użytkownik "nie zdaje sobie sprawy" z asynchronicznej natury twojej biblioteki, powiesz mu, że to jest ok "blokowanie" połączeń w kodzie odbiornika. W rzeczywistości nie wiedziałby, że nawet pisze swój kod w słuchaczu. Wywołanie blokady w słuchaczu może prowadzić do różnego rodzaju problemów. Problem, który można zauważyć przez większość czasu, polega na tym, że "spowalnia" inne zadania związane z komputerem i ogranicza w ten sposób przepustowość. –

+1

Masz dobry punkt, ale nie zgadzam się. Myślę, że użytkownik mojej biblioteki, jeśli nawet jest poza mną, prawdopodobnie będzie musiał zrozumieć, co oznacza "reset", a zatem zrozumie, że połączenia nie są blokujące. To jest naprawdę tylko droga do A) lepszego zrozumienia ograniczonych kontynuacji, oraz B) eksperymentowania z pisaniem kodu opartego na zasadzie zwrotnej w czystszy sposób. – Jeremy

Odpowiedz

4

Znalazłem to wyjaśnienie Scala's continuations niezwykle pomocne, gdy zacząłem. Zwróć szczególną uwagę na części, w których wyjaśnia on: shift[A, B, C] i . Dodanie atrapa null jako ostatniej instrukcji z operationComplete powinno pomóc.

Przy okazji należy wywołać retrn() w innym reset, jeśli może być w nim zagnieżdżona shift.

Edit: Tutaj jest przykład roboczych

import scala.util.continuations._ 
import java.util.concurrent.Executors 

object Test { 

    val execService = Executors.newFixedThreadPool(2) 

    def main(args: Array[String]): Unit = { 
    reset { 
     val conn = new MyLibraryClient(); 
     conn.connect("127.0.0.1"); 
     println("This will happen after the connection is finished"); 
    } 
    println("Outside reset"); 
    } 
} 

class ChannelFuture { 
    def addListener(listener: ChannelFutureListener): Unit = { 
    val future = this 
    Test.execService.submit(new Runnable { 
     def run(): Unit = { 
     listener.operationComplete(future) 
     } 
    }) 
    } 
} 

trait ChannelFutureListener { 
    def operationComplete(f: ChannelFuture): Unit 
} 

class MyLibraryClient { 
    def connect(remoteAddr: String): [email protected][Unit] = { 
    shift { 
     retrn: (Unit => Unit) => { 
     val future: ChannelFuture = new ChannelFuture() 
     future.addListener(new ChannelFutureListener { 
      def operationComplete(f: ChannelFuture): Unit = { 
      println("operationComplete starts") 
      retrn(); 
      null 
      } 
     }); 
     } 
    } 
    } 
} 

z możliwością wyjścia:

Outside reset 
operationComplete starts 
This will happen after the connection is finished 
+0

To sprawia, że ​​kompilator jest szczęśliwy, a nawet wydaje się działać poprawnie. Kluczem jest to, że przesunąłeś 'shift' poza anonimowe' ChannelFutureListener' i użyłeś zamknięcia, aby wywołać kontynuację z wewnątrz 'operationComplete'. Nie jestem pewien, czy rozumiem, dlaczego to działa, a inaczej nie, ale wezmę to. Dzięki! – Jeremy

+0

I to jest bardzo dobra lektura o kontynuacjach scala. Powinni usunąć bezwartościowe przykłady ze strony scala-lang.org o kontynuacji i zastąpić je artykułem, który łączyłeś. – Jeremy

+0

@Jeremy tak, ten artykuł jest bardzo dobry :) – shams