Ho bisogno di utilizzare il metodoCome convertire un DataFrame in normale RDD in pyspark?
(rdd.)partitionBy(npartitions, custom_partitioner)
che non è disponibile sul dataframe. Tutti i metodi DataFrame si riferiscono solo ai risultati DataFrame. Allora, come creare un RDD dai dati DataFrame?
Nota: questo è un cambiamento (in 1.3.0) da 1.2.0.
Aggiornamento dalla risposta di @dpangmao: il metodo è .rdd. Ero interessato a capire se (a) fosse pubblico e (b) quali sono le implicazioni sulla performance.
Well (a) è sì e (b) - ben si può vedere qui che ci sono significative implicazioni Potenza: un nuovo RDD deve essere creato invocando mapPartitions:
In dataframe.py (prendere nota del nome del file cambiato così (era sql.py):
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd
sì hai ragione. Ho aggiornato l'OP dopo aver scavato più a fondo in questo. – javadba
sì ma si converte in org.apache.spark.rdd.RDD [org.apache.spark.sql.Row] ma non org.apache.spark.rdd.RDD [stringa] –