6

Quando si utilizza il lib standard di Scala, posso fare somthing come questo:Perché in Spark si aggregano e si piegano due API diverse?

scala> val scalaList = List(1,2,3) 
scalaList: List[Int] = List(1, 2, 3) 

scala> scalaList.foldLeft(0)((acc,n)=>acc+n) 
res0: Int = 6 

Facendo Int di molti Ints.

E posso fare qualcosa di simile:

scala> scalaList.foldLeft("")((acc,n)=>acc+n.toString) 
res1: String = 123 

Fare una stringa di molti Ints.

Quindi, foldLeft potrebbe essere o omogeneo o eterogeneo, qualunque sia il nostro obiettivo, è in un'unica API.

Mentre in Spark, se voglio uno Int di molti Ints, posso fare questo:

scala> val rdd = sc.parallelize(List(1,2,3)) 
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12 
scala> rdd.fold(0)((acc,n)=>acc+n) 
res1: Int = 6 

L'API piega è simile a foldLeft, ma è omogenea solo, un RDD [Int] posso produce solo Int con piega.

C'è un'API aggregato scintille anche:

scala> rdd.aggregate("")((acc,n)=>acc+n.toString, (s1,s2)=>s1+s2) 
res11: String = 132 

È eterogeneo, un RDD [Int] può produrre una stringa ora.

Quindi, perché le funzioni di piega e aggregazione sono implementate come due API diverse in Spark?

Perché non sono progettati come foldLeft che potrebbe essere sia omogeneo che eterogeneo?

(Sono molto nuovo a Spark, si prega di scusarmi se questa è una domanda stupida.)

risposta

1

fold può essere implementato in modo più efficiente, perché non dipende da un ordine fisso di valutazione. Quindi ogni nodo del cluster può fold il proprio chunk in parallelo e quindi un piccolo valore complessivo fold alla fine. Mentre con foldLeft ogni elemento deve essere piegato in ordine e nulla può essere fatto in parallelo.

(Inoltre è bello avere un API semplice per il caso comune per comodità. Il lib standard ha reduce così come foldLeft per questo motivo)

+0

Per essere più precisi, 'fold' in Spark richiede sia l'associatività _and_ commutatività, mentre 'fold' in Scala e altri framework non distribuiti non richiede commutatività. Controlla la documentazione di Spark [qui] (https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#fold (T,% 20scala.Function2)) – FuzzY

2

In particolare nel Spark, il calcolo è distribuito e fatto in parallelo, quindi non è possibile implementare foldLeft come nella libreria standard. Invece, l'aggregato richiede due funzioni, una che esegue un'operazione simile a fold su ogni elemento di tipo T, producendo un valore di tipo U, e un'altra che combina il U da ogni partizione nel valore finale:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 
0

foldLeft, foldRight, reduceLeft, reduceRight, scanLeft e scanRight sono operazioni in cui il parametro accumulato può essere diverso dai parametri di input ((A, B) -> B) e tali operazioni possono essere eseguite solo in modo sequenziale.

fold è un'operazione in cui il parametro accumulato deve essere lo stesso tipo dei parametri di ingresso ((A, A) -> A). Quindi può essere eseguito in parallelo.

aggregation è un'operazione in cui il parametro accumulato può essere di tipo diverso come i parametri di input, ma quindi è necessario fornire una funzione aggiuntiva che definisce come i parametri accumulati possono essere aggregati nel risultato finale. Questa operazione consente l'esecuzione parallela. L'operazione aggregation è una combinazione di foldLeft e fold.

Per informazioni più dettagliate, si può avere uno sguardo ai video Coursera per la "Programmazione parallela" corso:

Problemi correlati