Zacznijmy od prostej funkcji, która zawsze zwraca losową liczbę całkowitą:Liczby losowe generowanie w PySpark
import numpy as np
def f(x):
return np.random.randint(1000)
i RDD wypełniony zerami i mapowane przy użyciu f
:
rdd = sc.parallelize([0] * 10).map(f)
Od ponad RDD jest nietrwałe Oczekuję, że za każdym razem, gdy będę zbierać, otrzymam inną wydajność:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
Jeśli zignorujemy fakt, że rozkład wartości nie wygląda tak przypadkowo, to mniej więcej to, co się dzieje. Problem zaczyna się, kiedy my możemy wziąć tylko pierwszy element:
assert len(set(rdd.first() for _ in xrange(100))) == 1
lub
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
Wydaje się zwrócić tę samą liczbę każdym razem. Byłem w stanie odtworzyć to zachowanie na dwóch różnych komputerach ze Spark 1.2, 1.3 i 1.4. Tutaj używam np.random.randint
, ale zachowuje się tak samo z random.randint
.
ten problem, tak samo jak nie-dokładnie-losowych wyników z collect
, wydaje się być specyficzna Python i nie mogę odtworzyć go za pomocą Scala:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
Czy brakuje czegoś oczywiste tutaj?
Edit:
Okazuje się, że źródło problemu jest implementacja Pythona RNG. Aby zacytować: official documentation:
Funkcje dostarczane przez ten moduł są w rzeczywistości związane metodami ukrytej instancji klasy random.Random. Możesz utworzyć własne instancje losowe, aby uzyskać generatory, które nie współdzielą stanu.
Zakładam NumPy działa w ten sam sposób i przepisywanie f
korzystając RandomState
instancji następująco
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
sprawia, że wolniej, ale rozwiązuje problem.
Powyższe wyjaśnia nie losowe wyniki z kolekcji Ciągle nie rozumiem, w jaki sposób wpływa on na różne akcje.
Aby wyjaśnić: jeśli używam losowej funkcji numpy w Sparku, zawsze wybiera ona te same wyniki w każdej partycji? Jak mogę użyć np.random.choice, aby był losowy? – member555
_Wybierz zawsze takie same wyniki w każdym partycji_ - nie dokładnie, ale wartości obliczone na pojedynczym module nie będą niezależne. _Jak mogę użyć np.random.choice, aby był losowy? _ - już opisałem rozwiązanie w edycji. Powinieneś użyć osobnego stanu. Ponieważ jest dość drogi, możesz go zainicjować raz na partycję. – zero323
Czy możesz wyjaśnić mi więcej szczegółów na czym polega problem? dlaczego współdzielony stan Pythona stanowi problem? – member555