6

Moje pytanie jest wyzwalane przez przypadek użycia obliczania różnic między kolejnymi rzędami w iskrowegowej ramce danych.Unikaj wpływu wydajności trybu pojedynczej partycji w funkcjach okna Spark

Na przykład mam:

>>> df.show() 
+-----+----------+ 
|index|  col1| 
+-----+----------+ 
| 0.0|0.58734024| 
| 1.0|0.67304325| 
| 2.0|0.85154736| 
| 3.0| 0.5449719| 
+-----+----------+ 

Jeśli wybiorę do obliczenia tych użyciu funkcji "okienko", to można to zrobić tak:

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc()) 
>>> import pyspark.sql.functions as f 
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show() 
+-----+----------+-----------+ 
|index|  col1| diffs_col1| 
+-----+----------+-----------+ 
| 0.0|0.58734024|0.085703015| 
| 1.0|0.67304325| 0.17850411| 
| 2.0|0.85154736|-0.30657548| 
| 3.0| 0.5449719|  null| 
+-----+----------+-----------+ 

Pytanie: jawnie podzielił ramkę danych w pojedynczej partycji. Jaki jest wpływ tego na wydajność, a jeśli tak, to dlaczego tak jest i jak mogę tego uniknąć? Bo kiedy nie określono partycji, pojawia się następujące ostrzeżenie:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 

Odpowiedz

6

W praktyce wpływ na wydajność będzie prawie taka sama jak jeśli pominąć partitionBy klauzuli w ogóle. Wszystkie rekordy zostaną przetasowane na jednej partycji, posortowane lokalnie i kolejno po kolei iterowane.

Różnica dotyczy tylko liczby utworzonych partycji. Zilustrujmy to na przykładzie za pomocą prostego zestawu danych z 10 stref i 1000 rekordów:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42)) 

Jeśli zdefiniować ramkę bez przegrody przez klauzuli

w_unpart = Window.orderBy(f.col("index").asc()) 

i używać go z lag

df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1") 
) 

będzie tylko jedna partycja w sumie:

df_lag_unpart.rdd.glom().map(len).collect() 
[1000] 

porównaniu do tej definicji ramki z indeksem manekina (uproszczony nieco w porównaniu do kodu:

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc()) 

użyje liczbę partycji równych spark.sql.shuffle.partitions:

spark.conf.set("spark.sql.shuffle.partitions", 11) 

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1") 
) 

df_lag_part.rdd.glom().count() 
11 

z tylko jedna niepustowa partycja:

df_lag_part.rdd.glom().filter(lambda x: x).count() 

Niestety, nie ma uniwersalnego rozwiązania, które można wykorzystać do rozwiązania tego problemu w PySpark. Jest to nieodłączny mechanizm implementacji połączony z rozproszonym modelem przetwarzania.

Od index kolumna jest sekwencyjne można generować sztuczny klucz partycjonowania z ustalonej liczby rekordów w bloku:

rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions")) 

df_with_block = df.withColumn(
    "block", (f.col("index")/rec_per_block).cast("int") 
) 

i użyć go do definiowania specyfikacji ramki:

w_with_block = Window.partitionBy("block").orderBy("index") 

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1") 
) 

zostanie użyte oczekiwanej liczby partycji:

df_lag_with_block.rdd.glom().count() 
11 

z grubsza równomiernym rozkładzie danych (nie możemy uniknąć kolizji hash):

df_lag_with_block.rdd.glom().map(len).collect() 
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270] 

ale z wielu luk w granicach bloku:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count() 
12 

Od granic są łatwe do wyliczenia:

from itertools import chain 

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers 
    # This could be generalized to any monotonically increasing 
    # id by taking min and max per block 
    (idx - 1, idx) for idx in 
    df_lag_with_block.groupBy("block").min("index") 
     .drop("block").rdd.flatMap(lambda x: x) 
     .collect()))[2:] # The first boundary doesn't carry useful inf. 

zawsze można wybrać:

missing = df_with_block.where(f.col("index").isin(boundary_idxs)) 

i wypełnić je oddzielnie:

# We use window without partitions here. Since number of records 
# will be small this won't be a performance issue 
# but will generate "Moving all data to a single partition" warning 
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1") 
).select("index", f.col("diffs_col1").alias("diffs_fill")) 

i join:

combined = (df_lag_with_block 
    .join(missing_with_lag, ["index"], "leftouter") 
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill"))) 

aby uzyskać pożądany rezultat:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"] 
) 
assert mismatched.count() == 0 
Powiązane problemy