2015-06-20 13 views
18

Sono un principiante di Apache Spark e stavo imparando le funzionalità di base. Ho avuto un piccolo dubbio.Suppongo che ho un RDD di tuple (chiave, valore) e volevo ricavarne alcuni unici. Io uso la funzione distinct(). Mi chiedo su quali basi la funzione considera le tuple come disparate ..? Si basa sulle chiavi, sui valori o su entrambi?Come funziona la funzione Distinct() in Spark?

risposta

8

La documentazione API per RDD.distinct() forniscono solo una descrizione di una frase:

"restituisce un nuovo RDD contenente gli elementi distinti in questa RDD."

Da esperienze recenti posso dirvi che in una tupla RDD viene considerata la tupla nel suo complesso.

Se volete chiavi distinte o valori distinti, poi a seconda di cosa esattamente si vuole realizzare, è possibile:

A. chiamata groupByKey() di trasformare {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} a {(k1,[v11,v12]), (k2,[v21,v22])}; o

B. striscia fuori sia le chiavi o valori chiamando keys() o values() seguita da distinct()

Come di questa scrittura (giugno 2015) UC Berkeley + EDX è in esecuzione un corso online gratuito Introduction to Big Data and Apache Spark che fornirebbe le mani su pratica con queste funzioni.

+0

Hi Paul! Supponiamo di avere una tupla RDD come segue: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22), ecc., Qui si può osservare che sia le chiavi che i valori si ripetono in varie tuple. quindi se applico distinto() sul suddetto RDD, quale sarebbe il risultato ..? Si prega di prendere un momento. Grazie! E, sì, stavo seguendo quel corso online! :) –

+2

Al momento non ho tempo ma puoi configurare il tuo RDD con 'myRDD = sc.parallelize ([(1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)]); "Questo potrebbe funzionare anche in uno dei precedenti notebook Lab del corso Spark. Quindi esegui 'myRDD.distinct(). Collect() per testare l'output' – Paul

4

distinct utilizza il metodo hashCode e equals degli oggetti per questa determinazione. Le tuple vengono incorporate nei meccanismi di uguaglianza che delegano all'uguaglianza e alla posizione di ciascun oggetto. Quindi, distinct funzionerà contro l'intero oggetto Tuple2. Come ha sottolineato Paul, puoi chiamare keys o values e poi distinct. Oppure puoi scrivere i tuoi valori distinti tramite aggregateByKey, che manterrebbe l'abbinamento delle chiavi. O se vuoi le chiavi distinte, allora puoi usare un normale aggregate

+0

Grazie! Ha senso. –

22

. Distinct() sta sicuramente facendo un shuffle tra le partizioni. Per vedere di più di ciò che sta accadendo, esegui un .toDebugString sul tuo RDD.

val hashPart = new HashPartitioner(<number of partitions>) 

val myRDDPreStep = <load some RDD> 

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) 
myRDD.checkpoint 
println(myRDD.toDebugString) 

che per un esempio RDD ho (myRDDPreStep è già hash-partizionato con chiave, persistito per StorageLevel.MEMORY_AND_DISK_SER, e checkpoint), restituisce:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
     | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] 
     |  CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B 
     | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated] 

Nota che ci possono essere più efficienti modi per ottenere un distinto che coinvolge meno shuffles, soprattutto se il tuo RDD è già partizionato in modo intelligente e le partizioni non sono eccessivamente distorte.

Vedi Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? e Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?

1

Sembra che il distinct sarà sbarazzarsi di duplicati (chiave, valore).

Nell'esempio che segue (1,20) e (2,20) sono ripetuti due volte in myRDD, ma dopo un distinct(), i duplicati vengono rimossi.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22 

scala> myRDD.collect().foreach(println _) 
(1,20) 
(1,21) 
(1,20) 
(2,20) 
(2,22) 
(2,20) 
(3,21) 
(3,22) 

scala> myRDD.distinct.collect().foreach(println _) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 
6

Justin Pihony ha ragione.Distinto usa il metodo hashCode e uguale degli oggetti per questa determinazione. Il suo ritorno elementi distinti (oggetto)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 

Distinto

rdd.distinct.collect().foreach(println) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 

Se si desidera applicare distinti sulla chiave. In questo caso ridurre di è un'opzione migliore

ReduceBy

val reduceRDD= rdd.map(tup => 
    (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) 

reduceRDD.collect().foreach(println) 

uscita: -

(2,20) 
(1,20) 
(3,21) 
Problemi correlati