2016-03-16 17 views
14

Pracuję nad projektem strumieniowym Scala (2.11)/Spark (1.6.1) i używam mapWithState() do śledzenia danych z poprzednich partii.Spark Streaming MapWithState wydaje się okresowo odbudowywać stan kompletny

Stan jest dystrybuowany w 20 partycjach na wielu węzłach, utworzonych za pomocą StateSpec.function(trackStateFunc _).numPartitions(20). W tym stanie mamy tylko kilka kluczy (~ 100) zmapowanych do Sets z ~ 160 000 wpisów, które rosną w całej aplikacji. Cały stan jest do 3GB, który może być obsługiwany przez każdy węzeł w klastrze. W każdej partii niektóre dane są dodawane do stanu, ale nie są usuwane do samego końca procesu, tj. ~ 15 minut.

Podczas korzystania z interfejsu użytkownika aplikacji, co 10-ty czas przetwarzania partii jest bardzo wysoki w porównaniu do innych partii. Zobacz zdjęcia:

The spikes show the higher processing time.

żółtego pola reprezentują wysoki czas przetwarzania.

enter image description here

Bardziej szczegółowy widok praca pokazuje, że w tych partiach występują w pewnym momencie, dokładnie wtedy, gdy wszystkie partycje są 20 „pominięty”. Lub tak mówi interfejs użytkownika.

enter image description here

Moje rozumienie skipped jest to, że każda partycja stan jest jednym z możliwych zadań, które nie są wykonywane, ponieważ nie muszą być przeliczane. Jednak nie rozumiem, dlaczego liczba skips jest różna w każdej pracy i dlaczego ostatnia praca wymaga tak dużego przetwarzania. Wyższy czas przetwarzania występuje niezależnie od wielkości państwa, wpływa tylko na czas trwania.

Czy jest to błąd w funkcji mapWithState(), czy jest to zamierzone zachowanie? Czy podstawowa struktura danych wymaga pewnego rodzaju przetasowania, czy Set w stanie trzeba skopiować dane? A może jest to błąd w mojej aplikacji?

Odpowiedz

9

Czy jest to błąd w funkcji mapWithState() czy jest to zamierzone zachowanie w postaci ?

Jest to zamierzone zachowanie. Wyskoki, które widzisz, ponieważ twoje dane są sprawdzane na końcu tej danej partii. Jeśli zauważysz czas na dłuższych partiach, zobaczysz, że dzieje się to stale co 100 sekund. Dzieje się tak dlatego, że czas punktu kontrolnego jest stały i jest obliczany na podstawie twojego batchDuration, czyli jak często rozmawiasz ze źródłem danych, aby odczytać partię pomnożoną przez pewną stałą, chyba że wyraźnie ustawisz przedział DStream.checkpoint.

Oto odnośny fragment kodu z MapWithStateDStream:

override def initialize(time: Time): Unit = { 
    if (checkpointDuration == null) { 
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER 
    } 
    super.initialize(time) 
} 

Gdzie DEFAULT_CHECKPOINT_DURATION_MULTIPLIER jest:

private[streaming] object InternalMapWithStateDStream { 
    private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

Które zrówna się dokładnie z zachowaniem widzisz, ponieważ twój czas odczytu partia jest co 10 sekund => 10 * 10 = 100 sekund.

Jest to normalne i jest to koszt utrzymywania się stanu z urządzeniem Spark. Optymalizacja po twojej stronie może polegać na zastanowieniu się, w jaki sposób możesz zminimalizować rozmiar stanu, który musisz zachować w pamięci, aby ta serializacja była tak szybka, jak to tylko możliwe. Dodatkowo upewnij się, że dane są rozłożone na tyle executorów, aby stan był równomiernie rozłożony między wszystkimi węzłami. Mam także nadzieję, że zamiast domyślnej serializacji Java masz włączoną Kryo Serialization, która może zapewnić znaczący wzrost wydajności.

+0

W moim przypadku widzę, że każde zlecenie jest sprawdzane w partia. Dlaczego nie tylko ostatnia praca? Jakie jest twoje rozwiązanie, aby mieć oko na rozmiar państwa? Aby móc ją zoptymalizować. – crak

+0

@crak Jaki jest Twój okres kontrolny? I jak widzisz, że każda praca kontroluje dane? –

+0

Co 10 partii. Moje oko było nadużyciem, mam 12 zadań na 16, które robią checkpoint. I to jest logiczne, mam 12 mapWithState, widzę tam ślad w iskrze. Ale bez wiedzy, który z nich ma największy rozmiar. mapWithState Store po prostu stwierdziła, że ​​nie tak jak poprzednia implantacja? – crak

1

Oprócz zaakceptowanej odpowiedzi, wskazującej na cenę serializacji związanej z punktami kontrolnymi, jest jeszcze inny, mniej znany problem, który może przyczynić się do spiczastego zachowania: eksmisja usuniętych stanów.

W szczególności stany "usunięte" lub "przekroczone czasy" nie są natychmiast usuwane z mapy, ale są oznaczone do usunięcia i faktycznie usunięte tylko w procesie serializacji [w Spark 1.6.1, patrz writeObjectInternal()].

Ma to dwie konsekwencje wydajności, które występują tylko raz na 10 porcji:

  1. Proces przejścia i usuwanie ma swoją cenę
  2. Jeśli przetwarza strumień timed-out/usuniętych wydarzeń, na przykład utrzymaj go w pamięci zewnętrznej, koszty związane z wszystkimi 10 partiami zostaną opłacone tylko w tym momencie (a nie jak można by się spodziewać po każdym RDD).
Powiązane problemy