2016-05-24 12 views
8

Ho un dataframe spark con la seguente struttura. BodyText_token ha i token (elaborati/set di parole). E ho una lista annidata di parole chiave definitiPassando una colonna di frame di dati e un elenco esterno a udf sotto conColumn

root 
|-- id: string (nullable = true) 
|-- body: string (nullable = true) 
|-- bodyText_token: array (nullable = true) 

keyword_list=['union','workers','strike','pay','rally','free','immigration',], 
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

avevo bisogno di verificare quanti gettoni cadono sotto ogni elenco di parole chiave e aggiungere il risultato come una nuova colonna della dataframe esistente. Ad esempio: se tokens =["become", "farmer","rally","workers","student"] il risultato sarà -> [1,2,0]

La seguente funzione ha funzionato come previsto.

def label_maker_topic(tokens,topic_words): 
    twt_list = [] 
    for i in range(0, len(topic_words)): 
     count = 0 
     #print(topic_words[i]) 
     for tkn in tokens: 
      if tkn in topic_words[i]: 
       count += 1 
     twt_list.append(count) 

    return twt_list 

Ho usato udf sotto con Colonna per accedere alla funzione e ottengo un errore. Penso che si tratti di passare una lista esterna a un udf. C'è un modo per passare una lista esterna e la colonna datafram ad un udf e aggiungere una nuova colonna al mio dataframe?

topicWord = udf(label_maker_topic,StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

risposta

20

la soluzione più pulita è quella di passare gli argomenti aggiuntivi utilizzando chiusura:

def make_topic_word(topic_words): 
    return udf(lambda c: label_maker_topic(c, topic_words)) 

df = sc.parallelize([(["union"],)]).toDF(["tokens"]) 

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) 
    .show()) 

Questo non richiede alcuna modifica nella keyword_list o la funzione di avvolgere con UDF. Puoi anche usare questo metodo per passare un oggetto arbitrario. Questo può essere usato per passare ad esempio un elenco di sets per ricerche efficienti.

Se si desidera utilizzare l'UDF attuale e passare direttamente topic_words dovrete convertirlo in una colonna letterale prima:

from pyspark.sql.functions import array, lit 

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) 
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

A seconda dei dati e requisiti non ci può alternativi, soluzioni più efficienti, che non richiedono UDF (esplosione + aggregazione + compressione) o ricerche (hashing + operazioni vettoriali).

7

I seguenti pregevoli opere in cui qualsiasi parametro esterno può essere passato al UDF (un codice ottimizzato per aiutare chiunque)

topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token)) 
+0

Questo funziona, ma vorrei stare attenti con questo, perché l'UDF avrà le 'topic_words 'valore nel momento in cui è stato definito udf. Quindi cambiare il 'topic_words' e riutilizzare l'udf in seguito non funzionerà - userà comunque il valore di' topic_words' nel momento in cui è stato definito udf. – CHP

Problemi correlati