2015-09-08 12 views
7

Mam problem, kiedy używam streamingu iskrowego do czytania z Cassandry.Czytanie z Cassandry za pomocą Spark Streaming

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

jako link powyżej, używam

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3) 

aby wybrać dane z Cassandrą, ale wydaje się, że streaming iskra ma raz tylko jedno zapytanie, ale chcę nadal kwerendy przy użyciu przerwa 10 sencondów.

Mój kod jest następujący, życzę odpowiedzi.

Dzięki!

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import com.datastax.spark.connector.streaming._ 
import org.apache.spark.rdd._ 
import scala.collection.mutable.Queue 


object SimpleApp { 
def main(args: Array[String]){ 
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1") 

    val ssc = new StreamingContext(conf, Seconds(10)) 

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

    //rdd.collect().foreach(println) 

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]() 


    val dstream = ssc.queueStream(rddQueue) 

    dstream.print() 

    ssc.start() 
    rdd.collect().foreach(println) 
    rddQueue += rdd 
    ssc.awaitTermination() 
} 

}

+0

można opisać to, co chcesz osiągnąć? Czy czytasz pełną tabelę w każdym przedziale? Skąd pochodzą dane strumieniowe? – maasg

+0

@maasg Chcę odczytać tabelę z każdego interwału (np. 10s), aby wysłać zapytanie dotyczące niektórych rekordów związanych z czasem. Oznacza to, że chcę, aby Cassandra była źródłem Spark Streaming. Jednym słowem, jestem zablokowany przy tworzeniu DStream. Czy chciałbyś podać kilka wskazówek i przykładów? Dziękuję bardzo! –

Odpowiedz

6

Można utworzyć ConstantInputDStream z CassandraRDD jako wejście. ConstantInputDStream zapewni taki sam RDD dla każdego interwału strumieniowania, a wykonując akcję na tym RDD, uruchomisz materializację linii RDD, co spowoduje wykonanie kwerendy na Cassandrze za każdym razem.

Upewnij się, że dane, których dotyczy zapytanie, nie rosną nieograniczone, aby uniknąć wydłużenia czasu zapytania i spowodować niestabilny proces przesyłania strumieniowego.

Coś takiego powinno wystarczyć (przy użyciu kodu jako punktu startowego):

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 
+3

Co się stanie, jeśli chcę tylko odczytać ** nowe dane ** zapisane w tabeli od czasu przetworzenia ostatniego dokumentu RDD? Czy to jest możliwe? –

+2

Czy istnieje sposób, aby zapobiec ponownemu pobieraniu starych danych? utrzymuje się w nieskończonej pętli. –

+0

@yurishkuro AFAIK, który obecnie nie jest możliwy. – maasg

0

Miałem ten sam problem i znalazł rozwiązanie tworząc podklasę klasy InputDStream. Konieczne jest zdefiniowanie metod start() i .

start() może być użyty do przygotowania. Główna logika znajduje się w compute(). Zwróci ona Option[RDD[T]]. Aby zdefiniować klasę elastyczną, zdefiniowano cechę InputStreamQuery.

trait InputStreamQuery[T] { 
    // where clause condition for partition key 
    def partitionCond : (String, Any) 
    // function to return next partition key 
    def nextValue(v:Any) : Option[Any] 
    // where clause condition for clustering key 
    def whereCond : (String, (T) => Any) 
    // batch size 
    def batchSize : Int 
} 

Na stole Cassandra keyspace.test utwórz test_by_date która reorganizuje tabelę podziału klucza date.

CREATE TABLE IF NOT exists keyspace.test 
(id timeuuid, date text, value text, primary key (id)) 

CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS 
SELECT * 
FROM keyspace.test 
WHERE id IS NOT NULL 
PRIMARY KEY (date, id) 
WITH CLUSTERING ORDER BY (id ASC); 

Jednym z możliwych rozwiązań dla test tabeli będzie

class class Test(id:UUID, date:String, value:String) 

trait InputStreamQueryTest extends InputStreamQuery[Test] { 
    val dateFormat = "uuuu-MM-dd" 

    // set batch size as 10 records 
    override def batchSize: Int = 10 

    // partitioning key conditions, query string and initial value 
    override def partitionCond: (String, Any) = ("date = ?", "2017-10-01") 
    // clustering key condition, query string and function to get clustering key from the instance 
    override def whereCond: (String, Test => Any) = (" id > ?", m => m.id) 
    // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01' 
    override def nextValue(v: Any): Option[Any] = { 

    import java.time.format.DateTimeFormatter 

    val formatter = DateTimeFormatter.ofPattern(dateFormat) 
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1) 
    if (nextDate.isAfter(LocalDate.now())) None 
    else Some(nextDate.format(formatter)) 
    } 
} 

Może być stosowany w klasie CassandraInputStream następująco.

class CassandraInputStream[T: ClassTag] 
(_ssc: StreamingContext, keyspace:String, table:String) 
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] { 

var lastElm:Option[T] = None 
var partitionKey : Any = _ 

override def start(): Unit = { 

    // find a partition key which stores some records 
    def findStartValue(cql : String, value:Any): Any = { 
    val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1) 

    if (rdd.cassandraCount() > 0) value 
    else { 
     nextValue(value).map(findStartValue(cql, _)).getOrElse(value) 
    } 
    } 
    // get query string and initial value from partitionCond method 
    val (cql, value) = partitionCond 
    partitionKey = findStartValue(cql, value) 
} 

override def stop(): Unit = {} 

override def compute(validTime: Time): Option[RDD[T]] = { 
    val (cql, _) = partitionCond 
    val (wh, whKey) = whereCond 

    def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = { 
    // query with partitioning condition 
    val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey) 

    val rdd = lastElm.map{ x => 
     query.where(wh, whKey(x)).withAscOrder.limit(batchSize) 
    }.getOrElse(query.withAscOrder.limit(batchSize)) 

    if (rdd.cassandraCount() > 0) { 
     // store the last element of this RDD 
     lastElm = Some(rdd.collect.last) 
     Some(rdd) 
    } 
    else { 
     // find the next partition key which stores data 
     nextValue(patKey).flatMap{ k => 
     partitionKey = k 
     fetchNext(k)} 
    } 
    } 

    fetchNext(partitionKey) 
} 
} 

połączenie wszystkich klas,

val conf = new SparkConf().setAppName(appName).setMaster(master) 
val ssc = new StreamingContext(conf, Seconds(10)) 

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest 

dstream.map(println).saveToCassandra(...) 

ssc.start() 
ssc.awaitTermination() 
Powiązane problemy