Sono un principiante di Apache Spark e stavo imparando le funzionalità di base. Ho avuto un piccolo dubbio.Suppongo che ho un RDD di tuple (chiave, valore) e volevo ricavarne alcuni unici. Io uso la funzione distinct(). Mi chiedo su quali basi la funzione considera le tuple come disparate ..? Si basa sulle chiavi, sui valori o su entrambi?Come funziona la funzione Distinct() in Spark?
risposta
La documentazione API per RDD.distinct() forniscono solo una descrizione di una frase:
"restituisce un nuovo RDD contenente gli elementi distinti in questa RDD."
Da esperienze recenti posso dirvi che in una tupla RDD viene considerata la tupla nel suo complesso.
Se volete chiavi distinte o valori distinti, poi a seconda di cosa esattamente si vuole realizzare, è possibile:
A. chiamata groupByKey()
di trasformare {(k1,v11),(k1,v12),(k2,v21),(k2,v22)}
a {(k1,[v11,v12]), (k2,[v21,v22])}
; o
B. striscia fuori sia le chiavi o valori chiamando keys()
o values()
seguita da distinct()
Come di questa scrittura (giugno 2015) UC Berkeley + EDX è in esecuzione un corso online gratuito Introduction to Big Data and Apache Spark che fornirebbe le mani su pratica con queste funzioni.
distinct
utilizza il metodo hashCode
e equals
degli oggetti per questa determinazione. Le tuple vengono incorporate nei meccanismi di uguaglianza che delegano all'uguaglianza e alla posizione di ciascun oggetto. Quindi, distinct
funzionerà contro l'intero oggetto Tuple2
. Come ha sottolineato Paul, puoi chiamare keys
o values
e poi distinct
. Oppure puoi scrivere i tuoi valori distinti tramite aggregateByKey
, che manterrebbe l'abbinamento delle chiavi. O se vuoi le chiavi distinte, allora puoi usare un normale aggregate
Grazie! Ha senso. –
. Distinct() sta sicuramente facendo un shuffle tra le partizioni. Per vedere di più di ciò che sta accadendo, esegui un .toDebugString sul tuo RDD.
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
che per un esempio RDD ho (myRDDPreStep è già hash-partizionato con chiave, persistito per StorageLevel.MEMORY_AND_DISK_SER, e checkpoint), restituisce:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Nota che ci possono essere più efficienti modi per ottenere un distinto che coinvolge meno shuffles, soprattutto se il tuo RDD è già partizionato in modo intelligente e le partizioni non sono eccessivamente distorte.
Vedi Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? e Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?
Sembra che il distinct
sarà sbarazzarsi di duplicati (chiave, valore).
Nell'esempio che segue (1,20) e (2,20) sono ripetuti due volte in myRDD
, ma dopo un distinct()
, i duplicati vengono rimossi.
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22
scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)
scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Justin Pihony ha ragione.Distinto usa il metodo hashCode e uguale degli oggetti per questa determinazione. Il suo ritorno elementi distinti (oggetto)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
Distinto
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Se si desidera applicare distinti sulla chiave. In questo caso ridurre di è un'opzione migliore
ReduceBy
val reduceRDD= rdd.map(tup =>
(tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
uscita: -
(2,20)
(1,20)
(3,21)
- 1. Come funziona la funzione di aggregazione Spark: aggregateByKey?
- 2. Come funziona SELECT DISTINCT in MySQL?
- 3. In che modo la clausola DISTINCT di SQL funziona?
- 4. Funzione takeSample() in Spark
- 5. Come funziona la funzione mapPartitions di pyspark?
- 6. Come funziona la funzione "tutto" in Python?
- 7. Come funziona la funzione map() in Processing?
- 8. Come funziona la funzione printf in C?
- 9. come funziona la funzione system() in C++?
- 10. La funzione finestra Last_value non funziona correttamente
- 11. Spark e SparkSQL: come imitare la funzione finestra?
- 12. SUM() non funziona in MySQL: SUM() con DISTINCT
- 13. Come funziona `mail` di PHP? la funzione
- 14. Come funziona la seguente definizione di funzione?
- 15. Apache Spark: distinto non funziona?
- 16. la funzione seekTo() non funziona in VideoView
- 17. ON DISTINCT in Django
- 18. Come funziona la funzione noConflict di jQuery?
- 19. Come funziona la funzione di riduzione?
- 20. La funzione openOptionsMenu non funziona in ICS?
- 21. Utilizzo di DISTINCT in JPA
- 22. Spark toDebugString non bello in python
- 23. DISTINCT clausola in SQLite
- 24. Apache Spark lancia NullPointerException quando si incontra la funzione mancante
- 25. Come funziona la funzione sizeof() per Structures in C?
- 26. come funziona la funzione di callback in python multiprocessing map_async
- 27. SELECT DISTINCT in Scala slick
- 28. Come funziona questa funzione "ritardo" funziona
- 29. Applicare la funzione a ciascuna riga di Spark DataFrame
- 30. funzione init in javascript e come funziona
Hi Paul! Supponiamo di avere una tupla RDD come segue: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22), ecc., Qui si può osservare che sia le chiavi che i valori si ripetono in varie tuple. quindi se applico distinto() sul suddetto RDD, quale sarebbe il risultato ..? Si prega di prendere un momento. Grazie! E, sì, stavo seguendo quel corso online! :) –
Al momento non ho tempo ma puoi configurare il tuo RDD con 'myRDD = sc.parallelize ([(1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)]); "Questo potrebbe funzionare anche in uno dei precedenti notebook Lab del corso Spark. Quindi esegui 'myRDD.distinct(). Collect() per testare l'output' – Paul