2015-08-09 12 views
7

Zacznijmy od prostej funkcji, która zawsze zwraca losową liczbę całkowitą:Liczby losowe generowanie w PySpark

import numpy as np 

def f(x): 
    return np.random.randint(1000) 

i RDD wypełniony zerami i mapowane przy użyciu f:

rdd = sc.parallelize([0] * 10).map(f) 

Od ponad RDD jest nietrwałe Oczekuję, że za każdym razem, gdy będę zbierać, otrzymam inną wydajność:

> rdd.collect() 
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255] 

Jeśli zignorujemy fakt, że rozkład wartości nie wygląda tak przypadkowo, to mniej więcej to, co się dzieje. Problem zaczyna się, kiedy my możemy wziąć tylko pierwszy element:

assert len(set(rdd.first() for _ in xrange(100))) == 1 

lub

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1 

Wydaje się zwrócić tę samą liczbę każdym razem. Byłem w stanie odtworzyć to zachowanie na dwóch różnych komputerach ze Spark 1.2, 1.3 i 1.4. Tutaj używam np.random.randint, ale zachowuje się tak samo z random.randint.

ten problem, tak samo jak nie-dokładnie-losowych wyników z collect, wydaje się być specyficzna Python i nie mogę odtworzyć go za pomocą Scala:

def f(x: Int) = scala.util.Random.nextInt(1000) 

val rdd = sc.parallelize(List.fill(10)(0)).map(f) 
(1 to 100).map(x => rdd.first).toSet.size 

rdd.collect() 

Czy brakuje czegoś oczywiste tutaj?

Edit:

Okazuje się, że źródło problemu jest implementacja Pythona RNG. Aby zacytować: official documentation:

Funkcje dostarczane przez ten moduł są w rzeczywistości związane metodami ukrytej instancji klasy random.Random. Możesz utworzyć własne instancje losowe, aby uzyskać generatory, które nie współdzielą stanu.

Zakładam NumPy działa w ten sam sposób i przepisywanie f korzystając RandomState instancji następująco

import os 
import binascii 

def f(x, seed=None): 
    seed = (
     seed if seed is not None 
     else int(binascii.hexlify(os.urandom(4)), 16)) 
    rs = np.random.RandomState(seed) 
    return rs.randint(1000) 

sprawia, że ​​wolniej, ale rozwiązuje problem.

Powyższe wyjaśnia nie losowe wyniki z kolekcji Ciągle nie rozumiem, w jaki sposób wpływa on na różne akcje.

+0

Aby wyjaśnić: jeśli używam losowej funkcji numpy w Sparku, zawsze wybiera ona te same wyniki w każdej partycji? Jak mogę użyć np.random.choice, aby był losowy? – member555

+0

_Wybierz zawsze takie same wyniki w każdym partycji_ - nie dokładnie, ale wartości obliczone na pojedynczym module nie będą niezależne. _Jak mogę użyć np.random.choice, aby był losowy? _ - już opisałem rozwiązanie w edycji. Powinieneś użyć osobnego stanu. Ponieważ jest dość drogi, możesz go zainicjować raz na partycję. – zero323

+0

Czy możesz wyjaśnić mi więcej szczegółów na czym polega problem? dlaczego współdzielony stan Pythona stanowi problem? – member555

Odpowiedz

2

Rzeczywisty problem jest stosunkowo prosty.Każdy podproces w Pythonie dziedziczy swój stan od jego rodzica:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) 
# 1 

Ponieważ stan rodzic nie ma powodu do zmiany w tym konkretnym scenariuszu, a pracownicy mają ograniczoną żywotność, stan każdego dziecka będą dokładnie takie same na każdym biegu.

1

To wydaje się być bug (lub funkcja) z randint. Widzę to samo zachowanie, ale zaraz po zmianie f, wartości rzeczywiście się zmieniają. Tak więc, nie jestem pewien rzeczywistej losowości tej metody .... Nie mogę znaleźć żadnej dokumentacji, ale wydaje się, że za pomocą jakiegoś deterministycznego algorytmu matematycznego zamiast korzystania z bardziej zmiennych funkcji uruchomionej maszyny. Nawet jeśli powracam, numery wydają się być takie same po powrocie do pierwotnej wartości ...

+0

Jest to generator pseudolosowy implementujący Mersenne Twister, ale nie powinien to być problem. Problem jest definitywnie związany ze wspólną klasą "Losową" (zredagowałem pytanie, aby to odzwierciedlić), ale to, w jaki sposób wpływa ona na "pierwszy" wynik, wciąż mnie zastanawia. – zero323