2015-04-25 13 views
9

mi sono imbattuto in this line nella sorgente di Apache codice di Sparkcome interpretare RDD.treeAggregate

val (gradientSum, lossSum, miniBatchSize) = data 
    .sample(false, miniBatchFraction, 42 + i) 
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
     seqOp = (c, v) => { 
     // c: (grad, loss, count), v: (label, features) 
     val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) 
     (c._1, c._2 + l, c._3 + 1) 
     }, 
     combOp = (c1, c2) => { 
     // c: (grad, loss, count) 
     (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) 
     } 
    ) 

ho più difficoltà a leggere questo:

  • un primo momento non riesco a trovare nulla sul web che spiega esattamente come funziona treeAggregate, qual è il significato dei parametri.
  • In secondo luogo, qui .treeAggregate sembra avere due()() seguendo il nome del metodo. Cosa potrebbe significare? È una qualche scala sintassi speciale che non capisco.
  • Infine, vedo seqOp e comboOp restituiscono una tupla di 3 elementi che corrisponde alla variabile del lato sinistro atteso, ma quale viene effettivamente restituita?

Questa dichiarazione deve essere davvero avanzata. Non posso iniziare a decifrarlo.

risposta

13

treeAggregate è un'implementazione specializzata di aggregate che applica in modo iterativo la funzione di combinazione a un sottoinsieme di partizioni. Questo viene fatto al fine di evitare il ritorno di tutti i risultati parziali al guidatore dove una riduzione del passaggio singolo avverrebbe come fa il classico aggregate.

Per tutti gli scopi pratici, treeAggregate segue lo stesso principio aggregate spiegato in questa risposta: Explain the aggregate functionality in Python con l'eccezione che richiede un parametro supplementare per indicare la profondità del livello di aggregazione parziale.

Vorrei provare a spiegare cosa sta succedendo qui in particolare:

Per aggregata, abbiamo bisogno di uno zero, una funzione combinatore e una funzione di ridurre. aggregate utilizza currying per specificare il valore zero indipendentemente dalla combinazione e ridurre le funzioni.

Possiamo quindi sezionare la funzione di cui sopra in questo modo. Speriamo che aiuta la comprensione:

val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L) 
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long) = (c, v) => { 
     // c: (grad, loss, count), v: (label, features) 
     val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) 
     (c._1, c._2 + l, c._3 + 1) 
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => { 
     // c: (grad, loss, count) 
     (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) 
     } 

Poi possiamo riscrivere la chiamata a treeAggregate in una forma più digeribile:

val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction) 

Questa forma sara 'estratto' la tupla risultante nei valori denominati gradientSum, lossSum, miniBatchSize per un ulteriore utilizzo .

Si noti che treeAggregate prende un ulteriore parametro depth che viene dichiarato con un valore predefinito depth = 2, pertanto, poiché non viene fornito in questa particolare chiamata, prenderà quel valore predefinito.

Problemi correlati