2015-03-10 11 views
5

piszę kod oparty na „asynchroniczne iteratorów dla dużych zbiorów fonograficznych” opisywanych w https://github.com/websudos/phantom#partial-select-querieswartość plaster nie jest członkiem play.api.libs.iteratee.Enumerator

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 

import org.joda.time.DateTime 
import org.joda.time.format.DateTimeFormat 
import org.joda.time.format.DateTimeFormatter 

import com.anomaly42.aml.dao.CassandraConnector 
import com.websudos.phantom.CassandraTable 
import com.websudos.phantom.Implicits._ 

object People extends People { 
    def getPersonByUpdatedAt(from:String, to:String, start: Int, limit: Int) = { 
    val dtf:DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ"); 
    val fromDateTime = dtf.parseDateTime(from) 
    val toDateTime = dtf.parseDateTime(to) 

    People.select(_.updated_at, _.firstName).allowFiltering.where(_.updated_at gte fromDateTime).and(_.updated_at lte toDateTime).fetchEnumerator().slice(start, limit).collect 
    } 
} 

Używam następujących zależnościach bibliotek:

scalaVersion := "2.11.6" 
libraryDependencies ++= Seq(
    "com.websudos"  %% "phantom-dsl"  % "1.5.4", 
    many more... 
) 

ale pojawia się następujący błąd podczas kompilacji:

value slice is not a member of play.api.libs.iteratee.Enumerator[(org.joda.time.DateTime, Option[String])] 

Co ja próbuję g do zrobienia jest napisanie zapytania, które przywraca następną "limit" liczbę wyników zaczynając od "start", za każdym razem wywoływana jest metoda getPersonByUpdatedAt().

Odpowiedz

3

Istnieje kilka szczegółów dotyczących implementacji, które należy tutaj podać. Po pierwsze, jeśli jesteś po stronie stronicowania, może być łatwiejszy sposób osiągnięcia tego przy prostych zapytaniach dotyczących zakresu zamiast filtrowanych danych.

Spójrz na użycie numeru CLUSTERING ORDER, że nie powinno tam być połączenia z numerem ALLOW FILTERING. Co więcej, bez CLUSTERING ORDER domyślny program do partycjonowania Murmur3 nie jest faktycznie zamawiany, więc nie masz gwarancji na odzyskanie danych w tej samej kolejności, w jakiej je zapisałeś.

Co prawdopodobnie oznacza, że ​​paginacja w rzeczywistości nie będzie działać. Last but not least, korzystanie z modułów wyliczających bezpośrednio nie jest prawdopodobnie tym, czego szukasz.

Są one asynchroniczne, więc musisz mapować w przyszłości, aby uzyskać plasterek, ale na bok, moduły wyliczające są użyteczne, gdy coś takiego jak Spark ładuje całą tabelę naraz, np. Wiele wyników.

Podsumowując to wszystko, w środku tabeli osoby:

object id extends UUIDColumn(this) with PartitionKey[UUID]// doesn't have to be UUID 
object start extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending 
object end extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending 

I po prostu użyć fetch() a następnie Seq.slice ze zbiorów biblioteki Scala. Powyższe zakłada, że ​​chcesz paginować w porządku rosnącym, np. Pobiera najstarsze jako pierwsze.

Należy również ustalić, jaki może być realistyczny klucz partycji. Jeśli dwóch użytkowników jest jednocześnie aktualizowanych, najgorszym przypadkiem jest utrata danych i likwidacja kolejki FIFO, np. Ostatnia aktualizacja w danym momencie "wygrywa". Użyłem powyżej id, ale tego oczywiście nie potrzebujesz.

Konieczne może być również posiadanie kilku stołów, w których przechowujesz ludzi, aby móc pokryć wszystkie potrzebne zapytania.

+0

Hi @flavian, dzięki za odpowiedź. Moja baza danych ma miliony rekordów i dlatego będę musiał ją zaimplementować przy użyciu enumeratorów. Zdefiniowałem kolumnę z datą z klauzulą ​​"CLUSTERING ORDER", o której wspomniałem, ale jeśli nie używam opcji "ALLOW FILTERING", otrzymuję błędy czasu wykonywania. Udało mi się uciec z błędem kompilacji z następującą instrukcją: People.select (_. Updated_at, _.firstName) .allowFiltering.where (_. Updated_at gte fromDateTime) .and (_. Updated_at lte toDateTime) .setFetchSize (pageSize) .fetchEnumerator() uruchom Iteratee.slice (start, pageSize)) –

1

Powinieneś używać Iteratee i Enumerator ze struktury Play. W twoim przypadku trzeba:

import com.websudos.phantom.iteratee.Iteratee 

val enumerator = People.select(_.updated_at, _.firstName).allowFiltering.where(_.updated_at gte fromDateTime).and(_.updated_at lte toDateTime).fetchEnumerator 

val iteratee = Iteratee.slice[PeopleCaseClass](start, limit) 

enumerator.run(iteratee).map(_.foldLeft(List.empty[PeopleCaseClass])((l,e) => { e :: l })) 

nadzieję, że pomoże

Powiązane problemy