2016-02-02 16 views
11

Mam RDD z (String, int), która jest posortowana według kluczJak obliczyć skumulowaną sumę użyciu Spark

val data = Array(("c1",6), ("c2",3),("c3",4)) 
val rdd = sc.parallelize(data).sortByKey 

Teraz chcę zacząć wartość dla pierwszego klucza z zera do kolejnych kluczy jako suma poprzednich kluczy.

Np: C1 = 0, c2 = na c1 wartość, C3 = (c1 wartość wartość + c2), c4 = (c1 + .. + wartość c3) oczekiwany wynik:

(c1,0), (c2,6), (c3,9)... 

Czy jest możliwe osiągnąć to ? Próbowałem go z mapą, ale suma nie jest zachowywana na mapie.

var sum = 0 ; 
val t = keycount.map{ x => { val temp = sum; sum = sum + x._2 ; (x._1,temp); }} 
+0

Jest sekwencyjny, a zatem nie można go zrównoleglić, więc nie jest to właściwy sposób użycia Sparka. Sugerowałbym, żebyś przeczytał o tym, czym jest Spark. –

+0

Przykro mi, ale nawet po kolei nie rozumiem relacji między danymi wejściowymi a danymi wyjściowymi? jak to obliczyć? – eliasah

+0

@JohanS Tak, to jest sekwencyjne i wierzę również, że to nie jest właściwy sposób na użycie iskry. W naszym przypadku użycia skończyliśmy w ten sposób. Pozwól mi wypróbować inny sposób, aby osiągnąć funkcjonalność. – Knight71

Odpowiedz

16
  1. Compute częściowe wyniki dla każdej partycji:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => { 
        val (keys, values) = iter.toSeq.unzip 
        val sums = values.scanLeft(0)(_ + _) 
        Iterator((keys.zip(sums.tail), sums.last)) 
    }) 
    
  2. Collect partials podsumowuje

    val partialSums = partials.values.collect 
    
  3. Compute skumulowaną sumę ponad partycji i nadawanie go:

    val sumMap = sc.broadcast(
        (0 until rdd.partitions.size) 
        .zip(partialSums.scanLeft(0)(_ + _)) 
        .toMap 
    ) 
    
  4. Ostateczne wyniki Compute:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => { 
        val offset = sumMap.value(i) 
        if (iter.isEmpty) Iterator() 
        else iter.next.map{case (k, v) => (k, v + offset)}.toIterator 
    }) 
    
+0

Wow, wysoce edukacyjny, wciąż działa. Ale mam na przykład res12: Array [(Seq [(String, Int)], Int)] = Array ((Stream ((c01,1),?), 10), (Stream ((c05,5), ?), 18), (Strumień ((c08,8),?), 27)). Co to jest ? – thebluephantom

0

Oto rozwiązanie w PySpark. Wewnętrznie jest to zasadniczo to samo, co rozwiązanie @ 03232 Scala, ale zapewnia funkcję ogólnego przeznaczenia z interfejsem API podobnym do Sparka.

import numpy as np 
def cumsum(rdd, get_summand): 
    """Given an ordered rdd of items, computes cumulative sum of 
    get_summand(row), where row is an item in the RDD. 
    """ 
    def cumsum_in_partition(iter_rows): 
     total = 0 
     for row in iter_rows: 
      total += get_summand(row) 
      yield (total, row) 
    rdd = rdd.mapPartitions(cumsum_in_partition) 

    def last_partition_value(iter_rows): 
     final = None 
     for cumsum, row in iter_rows: 
      final = cumsum 
     return (final,) 

    partition_sums = rdd.mapPartitions(last_partition_value).collect() 
    partition_cumsums = list(np.cumsum(partition_sums)) 
    partition_cumsums = [0] + partition_cumsums 
    partition_cumsums = sc.broadcast(partition_cumsums) 

    def add_sums_of_previous_partitions(idx, iter_rows): 
     return ((cumsum + partition_cumsums.value[idx], row) 
      for cumsum, row in iter_rows) 
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions) 
    return rdd 

# test for correctness by summing numbers, with and without Spark 
rdd = sc.range(10000,numSlices=10).sortBy(lambda x: x) 
cumsums, values = zip(*cumsum(rdd,lambda x: x).collect()) 
assert all(cumsums == np.cumsum(values)) 
0

