2015-08-19 28 views
6

Sto provando a randomizzare l'ordine degli elementi in un RDD. Il mio attuale approccio è quello di comprimere gli elementi con un RDD di interi shuffled, per poi unirmi a quegli interi.Pyspark: shuffle RDD

Tuttavia, pyspark si interrompe con solo 100000000 numeri interi. Sto usando il codice qui sotto

La mia domanda è: c'è un modo migliore per zip con l'indice casuale o in altro modo shuffle?

Ho provato a ordinare con una chiave casuale, che funziona, ma è lento.

def random_indices(n): 
    """ 
    return an iterable of random indices in range(0,n) 
    """ 
    indices = range(n) 
    random.shuffle(indices) 
    return indices 

accade quanto segue in pyspark:

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) 
SparkContext available as sc. 
>>> import clean 
>>> clean.sc = sc 
>>> clean.random_indices(100000000) 
Killed 

risposta

5

Un possibile approccio è quello di aggiungere chiavi casuali usando mapParitions

import os 
import numpy as np 

swap = lambda x: (x[1], x[0]) 

def add_random_key(it): 
    # make sure we get a proper random seed 
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator 
    rs = np.random.RandomState(seed) 
    # Could be randint if you prefer integers 
    return ((rs.rand(), swap(x)) for x in it) 

rdd_with_keys = (rdd 
    # It will be used as final key. If you don't accept gaps 
    # use zipWithIndex but this should be cheaper 
    .zipWithUniqueId() 
    .mapPartitions(add_random_key, preservesPartitioning=True)) 

potrai ripartizionare, ordinare ogni partizione ed estrarre i valori:

n = rdd.getNumPartitions() 
(rdd_with_keys 
    # partition by random key to put data on random partition 
    .partitionBy(n) 
    # Sort partition by random value to ensure random order on partition 
    .mapPartitions(sorted, preservesPartitioning=True) 
    # Extract (unique_id, value) pairs 
    .values()) 

Se l'ordinamento per partizione è ancora lento, potrebbe essere sostituito da Fisher-Yates shuffle.

Se avete semplicemente bisogno di un dati casuali quindi è possibile utilizzare mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs 

RandomRDDs.uniformRDD(sc, n) 

Teoricamente potrebbe essere compresso con ingresso rdd ma richiederebbe che corrisponde al numero di elementi per partizione.

+0

Grazie, questo è utile. In realtà ho bisogno che le chiavi siano uniche. – Marcin

+0

Avete altri requisiti qui? Perché se no, puoi semplicemente 'zipWithIndex'' zipWithUniqueId' dopo. Aggiunge un'altra trasformazione ma non è estremamente costosa. – zero323

+0

Ho bisogno che le chiavi siano ordinate in modo casuale e uniche. Posso ordinare per chiave casuale, ma ciò si rivela piuttosto lento. – Marcin