2016-03-20 7 views
5

Moi dataframes zawiera jedno pole, które jest data i wydaje się w formie łańcucha, jako przykładPySpark: filtrowania DataFrame przez pola daty w przedziale gdzie data jest ciąg

'2015-07-02T11:22:21.050Z' 

muszę filtrować DataFrame na data uzyskania tylko zapisów w ostatnim tygodniu. Więc starałem podejście mapie, gdzie przerabiałem daty ciągów do obiektów datetime z strptime:

def map_to_datetime(row): 
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ' 
    row.date = datetime.strptime(row.date, format_string) 

df = df.map(map_to_datetime) 

a następnie chciałbym zastosować filtr jako

df.filter(lambda row: 
    row.date >= (datetime.today() - timedelta(days=7))) 

udaje mi się dostać pracę mapowania ale filtr kończy się niepowodzeniem z możliwością filtrowania w sposób, który działa lub czy powinienem zmienić podejście i jak?

Odpowiedz

5

można rozwiązać ten bez użycia bocznego pracownik kodu Pythona i przełączenie do RDD. Przede wszystkim, skoro używasz ISO 8601 znaków, Twoje dane mogą zostać bezpośrednio rzutować na bieżąco lub datownika:

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    ('2015-07-02T11:22:21.050Z',), 
    ('2016-03-20T21:00:00.000Z',) 
]).toDF(("d_str",)) 

df_casted = df.select("*", 
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts")) 

To uratuje jedno podroż dookoła między JVM i Python. Istnieje również kilka sposobów, w jaki możesz podejść do drugiej części. tylko data:

from pyspark.sql.functions import current_date, datediff, unix_timestamp 

df_casted.where(datediff(current_date(), col("dt")) < 7) 

Sygnatury:

def days(i: int) -> int: 
    return 60 * 60 * 24 * i 

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7)) 

Można również spojrzeć na current_timestamp i date_sub

Uwaga: Chciałbym unikać DataFrame.map. Lepiej jest zamiast tego używać DataFrame.rdd.map. Zaoszczędzi ci to trochę pracy po przejściu na wersję 2.0+

5

zorientowali się sposób, aby rozwiązać mój problem za pomocą interfejsu API SparkSQL z dat przechowywane jako ciągi i robi to:

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d') 

new_df = df.where(df.date >= last_week) 
Powiązane problemy