5

Ho un dataframe che assomiglia segue:Spark, dataframe: applicare trasformatore/stimatore sui gruppi

+-----------+-----+------------+ 
|  userID|group| features| 
+-----------+-----+------------+ 
|12462563356| 1| [5.0,43.0]| 
|12462563701| 2| [1.0,8.0]| 
|12462563701| 1| [2.0,12.0]| 
|12462564356| 1| [1.0,1.0]| 
|12462565487| 3| [2.0,3.0]| 
|12462565698| 2| [1.0,1.0]| 
|12462565698| 1| [1.0,1.0]| 
|12462566081| 2| [1.0,2.0]| 
|12462566081| 1| [1.0,15.0]| 
|12462566225| 2| [1.0,1.0]| 
|12462566225| 1| [9.0,85.0]| 
|12462566526| 2| [1.0,1.0]| 
|12462566526| 1| [3.0,79.0]| 
|12462567006| 2| [11.0,15.0]| 
|12462567006| 1| [10.0,15.0]| 
|12462567006| 3| [10.0,15.0]| 
|12462586595| 2| [2.0,42.0]| 
|12462586595| 3| [2.0,16.0]| 
|12462589343| 3| [1.0,1.0]| 
+-----------+-----+------------+ 

Dove i tipi di colonne sono: userID: Lungo, gruppo: Int, e le caratteristiche: Vettore.

Questo è già un DataFrame raggruppato, cioè un ID utente apparirà in un particolare gruppo al massimo una volta.

Il mio obiettivo è scalare la colonna features per gruppo.

C'è un modo per applicare un feature transformer (nel mio caso mi piacerebbe applicare un StandardScaler) per gruppo invece di applicare appieno dataframe.

P.S. usare ML non è obbligatorio, quindi nessun problema se la soluzione è basata su MLlib.

+0

Come pensate di installare lo standardScaler? In ogni gruppo? – eliasah

+0

Mi piacerebbe ridimensionare ogni dimensione del vettore delle caratteristiche, per gruppo. – Rami

+1

AFAIK non lo fa, ma puoi sempre applicare tutte le operazioni direttamente. Scaler funziona su RDD comunque, quindi è solo una questione di statistiche di calcolo e di trasformazione per gruppo. – zero323

risposta

5

È possibile calcolare le statistiche per gruppo con quasi lo stesso codice come predefinito Scaler:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.sql.Row 

// Compute Multivariate Statistics 
val summary = data.select($"group", $"features") 
    .rdd 
    .map { 
     case Row(group: Int, features: Vector) => (group, features) 
    } 
    .aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */ 
     (agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */ 
     (agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */ 
    .mapValues { 
     s => (
     s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */ 
     s.mean.toArray /* fetch the mean for each key */ 
    ) 
    }.collectAsMap 

Se numero previsto di gruppi è relativamente basso è possibile trasmettere questi:

val summaryBd = sc.broadcast(summary) 

e trasformare i dati :

val scaledRows = df.map{ case Row(userID, group: Int, features: Vector) => 
    val (stdev, mean) = summaryBd.value(group) 
    val vs = features.toArray.clone() 
    for (i <- 0 until vs.size) { 
    vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1/stdev(i)) 
    } 
    Row(userID, group, Vectors.dense(vs)) 
} 
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema) 

Altrimenti puoi semplicemente unirti. Non dovrebbe essere difficile avvolgerlo come un trasformatore ML con una colonna di gruppo come parametro.

+1

Questa è una risposta eccellente! – eliasah

Problemi correlati