Piszę aplikację zapłonową, gdzie potrzebne do oceny danych strumieniowych na podstawie danych historycznych, które mieści się w bazie danych serwera SQLJak oceniać obiekty iskra Dstream z ramki danych iskra
Teraz pomysł jest iskra pobierze dane historyczne z bazy danych i utrzyma je w pamięci oraz oceni dane strumieniowe na ich podstawie.
Teraz jestem coraz danych strumieniowych jak
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row
sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")
########Lets get the data from the db which is relavant for streaming ###
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"
########basic data for evaluation purpose ########
files_count = files.flatMap(lambda file: file.split())
pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+)(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+)(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
def pre_parse(logline):
"""
to read files as rows of sql in pyspark streaming using the pattern . for use of logging
added 0,1 in case there is any failure in processing by this pattern
"""
match = re.search(pattern,logline)
if match is None:
return(line,0)
else:
return(
Row(
customer_id = match.group(8)
trantype = match.group(5)
amount = float(match.group(2))
),1)
def parse():
"""
actual processing is happening here
"""
parsed_tran = ssc.textFileStream(tranfiles).map(preparse)
success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x:x[0])
fail = parsed_tran.filter(lambda s:s[1] == 0).map(lambda x:x[0])
if fail.count() > 0:
print "no of non parsed file : %d", % fail.count()
return success,fail
success ,fail = parse()
Teraz chcę oceniać go przez ramkę danych, które otrzymuję od danych historycznych
base_data = sqlContext.read.format("jdbc").options(driver=driver,url=dataurl,database=db,user=credential,password=credential,dbtable=table).load()
Teraz ponieważ jest zwracany jako ramy danych, jak używać tego do mojego celu. Przewodnik programowania strumieniowania here mówi
"Musisz utworzyć SQLContext za pomocą SparkContext, którego używa StreamingContext."
Teraz to jeszcze bardziej zagmatwało, w jaki sposób wykorzystać istniejącą ramkę danych do przesyłania strumieniowego. Każda pomoc jest wysoko ceniona.
Istnieje wiele sposoby, aby to osiągnąć. Sugerowałbym przeglądanie metody foreachRDD dla DStreams, w której można użyć istniejącej ramki danych jako zmiennej broadcast. Możesz także przekonwertować swoją ramkę danych na RDD i połączyć ją z każdym DStream RDD za pomocą transformacji. Jeśli potrzebujesz iskontekstu z klasy sparkStreamingContext w metodzie foreachRDD do utworzenia sqlContext, możesz po prostu wywołać 'ssc.sparkContext()' –