2016-05-11 9 views
5

Piszę aplikację zapłonową, gdzie potrzebne do oceny danych strumieniowych na podstawie danych historycznych, które mieści się w bazie danych serwera SQLJak oceniać obiekty iskra Dstream z ramki danych iskra

Teraz pomysł jest iskra pobierze dane historyczne z bazy danych i utrzyma je w pamięci oraz oceni dane strumieniowe na ich podstawie.

Teraz jestem coraz danych strumieniowych jak

import re 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql import SQLContext,functions as func,Row 


sc = SparkContext("local[2]", "realtimeApp") 
ssc = StreamingContext(sc,10) 
files = ssc.textFileStream("hdfs://RealTimeInputFolder/") 

########Lets get the data from the db which is relavant for streaming ### 

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" 
dataurl = "jdbc:sqlserver://myserver:1433" 
db = "mydb" 
table = "stream_helper" 
credential = "my_credentials" 

########basic data for evaluation purpose ######## 



files_count = files.flatMap(lambda file: file.split()) 

pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+)(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+)(dSc=u.)([A-Z]{2}.[0-9]+)' 


tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/" 

def getSqlContextInstance(sparkContext): 
    if ('sqlContextSingletonInstance' not in globals()): 
     globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext) 
    return globals()['sqlContextSingletonInstance'] 


def pre_parse(logline): 
    """ 
    to read files as rows of sql in pyspark streaming using the pattern . for use of logging 
    added 0,1 in case there is any failure in processing by this pattern 

    """ 
    match = re.search(pattern,logline) 
    if match is None: 
     return(line,0) 
    else: 
     return(
     Row(
     customer_id = match.group(8) 
     trantype = match.group(5) 
     amount = float(match.group(2)) 
     ),1) 


def parse(): 
    """ 
    actual processing is happening here 
    """ 
    parsed_tran = ssc.textFileStream(tranfiles).map(preparse) 
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x:x[0]) 
    fail = parsed_tran.filter(lambda s:s[1] == 0).map(lambda x:x[0]) 
    if fail.count() > 0: 
     print "no of non parsed file : %d", % fail.count() 

    return success,fail 

success ,fail = parse() 

Teraz chcę oceniać go przez ramkę danych, które otrzymuję od danych historycznych

base_data = sqlContext.read.format("jdbc").options(driver=driver,url=dataurl,database=db,user=credential,password=credential,dbtable=table).load() 

Teraz ponieważ jest zwracany jako ramy danych, jak używać tego do mojego celu. Przewodnik programowania strumieniowania here mówi
"Musisz utworzyć SQLContext za pomocą SparkContext, którego używa StreamingContext."

Teraz to jeszcze bardziej zagmatwało, w jaki sposób wykorzystać istniejącą ramkę danych do przesyłania strumieniowego. Każda pomoc jest wysoko ceniona.

+0

Istnieje wiele sposoby, aby to osiągnąć. Sugerowałbym przeglądanie metody foreachRDD dla DStreams, w której można użyć istniejącej ramki danych jako zmiennej broadcast. Możesz także przekonwertować swoją ramkę danych na RDD i połączyć ją z każdym DStream RDD za pomocą transformacji. Jeśli potrzebujesz iskontekstu z klasy sparkStreamingContext w metodzie foreachRDD do utworzenia sqlContext, możesz po prostu wywołać 'ssc.sparkContext()' –

Odpowiedz

0

Aby manipulować DataFrames, zawsze potrzebujemy SqlContext więc można instanciate to lubią:

sc = SparkContext("local[2]", "realtimeApp") 
sqlc = SQLContext(sc) 
ssc = StreamingContext(sc, 10) 

Te 2 kontekstach (SqlContext i StreamingContext) będą współistnieć w tym samym miejscu pracy, ponieważ są one związane z tym samym SparkContext. Należy jednak pamiętać, że nie można wprowadzić dwóch różnych warunków SparkContext w to samo zadanie.

Po utworzeniu ramki DataFrame ze swoich DStreams można dołączyć do historycznej DataFrame z ramką DataFrame utworzoną ze strumienia. Aby to zrobić, chciałbym zrobić coś takiego:

yourDStream.foreachRDD(lambda rdd: sqlContext 
    .createDataFrame(rdd) 
    .join(historicalDF, ...) 
    ... 
) 

myśleć o wysokości strumieniowej transmisji danych należy użyć dla Twojego dołączyć podczas manipulowania strumieni, może być zainteresowany przez windowed functions

Powiązane problemy