2015-08-19 21 views
6

Próbuję randomizować kolejność elementów w RDD. Moje obecne podejście polega na zip elementach z RDD z przetasowanymi liczbami całkowitymi, a następnie łączenie przez te liczby całkowite.Pyspark: shuffle RDD

Jednak, pyspark przewyższa tylko 100000000 liczb całkowitych. Używam poniższego kodu.

Moje pytanie brzmi: czy istnieje lepszy sposób na zip z losowym indeksem lub w inny sposób shuffle?

Próbowałem sortować według losowego klucza, który działa, ale działa wolno.

def random_indices(n): 
    """ 
    return an iterable of random indices in range(0,n) 
    """ 
    indices = range(n) 
    random.shuffle(indices) 
    return indices 

dodaje się dzieje w pyspark:

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) 
SparkContext available as sc. 
>>> import clean 
>>> clean.sc = sc 
>>> clean.random_indices(100000000) 
Killed 

Odpowiedz

5

Jednym z możliwych rozwiązań jest dodanie kluczy losowych za pomocą mapParitions

import os 
import numpy as np 

swap = lambda x: (x[1], x[0]) 

def add_random_key(it): 
    # make sure we get a proper random seed 
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator 
    rs = np.random.RandomState(seed) 
    # Could be randint if you prefer integers 
    return ((rs.rand(), swap(x)) for x in it) 

rdd_with_keys = (rdd 
    # It will be used as final key. If you don't accept gaps 
    # use zipWithIndex but this should be cheaper 
    .zipWithUniqueId() 
    .mapPartitions(add_random_key, preservesPartitioning=True)) 

Następny można podzielić na partycje, sortowania każdej partycji i wyodrębnić wartości:

n = rdd.getNumPartitions() 
(rdd_with_keys 
    # partition by random key to put data on random partition 
    .partitionBy(n) 
    # Sort partition by random value to ensure random order on partition 
    .mapPartitions(sorted, preservesPartitioning=True) 
    # Extract (unique_id, value) pairs 
    .values()) 

Jeśli sortowanie na partycję ma zwolnić, można ją zastąpić shuffle Fisher-Yates.

Jeśli po prostu potrzebują losowych danych można użyć mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs 

RandomRDDs.uniformRDD(sc, n) 

Teoretycznie może to być spakowane z wejściem rdd ale wymagałoby to dopasowanie liczby elementów na partycji.

+0

Dzięki, jest to przydatne. Potrzebuję kluczy, żeby były unikalne. – Marcin

+0

Czy masz tutaj inne wymagania? Bo jeśli nie, możesz po prostu 'zipWithIndex'' zipWithUniqueId' później. Dodaje kolejną transformację, ale nie jest wyjątkowo kosztowna. – zero323

+0

Potrzebuję kluczy, aby były one losowo zamawiane i unikalne. Mogę sortować według losowego klucza, ale okazuje się to dość powolne. – Marcin