2015-09-14 11 views
10

Ultimamente sto pianificando di migrare il mio codice ML Python standalone per accendere. La pipeline ML nel numero spark.ml risulta piuttosto a portata di mano, con un'API semplificata per concatenare gli stadi dell'algoritmo e la ricerca della griglia iperparametrica.Risultati intermedi nella cache nella pipeline Spark ML

Tuttavia, ho trovato il supporto per una caratteristica importante oscura nei documenti esistenti: memorizzazione nella cache dei risultati intermedi. L'importanza di questa funzionalità si verifica quando la pipeline coinvolge fasi intensive di calcolo.

Ad esempio, nel mio caso uso un'enorme matrice sparsa per eseguire più medie mobili sui dati delle serie temporali al fine di formare caratteristiche di input. La struttura della matrice è determinata da alcuni iperparametri. Questo passaggio si rivela un collo di bottiglia per l'intera pipeline perché devo costruire la matrice in runtime.

Durante la ricerca parametri, in genere ho altri parametri da esaminare diversi da questo "parametro struttura". Quindi se riesco a riutilizzare l'enorme matrice quando il "parametro di struttura" è invariato, posso risparmiare un sacco di tempo. Per questo motivo, ho intenzionalmente formato il mio codice per memorizzare e riutilizzare questi risultati intermedi.

Quindi la mia domanda è: la pipeline ML di Spark gestisce automaticamente il caching intermedio? O devo compilare manualmente il codice per farlo? Se è così, c'è qualche buona pratica da cui imparare?

P.S. Ho esaminato il documento ufficiale e qualche altro materiale, ma nessuno di loro sembra discutere questo argomento.

+0

ho una [domanda relativa] (http://stackoverflow.com/questions/33161320/distributed-batch-computation -con-lungo-termine-persistenza-e-checkpoint) che sfortunatamente non ha nemmeno risposte. –

risposta

4

Quindi mi sono imbattuto nello stesso problema e il modo in cui ho risolto è che ho implementato il mio PipelineStage, che memorizza nella cache DataSet di input e lo restituisce così com'è.

import org.apache.spark.ml.Transformer 
import org.apache.spark.ml.param.ParamMap 
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable} 
import org.apache.spark.sql.{DataFrame, Dataset} 
import org.apache.spark.sql.types.StructType 

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable { 
    override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache() 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    override def transformSchema(schema: StructType): StructType = schema 

    def this() = this(Identifiable.randomUID("CacherTransformer")) 
} 

di usarlo, allora si farebbe qualcosa di simile:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2)) 
+1

Immagino che l'unico problema con questa sia la soluzione (che ho effettivamente upvoted!) È che non si interrompe il dataframe memorizzato nella cache (se si collega più Cacher). Potresti obiettare che non è un problema perché Spark automaticamente non persiste all'ora di GC, ma può diventare piuttosto confuso dalla tua UI, ad es. vedere così tanti dati nella cache. –

Problemi correlati