2016-04-13 25 views
13

Sono nuovo di scintilla & pyspark.pyspark EOFError dopo aver chiamato la mappa

Sto leggendo un file csv piccolo (~ 40k) in un dataframe.

from pyspark.sql import functions as F 
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv') 
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() 

ho un po 'strano errore che non si verifica ogni volta, ma accade abbastanza regolarmente

>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
Traceback (most recent call last): 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker  
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main 
    if read_int(infile) == SpecialLengths.END_OF_STREAM: 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int 
    raise EOFError 
EOFError 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

Una volta che EOFError è stata sollevata, non voglio vedere di nuovo fino a quando faccio qualcosa che richiede l'interazione con lo spark server

Quando chiamo df2.count() mostra che il prompt [Stage xxx] che è ciò che intendo è andare al server spark. Tutto ciò che scatena sembra finire per restituire di nuovo EOFError quando faccio qualcosa con df2.

Non sembra che accada con df (vs df2), quindi sembra che debba succedere qualcosa con la linea df.map().

+1

Ho sentito dalla lista degli utenti spark che questo messaggio è solo un po 'troppo prolisso e può essere ignorato. – Pete

+0

Pete, puoi indicarci gli archivi? – rjurney

+0

Ho cercato l'elenco spark-user e non riesco a trovare nulla a riguardo di un EOFError :( – rjurney

risposta

0

Per favore, prova a fare la mappa dopo aver convertito il dataframe in rdd. Si applica la funzione mappa su un dataframe e poi di nuovo creando un dataframe da that.Syntax sarebbe come

df.rdd.map().toDF() 

Si prega di farmi sapere se funziona. Grazie.

0

Credo che tu stia utilizzando Spark 2.xe versioni successive. Qui di seguito il codice dovrebbe creare il dataframe da CSV:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

allora si può avere sotto il codice:

df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 

e quindi è possibile creare DF2 senza Row e todf()

fatemi sapere se questo funziona o se stai usando Spark 1.6 ... grazie.

Problemi correlati