2016-01-25 16 views
5

Sto lavorando con i dati estratti da SFDC utilizzando il pacchetto simple-salesforce. Sto usando Python3 per lo scripting e Spark 1.5.2.Crea DataFrame dall'elenco delle tuple usando pyspark

ho creato un RDD contenente i seguenti dati:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')] 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')] 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
... 

Questi dati sono in RDD chiamato v_rdd

mio schema è simile al seguente:

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true))) 

Sto cercando di creare dataframe fuori da questo RDD:

sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema) 

stampano i dataframe:

sqlDataFrame.printSchema() 

e ottenere il seguente:

+--------------------+--------------------+--------------------+ 
|     Id| PackSize|       Name| 
+--------------------+--------------------+--------------------+ 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 

mi aspetto di vedere i dati reali, in questo modo:

+------------------+------------------+--------------------+ 
|    Id|PackSize|       Name| 
+------------------+------------------+--------------------+ 
|a0w1a0000003xB1A |    1.0|  A   | 
|a0w1a0000003xAAI |    1.0|  B   | 
|a0w1a00000xB3AAI |    30.0|  C   | 

Potete per favore aiutarmi a identificare cosa sto facendo male qui.

Il mio script Python è lungo, non sono sicuro che sarebbe conveniente per le persone vagliarlo, quindi ho pubblicato solo le parti con cui sto avendo un problema.

Grazie mille in anticipo!

risposta

12

Ehi potresti la prossima volta fornire un esempio funzionante. Sarebbe più facile

Il modo in cui viene presentato il vostro RDD è fondamentalmente strano per creare un DataFrame. Ecco come si crea un DF secondo Spark Documentation.

>>> l = [('Alice', 1)] 
>>> sqlContext.createDataFrame(l).collect() 
[Row(_1=u'Alice', _2=1)] 
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect() 
[Row(name=u'Alice', age=1)] 

Quindi per quanto riguarda il vostro esempio è possibile creare l'output desiderato come questo senso:

# Your data at the moment 
data = sc.parallelize([ 
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')], 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')], 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
    ]) 
# Convert to tuple 
data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1])) 

# Define schema 
schema = StructType([ 
    StructField("Id", StringType(), True), 
    StructField("Packsize", StringType(), True), 
    StructField("Name", StringType(), True) 
]) 

# Create dataframe 
DF = sqlContext.createDataFrame(data_converted, schema) 

# Output 
DF.show() 
+----------------+--------+----+ 
|    Id|Packsize|Name| 
+----------------+--------+----+ 
|a0w1a0000003xB1A|  1.0| A| 
|a0w1a0000003xAAI|  1.0| B| 
|a0w1a00000xB3AAI| 30.0| C| 
+----------------+--------+----+ 

Spero che questo aiuti