Natknąłem się na podobny problem i zaimplementowałem rozwiązanie @Paul. Chciałem zrobić cumsum na tablicy częstotliwości całkowitych posortowane według klucza (liczba całkowita), a był niewielki problem z np.cumsum(partition_sums), błąd jest unsupported operand type(s) for +=: 'int' and 'NoneType'.

Ponieważ, jeśli zakres jest wystarczająco duży, prawdopodobieństwo, że każda partycja ma coś jest wystarczająco duże (brak wartości Brak). Jeśli jednak zakres jest znacznie mniejszy niż liczba, a liczba partycji pozostaje taka sama, niektóre partycje będą puste. Oto zmodyfikowane rozwiązanie:

def cumsum(rdd, get_summand): 
    """Given an ordered rdd of items, computes cumulative sum of 
    get_summand(row), where row is an item in the RDD. 
    """ 
    def cumsum_in_partition(iter_rows): 
     total = 0 
     for row in iter_rows: 
      total += get_summand(row) 
      yield (total, row) 
    rdd = rdd.mapPartitions(cumsum_in_partition) 
    def last_partition_value(iter_rows): 
     final = None 
     for cumsum, row in iter_rows: 
      final = cumsum 
     return (final,) 
    partition_sums = rdd.mapPartitions(last_partition_value).collect() 
    # partition_cumsums = list(np.cumsum(partition_sums)) 

    #----from here are the changed lines 
    partition_sums = [x for x in partition_sums if x is not None] 
    temp = np.cumsum(partition_sums) 
    partition_cumsums = list(temp) 
    #---- 

    partition_cumsums = [0] + partition_cumsums 
    partition_cumsums = sc.broadcast(partition_cumsums) 
    def add_sums_of_previous_partitions(idx, iter_rows): 
     return ((cumsum + partition_cumsums.value[idx], row) 
      for cumsum, row in iter_rows) 
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions) 
    return rdd 

#test on random integer frequency 
x = np.random.randint(10, size=1000) 
D = sqlCtx.createDataFrame(pd.DataFrame(x.tolist(),columns=['D'])) 
c = D.groupBy('D').count().orderBy('D') 
c_rdd = c.rdd.map(lambda x:x['count']) 
cumsums, values = zip(*cumsum(c_rdd,lambda x: x).collect()) 
+0

Wygląda na to, że wprowadzi błędy, ponieważ długość partycji_partycji nie będzie już liczbą partycji. Myślę, że poprawne jest '[x jeśli x nie jest żadnym innym 0 dla x w partycji_sumy]' – AbdealiJK

-1

możesz wypróbować za pomocą okien, korzystając z wierszy Pomiędzy. nadzieja wciąż jest pomocna.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val data = Array(("c1",6), ("c2",3),("c3",4)) 
val df = sc.parallelize(data).sortByKey().toDF("c", "v") 
val w = Window.orderBy("c") 
val r = df.select($"c", sum($"v").over(w.rowsBetween(-2, -1)).alias("cs")) 
display(r) 
+1

czy testowałeś to rozwiązanie? nie sumuje się. –

1

iskra buit w wsporników ulu ANALITYKA/okienkowanie funkcji i łączna suma może być łatwo osiągnięte za pomocą funkcji analiz.

Hive wiki ANALYTICS/WINDOWING funkcji.

Przykład:

Zakładając masz SqlContext object-

val datardd = sqlContext.sparkContext.parallelize(Seq(("a",1),("b",2), ("c",3),("d",4),("d",5),("d",6))) 
import sqlContext.implicits._ 

//Register as test table 
datardd.toDF("id","val").createOrReplaceTempView("test") 

//Calculate Cumulative sum 
sqlContext.sql("select id,val, " + 
    "SUM(val) over ( order by id rows between unbounded preceding and current row) cumulative_Sum " + 
    "from test").show() 

Podejście to przyczyna poniżej ostrzeżenia. W przypadku, gdy executor uruchamia OutOfMemory, dostosuj parametry pamięci zadania do pracy z ogromnym zestawem danych.

Okno WARN: brak partycji zdefiniowanej dla działania okna! Przenoszenie wszystkie dane do jednej partycji, może to spowodować poważne pogorszenie wydajności

Mam nadzieję, że to pomaga.

+1

Gotowe. Dziękuję za twój komentarz. Uprzejmie zaktualizuj post, który uznasz za stosowny. –

Powiązane problemy