2014-05-11 10 views
7

Vorrei eseguire alcune trasformazioni solo su un sottoinsieme di un RDD (per rendere più veloce la sperimentazione in REPL).Esecuzione delle operazioni solo sul sottoinsieme di un RDD

È possibile?

RDD ha take(num: Int): Array[T] metodo, penso che avrei bisogno di qualcosa di simile, ma tornando RDD [T]

+0

è la tua domanda ancora aperta? Se hai una risposta accettabile, non dimenticare di contrassegnarla come tale. – maasg

risposta

16

È possibile utilizzare RDD.sample per ottenere un RDD fuori, non uno Array. Ad esempio, per campionare ~ 1% senza sostituzione:

val data = ... 
data.count 
... 
res1: Long = 18066983 

val sample = data.sample(false, 0.01, System.currentTimeMillis().toInt) 
sample.count 
... 
res3: Long = 180190 

Il terzo parametro è un seme, ed è per fortuna facoltativa nella prossima versione Spark.

0

A quanto pare è possibile creare RDD sottoinsieme dalla prima utilizzando il suo metodo take e poi passando matrice restituita alla SparkContext di makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism) che restituisce nuovo RDD.

Questo approccio mi sembra tuttavia poco chiaro. C'è un modo migliore?

0

Io uso sempre la funzione parallelize di SparkContext per distribuirla dall'array [T] ma sembra che makeRDD faccia lo stesso. È il modo corretto di entrambi.

0

RDD s sono raccolte distribuite che si materializzano solo sulle azioni. Non è possibile Tronca tua RDD ad una dimensione fissa, e ancora ottenere un RDD posteriore (da qui RDD.take(n) restituisce un Array[T], proprio come collect)

che si desidera ottenere dimensioni simili RDD s, indipendentemente dalle dimensioni di input , è possibile troncare gli elementi in ciascuna delle partizioni, in questo modo è possibile controllare meglio il numero assoluto di elementi risultanti in RDD. La dimensione del RDD risultante dipenderà dal parallelismo della scintilla.

Un esempio dal spark-shell:

import org.apache.spark.rdd.RDD 
val numberOfPartitions = 1000 

val millionRdd: RDD[Int] = sc.parallelize(1 to 1000000, numberOfPartitions) 

val millionRddTruncated: RDD[Int] = rdd.mapPartitions(_.take(10)) 

val billionRddTruncated: RDD[Int] = sc.parallelize(1 to 1000000000, numberOfPartitions).mapPartitions(_.take(10)) 

millionRdd.count   // 1000000 
millionRddTruncated.count // 10000 = 10 item * 1000 partitions 
billionRddTruncated.count // 10000 = 10 item * 1000 partitions 
Problemi correlati