2015-04-29 18 views
12

Sto usando python su Spark e vorrei ottenere un csv in un dataframe.Ottieni CSV su Spark dataframe

Lo documentation per Spark SQL in modo strano non fornisce spiegazioni per CSV come origine.

ho trovato Spark-CSV, però ho problemi con due parti della documentazione:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" Ho davvero bisogno di aggiungere questo ogni argomento mi lancio pyspark o scintille presentare? Sembra molto inelegante. Non c'è un modo per importarlo in Python piuttosto che riscaricarlo ogni volta?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Anche se faccio quanto sopra, questo non funzionerà. Cosa significa l'argomento "sorgente" in questa riga di codice? Come faccio semplicemente a caricare un file locale su linux, ad esempio "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?

risposta

11

Leggere il file csv in un RDD e quindi generare un RowRDD dall'RDD originale.

creare lo schema rappresentato da uno StructType tinta con la struttura di righe nel RDD creato nel passaggio 1.

Applicare lo schema per l'RDD di righe tramite il metodo createDataFrame fornito da SqlContext.

lines = sc.textFile("examples/src/main/resources/people.txt") 
parts = lines.map(lambda l: l.split(",")) 
# Each line is converted to a tuple. 
people = parts.map(lambda p: (p[0], p[1].strip())) 

# The schema is encoded in a string. 
schemaString = "name age" 

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] 
schema = StructType(fields) 

# Apply the schema to the RDD. 
schemaPeople = spark.createDataFrame(people, schema) 

fonte: SPARK PROGRAMMING GUIDE

+0

questa risposta è vecchio, le nuove versioni di scintilla sono modi più semplici per raggiungere questo obiettivo. Fare riferimento alle risposte https://stackoverflow.com/a/41638342/187355 e https://stackoverflow.com/a/46539901/187355 –

20
from pyspark.sql.types import StringType 
from pyspark import SQLContext 
sqlContext = SQLContext(sc) 

Employee_rdd = sc.textFile("\..\Employee.csv") 
       .map(lambda line: line.split(",")) 

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name']) 

Employee_df.show() 
+0

Questa risposta ha diverse upvotes ma non è esattamente chiaro per me quello che sta succedendo - ti fanno SqlContext (sc) e chiama sqlContext, quindi non devi farci niente .. è solo un codice estraneo? Quando provo lo stesso codice con un semplice file CSV in un notebook zeppelin ottengo l'errore: '' 'Traceback (chiamata più recente scorso): file "/tmp/zeppelin_pyspark-7664300769638364279.py", la linea 252 , in eval (compiledCode) File "", linea 1, in AttributeError: 'int' oggetto non ha attributo 'map'''' – tamale

+0

Si prega di condividere il codice, per ottenere aiuto. Ho usato il codice in diversi casi, non ho avuto problemi –

0

mi sono imbattuto in problema simile. La soluzione è aggiungere una variabile d'ambiente chiamata "PYSPARK_SUBMIT_ARGS" e impostarne il valore su "--packages com.databricks: spark-csv_2.10: 1.4.0 pyspark-shell". Funziona con la shell interattiva di Spark's Python.

Assicurati di abbinare la versione di spark-csv con la versione di Scala installata. Con Scala 2.11, è spark-csv_2.11 e con Scala 2.10 o 2.10.5 è spark-csv_2.10.

Spero che funzioni.

8

Se non vi dispiace la dipendenza extra del pacchetto, è possibile utilizzare Pandas per analizzare il file CSV. Gestisce le virgole interne bene.

Dipendenze:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

leggere l'intero file in una sola volta in una dataframe Spark:

sc = SparkContext('local','example') # if using locally 
sql_sc = SQLContext(sc) 

pandas_df = pd.read_csv('file.csv') # assuming the file contains a header 
# If no header: 
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df) 

Oppure, ancora di più data-consapevole, è possibile suddividere i dati in uno Spark RDD poi DF :

chunk_100k = pd.read_csv('file.csv', chunksize=100000) 

for chunky in chunk_100k: 
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist()) 
    try: 
     Spark_full_rdd += Spark_temp_rdd 
    except NameError: 
     Spark_full_rdd = Spark_temp_rdd 
    del Spark_temp_rdd 

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2']) 
+0

createDataFrame spesso dà e errore come questo: IllegalArgumentException: "Errore durante un'istanza 'org.apache.spark.sql.hive.HiveSessionState':" ... ogni esperienza colpire Questo? – mathtick

6

seguito Spark 2.0, si consiglia di utilizzare una sessione Spark:

012.
from pyspark.sql import SparkSession 
from pyspark.sql import Row 

# Create a SparkSession 
spark = SparkSession \ 
    .builder \ 
    .appName("basic example") \ 
    .config("spark.some.config.option", "some-value") \ 
    .getOrCreate() 

def mapper(line): 
    fields = line.split(',') 
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3])) 

lines = spark.sparkContext.textFile("file.csv") 
df = lines.map(mapper) 

# Infer the schema, and register the DataFrame as a table. 
schemaDf = spark.createDataFrame(df).cache() 
schemaDf.createOrReplaceTempView("tablename") 
6

Con versioni più recenti di Spark (come di, credo, 1.4) questo è diventato molto più semplice.L'espressione sqlContext.read ti dà un esempio DataFrameReader, con un metodo .csv():

df = sqlContext.read.csv("/path/to/your.csv") 

Nota che si può anche indicare che il file CSV ha un'intestazione aggiungendo la parola chiave argomento header=True alla chiamata .csv(). Una manciata di altre opzioni sono disponibili e descritte nel link sopra.

0

Sulla base della risposta da Aravind, ma molto più breve, ad esempio, :

lines = sc.textFile("/path/to/file").map(lambda x: x.split(",")) 
df = lines.toDF(["year", "month", "day", "count"]) 
2

per Pyspark, supponendo che la prima riga del file CSV contiene un'intestazione

spark = SparkSession.builder.appName('chosenName').getOrCreate() 
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True) 
Problemi correlati