2015-03-12 15 views
38

Ho bisogno di utilizzare il metodoCome convertire un DataFrame in normale RDD in pyspark?

(rdd.)partitionBy(npartitions, custom_partitioner) 

che non è disponibile sul dataframe. Tutti i metodi DataFrame si riferiscono solo ai risultati DataFrame. Allora, come creare un RDD dai dati DataFrame?

Nota: questo è un cambiamento (in 1.3.0) da 1.2.0.

Aggiornamento dalla risposta di @dpangmao: il metodo è .rdd. Ero interessato a capire se (a) fosse pubblico e (b) quali sono le implicazioni sulla performance.

Well (a) è sì e (b) - ben si può vedere qui che ci sono significative implicazioni Potenza: un nuovo RDD deve essere creato invocando mapPartitions:

In dataframe.py (prendere nota del nome del file cambiato così (era sql.py):

@property 
def rdd(self): 
    """ 
    Return the content of the :class:`DataFrame` as an :class:`RDD` 
    of :class:`Row` s. 
    """ 
    if not hasattr(self, '_lazy_rdd'): 
     jrdd = self._jdf.javaToPython() 
     rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) 
     schema = self.schema 

     def applySchema(it): 
      cls = _create_cls(schema) 
      return itertools.imap(cls, it) 

     self._lazy_rdd = rdd.mapPartitions(applySchema) 

    return self._lazy_rdd 

risposta

74

Utilizzare il metodo .rdd in questo modo:

rdd = df.rdd 
+1

sì hai ragione. Ho aggiornato l'OP dopo aver scavato più a fondo in questo. – javadba

+14

sì ma si converte in org.apache.spark.rdd.RDD [org.apache.spark.sql.Row] ma non org.apache.spark.rdd.RDD [stringa] –

38

La risposta di @ dapangmao funziona, ma non fornisce la normale scintilla RDD, ma restituisce un oggetto Riga. Se si desidera avere il normale formato RDD.

Prova questo:

rdd = df.rdd.map(tuple) 

o

rdd = df.rdd.map(list) 
+1

Questo dovrebbe essere il comportamento predefinito imo quando si chiama 'df.rdd' –

Problemi correlati