2015-10-22 18 views
11

Ho un Spark 1.5.0 DataFrame con un mix di null e stringhe vuote nella stessa colonna. Voglio convertire tutte le stringhe vuote in tutte le colonne a null (None, in Python). DataFrame può avere centinaia di colonne, quindi sto cercando di evitare le manipolazioni hard-coded di ogni colonna.Sostituisci stringhe vuote con valori None/null in DataFrame

Vedere il mio tentativo qui sotto, che si traduce in un errore.

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

## Create a test DataFrame 
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')]) 
testDF.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## | | 2| 
## |null|null| 
## +----+----+ 

## Try to replace an empty string with None/null 
testDF.replace('', None).show() 
## ValueError: value should be a float, int, long, string, list, or tuple 

## A string value of null (obviously) doesn't work... 
testDF.replace('', 'null').na.drop(subset='col1').show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## +----+----+ 
+0

@palsch, No, non restituisce un elenco. Restituisce un DataFrame. Ho aggiornato la domanda con un link alla documentazione di Spark. – dnlbrky

+2

@palsch non è una domanda generale di Python! Spark DataFrames è una struttura di dati distribuiti utilizzata generalmente per consentire analisi di dati pesanti su big data. Quindi la tua soluzione non è adatta. – eliasah

+1

@eliasah A dire la verità Pythonico 'lambda x: Nessuno se non x altro x' avvolto con' udf' funzionerebbe bene :) – zero323

risposta

15

E 'semplice come questo:

from pyspark.sql.functions import col, when 

def blank_as_null(x): 
    return when(col(x) != "", col(x)).otherwise(None) 

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1")) 

dfWithEmptyReplaced.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## |null|null| 
## +----+----+ 

dfWithEmptyReplaced.na.drop().show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## +----+----+ 

Se si vuole riempire più colonne è possibile ad esempio ridurre:

to_convert = set([...]) # Some set of columns 

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF) 

o utilizzare la comprensione:

exprs = [ 
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns] 

testDF.select(*exprs) 

Se si vuole specificamente ope tasso sui campi stringa si prega di controllare the answer per robin-loxley.

+0

Grazie a @ zero323. La tua risposta può essere estesa per gestire molte colonne automaticamente ed efficientemente? Forse elencare tutti i nomi delle colonne, generare un codice simile come risposta per ogni colonna e quindi valutare il codice? – dnlbrky

+0

Non vedo alcun motivo per cui non potresti. I DataFrames sono ponderati e il resto è solo un Python standard. Troverai alcune opzioni nella modifica. – zero323

+0

Accetterò questa risposta, ma potresti per favore aggiungere prima il bit da @RobinLoxley? Oppure, se non ti dispiace, posso modificare la tua risposta. – dnlbrky

8

La mia soluzione è molto meglio di tutte le soluzioni I'v visto finora, che può trattare con il maggior numero di campi come si desidera, vedere la piccola funzione di come il seguente:

// Replace empty Strings with null values 
    private def setEmptyToNull(df: DataFrame): DataFrame = { 
    val exprs = df.schema.map { f => 
     f.dataType match { 
     case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name) 
     case _ => col(f.name) 
     } 
    } 

    df.select(exprs: _*) 
    } 

Si può facilmente riscrivere la funzione sopra in Python.

ho imparato questo trucco da @liancheng

6

Basta aggiungere in cima zero323 di e risposte di soulmachine. Per convertire per tutti i campi StringType.

from pyspark.sql.types import StringType 
string_fields = [] 
for i, f in enumerate(test_df.schema.fields): 
    if isinstance(f.dataType, StringType): 
     string_fields.append(f.name) 
Problemi correlati