2012-04-27 11 views
7

Więc Play2.0 Enumeratee page pokazuje przykład stosując metodę, &> lub through Aby zmienić Enumerator[String] w produkt Enumerator[Int]:Jak napisać enumeratee do kawałka moduł wyliczający wzdłuż różnych granic

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt } 
val ints: Enumerator[Int] = strings &> toInt 

Istnieje również Enumeratee.grouped enumeratee, aby utworzyć moduł wyliczający porcje z poszczególnych elementów. Wydawało się, że działa dobrze.

Ale widzę, że zwykły wkład będzie w postaci Array[Byte] (który jest zwracany przez Enumerator.fromFile i Enumerator.fromStream). Mając to na uwadze chciałbym wziąć te wejścia Array[Byte] i przekształcić je w Enumerator[String], na przykład, gdzie każdy ciąg jest linią (zakończoną przez '\n'). Granice dla linii i elementów Array[Byte] zwykle nie pasują. Jak napisać moduł wyliczający, który może przekonwertować porwane tablice na fragmenty łańcuchów?

Celem jest przeniesienie tych linii z powrotem do przeglądarki, ponieważ stają się one dostępne dla każdego Array[Byte] i pozostawienie pozostałych bajtów, które nie były częścią kompletnej linii, dopóki nie pojawi się kolejny fragment wejściowy.

Idealnie chciałabym mieć metodę, która punktację iter: Iteratee[Array[Byte], T] i Enumerator[Array[Byte]] dadzą mi się Enumerator[T], gdzie moi elementy T zostały przeanalizowane przez iter.

Dodatkowe informacje: Miałem trochę czasu, aby posprzątać mój kod i tutaj jest konkretny przykład tego, co próbuję zrobić. Mam następujące iteratees które wykrywają następny wiersz:

import play.api.libs.iteratee._ 
type AB = Array[Byte] 

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = { 
    def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match { 
    case Input.EOF => Done(acc, Input.EOF) 
    case Input.Empty => Cont(step(_, acc)) 
    case Input.El(arr) => 
     val (taking, rest) = arr.span(pred) 
     if (rest.length > 0) Done(acC++ taking, Input.El(rest)) 
     else Cont(step(_, acC++ taking)) 
    } 
    Cont(step(_, Array())) 
} 

val line = for { 
    bytes <- takeWhile(b => !(b == '\n' || b == '\r')) 
    _  <- takeWhile(b => b == '\n' || b == '\r') 
} yield bytes 

i co chciałbym zrobić to coś takiego:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain") 

Odpowiedz

5

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 Czy coś podobnego do tego, co robisz. Poprawiłem zgrupowane, aby dbać o pozostałe dane wejściowe. Kod zasadniczo wygląda następująco:

val upToNewLine = 
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
    Iteratee.consume() 

Enumeratee.grouped(upToNewLine) 

Również muszę naprawić powtórki w taki sam sposób

+0

Chłodzenia. Czułem, że "grupa" powinna zrobić to, co chciałem. – huynhjl

2

Oto co mam po kilku godzinach eksperymentów. Mam nadzieję, że ktoś może wymyślić bardziej eleganckie wdrożenie, ponieważ ledwo mogę nadążyć za moim.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] { 
    def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = { 
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]): 
      Iteratee[AB, Iteratee[AB, A]] = { 
     e match { 
     case Input.EOF => 
      // if we have a leftover and it's a chunk, then output it 
      leftover match { 
      case Input.EOF | Input.Empty => Done(in, leftover) 
      case Input.El(_) => 
       val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover) 
       >>> Enumerator.eof |>> chunker) 
       lastChunk.pureFlatFold(
       done = { (chunk, rest) => 
        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in) 
        nextIn.pureFlatFold(
        done = (a, e2) => Done(nextIn, e2), 
        // nothing more will come 
        cont = k => Done(nextIn, Input.EOF), 
        error = (msg, e2) => Error(msg, e2)) 
       }, 
       // not enough content to get a chunk, so drop content 
       cont = k => Done(in, Input.EOF), 
       error = (msg, e2) => Error(msg, e2)) 
      } 
     case Input.Empty => Cont(step(_, in, leftover)) 
     case Input.El(arr) => 
      // feed through chunker 
      val iChunks = Iteratee.flatten(
      Enumerator.enumInput(leftover) 
       >>> Enumerator(arr) 
       >>> Enumerator.eof // to extract the leftover 
       |>> repeat(chunker)) 
      iChunks.pureFlatFold(
      done = { (chunks, rest) => 
       // we have our chunks, feed them to the inner iteratee 
       val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in) 
       nextIn.pureFlatFold(
       done = (a, e2) => Done(nextIn, e2), 
       // inner iteratee needs more data 
       cont = k => Cont(step(_: Input[AB], nextIn, 
        // we have to ignore the EOF we fed to repeat 
        if (rest == Input.EOF) Input.Empty else rest)), 
       error = (msg, e2) => Error(msg, e2)) 
      }, 
      // not enough content to get a chunk, continue 
      cont = k => Cont(step(_: Input[AB], in, leftover)), 
      error = (msg, e2) => Error(msg, e2)) 
     } 
    } 
    Cont(step(_, inner, Input.Empty)) 
    } 
} 

Oto definicja z moim zwyczajem repeat:

// withhold the last chunk so that it may be concatenated with the next one 
def repeat(chunker: Iteratee[AB, AB]) = { 
    def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
     leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match { 
    case Input.EOF => ch.pureFlatFold(
     done = (a, e) => Done(acc, leftover), 
     cont = k => k(Input.EOF).pureFlatFold(
     done = (a, e) => Done(acc, Input.El(a)), 
     cont = k => sys.error("divergent iter"), 
     error = (msg, e) => Error(msg, e)), 
     error = (msg, e) => Error(msg, e)) 
    case Input.Empty => Cont(loop(_, ch, acc, leftover)) 
    case Input.El(_) => 
     val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
      >>> Enumerator.enumInput(e) |>> ch) 
     i.pureFlatFold(
     done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty), 
     cont = k => Cont(loop(_, i, acc, Input.Empty)), 
     error = (msg, e) => Error(msg, e)) 
    } 
    Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty)) 
} 

To działa na kilku próbek łącznie z tym opisywanym:

val source = Enumerator(
    "bippy".getBytes, 
    "foo\n\rbar\n\r\n\rbaz\nb".getBytes, 
    "azam\ntoto\n\n".getBytes) 
Ok.stream(source 
    &> chunkBy(line) 
    &> Enumeratee.map(l => l ++ ".\n".getBytes) 
).as("text/plain") 

która drukuje:

bippyfoo. 
bar. 
baz. 
bazam. 
toto. 
Powiązane problemy