2015-11-24 14 views
23

Ho un set di dati molto grande che viene caricato in Hive. Consiste di circa 1,9 milioni di righe e 1450 colonne. Devo determinare la "copertura" di ciascuna colonna, ovvero la frazione di righe che hanno valori non NaN per ogni colonna.Contare il numero di voci non NaN in ogni colonna di Spark dataframe con Pyspark

Ecco il mio codice:

from pyspark import SparkContext 
from pyspark.sql import HiveContext 
import string as string 

sc = SparkContext(appName="compute_coverages") ## Create the context 
sqlContext = HiveContext(sc) 

df = sqlContext.sql("select * from data_table") 
nrows_tot = df.count() 

covgs=sc.parallelize(df.columns) 
     .map(lambda x: str(x)) 
     .map(lambda x: (x, float(df.select(x).dropna().count())/float(nrows_tot) * 100.)) 

Cercando questo fuori nella shell pyspark, se io poi faccio covgs.take (10), restituisce una piuttosto grande stack errori. Dice che c'è un problema in save nel file /usr/lib64/python2.6/pickle.py. Questa è la parte finale dell'errore:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) 
     at py4j.Gateway.invoke(Gateway.java:252) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:207) 
     at java.lang.Thread.run(Thread.java:745) 

Se c'è un modo migliore per raggiungere questo obiettivo che il modo che sto cercando, io sono aperto a suggerimenti. Non posso usare panda, tuttavia, poiché non è attualmente disponibile nel cluster su cui lavoro e non ho i diritti per installarlo.

risposta

58

Cominciamo con un dato fittizio:

from pyspark.sql import Row 

row = Row("v", "x", "y", "z") 
df = sc.parallelize([ 
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0), 
    row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN")) 
]).toDF() 

## +----+----+---+---+ 
## | v| x| y| z| 
## +----+----+---+---+ 
## | 0.0| 1| 2|3.0| 
## |null| 3| 4|5.0| 
## |null|null| 6|7.0| 
## | NaN| 8| 9|NaN| 
## +----+----+---+---+ 

Tutto ciò che serve è una semplice aggregazione:

from pyspark.sql.functions import col, count, isnan, lit, sum 

def count_not_null(c, nan_as_null=False): 
    """Use conversion between boolean and integer 
    - False -> 0 
    - True -> 1 
    """ 
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True)) 
    return sum(pred.cast("integer")).alias(c) 

df.agg(*[count_not_null(c) for c in df.columns]).show() 

## +---+---+---+---+ 
## | v| x| y| z| 
## +---+---+---+---+ 
## | 2| 3| 4| 4| 
## +---+---+---+---+ 

o se si vuole trattare NaN un NULL:

df.agg(*[count_not_null(c, True) for c in df.columns]).show() 

## +---+---+---+---+ 
## | v| x| y| z| 
## +---+---+---+---+ 
## | 1| 3| 4| 3| 
## +---+---+---+--- 

È inoltre possibile sfruttare la semantica SQL NULL per ottenere il sa Mi risultato senza creare una funzione personalizzata:

df.agg(*[ 
    count(c).alias(c) # vertical (column-wise) operations in SQL ignore NULLs 
    for c in df.columns 
]).show() 

## +---+---+---+ 
## | x| y| z| 
## +---+---+---+ 
## | 1| 2| 3| 
## +---+---+---+ 

ma questo non funzionerà con NaNs.

Se preferite frazioni:

exprs = [(count_not_null(c)/count("*")).alias(c) for c in df.columns] 
df.agg(*exprs).show() 

## +------------------+------------------+---+ 
## |     x|     y| z| 
## +------------------+------------------+---+ 
## |0.3333333333333333|0.6666666666666666|1.0| 
## +------------------+------------------+---+ 

o

# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue 
df.select(*[(count(c)/count("*")).alias(c) for c in df.columns]).show() 

## +------------------+------------------+---+ 
## |     x|     y| z| 
## +------------------+------------------+---+ 
## |0.3333333333333333|0.6666666666666666|1.0| 
## +------------------+------------------+---+ 
+0

somma di ritorno (col (c) .isNotNull() cast ("integer").) Alias ​​(c) qui lo fa. sa automaticamente quale dataframe accedere? È perché otteniamo i nomi delle colonne da quel particolare dataframe? – Roshini

+0

@Roshini Le colonne sono significative solo in un ambito di specifica espressione SQL che definisce i collegamenti. Nel contesto di altre parole di un determinato 'select' definisce come vengono risolte le colonne. – zero323

+0

@ zero323 Come gestiamo quando il suo nan invece di null? Ho notato che l'uso di df.na.replace (np.nan, 'NA') cambia nan a None Quindi questo sarà un modo corretto? Passaggio 1: utilizzare df.na.replace (np.nan, 'NA') per convertire nan in Nessuno Passaggio 2: applicare il metodo sul nuovo df per contare null. –

Problemi correlati