2016-05-24 12 views
10

E.g. jeśli przechodzę przez ten sam RDD liczb, gdzie jeden przepływ filtruje dla liczb parzystych i uśrednia je, a pozostałe filtry dla nieparzystych i sumuje je. Jeśli napiszę to jako dwa potoki na tym samym RDD, utworzą się dwie egzekucje, które będą skanować RDD dwukrotnie, co może być kosztowne pod względem IO.Zapobiegaj więcej IO z wieloma potokami na tym samym RDD

W jaki sposób można zmniejszyć tę liczbę I, aby odczytać dane tylko raz bez przepisywania logiki w jednym potoku? Struktura, która przyjmuje dwa potoki i łączy je z jednym, jest oczywiście dobra, tak długo, jak deweloperzy kontynuują pracę nad każdym potokiem niezależnie (w rzeczywistości, te potoki są ładowane z oddzielnych modułów).

Punkt nie jest używać pamięci podręcznej(), aby osiągnąć to

+0

Czy ten rurociąg jest odczytywany wewnątrz pojedynczego zadania Spark? Lub dwie oddzielne oferty pracy? –

+0

Ta sama praca (SparkContext) – IttayD

+0

Czy korzystanie z [IgniteRDD] (https://ignite.apache.org/features/igniterdd.html) działa dla ciebie? Można załadować dane do współużytkowanego RDD, a następnie obrobić oba potoki. – heenenee

Odpowiedz

3

Ponieważ twoje pytanie jest raczej niejasne, pomyślmy o ogólnych strategiach, które można wykorzystać do rozwiązania tego problemu.

Standardowym rozwiązaniem jest buforowanie, ale ponieważ wyraźnie chcesz tego uniknąć, zakładam tutaj pewne dodatkowe ograniczenia. Sugeruje to, że pewne podobne rozwiązania, takie

nie przyjęcia. Oznacza to, że musisz znaleźć trochę, aby manipulować samym rurociągiem.

Mimo że wiele transformacji można zgnieść, każda transformacja tworzy nowy RDD. To, w połączeniu z Twoim stwierdzeniem dotyczącym buforowania, określa relatywnie silne ograniczenia dotyczące możliwych rozwiązań.

Zacznijmy od najprostszego możliwego przypadku, w którym wszystkie rurociągi mogą być wyrażone jako jednostopniowe zlecenia. Ogranicza to nasze wybory, aby odwzorowywać jedynie zadania i proste zadania polegające na zmniejszeniu liczby map (takie jak opisane w pytaniu). Rurociągi takie jak ten można łatwo wyrazić jako sekwencję operacji na lokalnych iteratorach. Więc co następuje

import org.apache.spark.util.StatCounter 

def isEven(x: Long) = x % 2 == 0 
def isOdd(x: Long) = !isEven(x) 

def p1(rdd: RDD[Long]) = { 
    rdd 
    .filter(isEven _) 
    .aggregate(StatCounter())(_ merge _, _ merge _) 
    .mean 
} 

def p2(rdd: RDD[Long]) = { 
    rdd 
    .filter(isOdd _) 
    .reduce(_ + _) 
} 

można wyrazić jako:

def p1(rdd: RDD[Long]) = { 
    rdd 
    .mapPartitions(iter => 
     Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _))) 
    .collect 
    .reduce(_ merge _) 
    .mean 
} 


def p2(rdd: RDD[Long]) = { 
    rdd 
    .mapPartitions(iter => 
     Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _))) 
    .collect 
    .reduce(_ + _) 
    // identity _ 
} 

W tym momencie możemy przepisać oddzielne zadania w następujący sposób:

def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = { 
    rdd.mapPartitions(iter => { 
    val items = iter.toList 
    Iterator((f(items.iterator), g(items.iterator))) 
    }) 
} 

def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = { 
    rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2))) 
} 

def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2)) 

val rdd = sc.range(0L, 100L) 

def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _) 
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _) 


evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity) 

Największym problemem jest to, że mamy do skwapliwie oceń każdą partycję, aby móc zastosować poszczególne potoki. Oznacza to, że ogólne wymagania dotyczące pamięci mogą być znacznie wyższe w porównaniu do tej samej logiki stosowanej osobno. Bez buforowania * jest również bezużyteczny w przypadku wieloetapowych zadań.

Alternatywnym rozwiązaniem jest przetwarzanie danych elementem mądry ale traktują każdą pozycję jako krotki seqs:

def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = { 
    rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) } 
} 


def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
    f: T => Boolean, g: U => Boolean) = { 
    rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) } 
} 


def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X) 
    (s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = { 
    rdd.mapPartitions(iter => { 
    var accT = zt 
    var accU = zu 
    iter.foreach { case (ts, us) => { 
     accT = ts.foldLeft(accT)(s1) 
     accU = us.foldLeft(accU)(s2) 
    }} 

    Iterator((accT, accU)) 
    }).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) } 
} 

Z API jak to możemy wyrazić wstępne rurociągi jak:

val rddSeq = rdd.map(x => (Seq(x), Seq(x))) 


aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
    _ merge _, _ + _, _ merge _, _ + _ 
) 

ten podejście jest nieco silniejsze niż poprzednie (w razie potrzeby można łatwo wdrożyć niektóre podzestawy metod byKey), a wymagania dotyczące pamięci w typowych potokach powinny być porównywalne do podstawowego interfejsu API, ale jest również znacznie bardziej uciążliwe.


* Można sprawdzić an answer dostarczone przez eje przykłady multipleksowania.

Powiązane problemy