
PySpark DataFrame UDF on text column

I'm trying to do some NLP text cleaning of a few Unicode columns in a PySpark DataFrame. I've tried Spark 1.3, 1.5, and 1.6 and can't get things to work for the life of me. I've also tried using Python 2.7 and Python 3.4.

I created an extremely simple UDF, shown below, that should just return a string for each record in a new column. Other functions will manipulate the text and then return the modified text in a new column.

import pyspark 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import udf 

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 

Some example data can be unzipped from here.

Here is the code I use to import the data and then apply the UDF to it.

# Load a text file and convert each line to a Row. 
lines = sc.textFile("classified_tweets.txt") 
parts = lines.map(lambda l: l.split("\t")) 
training = parts.map(lambda p: (p[0], p[1])) 

# Create dataframe 
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 

training_df.show(5) 
+--------------------+--------------+ 
|               tweet|classification| 
+--------------------+--------------+ 
|rt @jiffyclub: wi...|        python| 
|rt @arnicas: ipyt...|        python| 
|rt @treycausey: i...|        python| 
|what's my best op...|        python| 
|rt @raymondh: #py...|        python| 
+--------------------+--------------+ 

# Apply UDF function 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show(5) 

When running df.show(5), the following error appears. I understand the problem most likely doesn't come from show() itself, but the traceback doesn't give me much to go on.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-0b21c233c724> in <module>()
     1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
----> 2 df.show(5) 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    255   +---+-----+ 
    256   """ 
--> 257   print(self._jdf.showString(n, truncate)) 
    258 
    259  def __repr__(self): 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) 
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

The actual function I'm trying to apply:

from nltk import pos_tag  # needed for the POS tagging below

def tag_and_remove(data_str): 
    cleaned_str = ' ' 
    # noun tags 
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS'] 
    # adjectives 
    jj_tags = ['JJ', 'JJR', 'JJS'] 
    # verbs 
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'] 
    nltk_tags = nn_tags + jj_tags + vb_tags 

    # break string into 'words' 
    text = data_str.split() 

    # tag the text and keep only the words with the wanted tags 
    tagged_text = pos_tag(text) 
    for tagged_word in tagged_text: 
        if tagged_word[1] in nltk_tags: 
            cleaned_str += tagged_word[0] + ' ' 

    return cleaned_str 


tag_and_remove_udf = udf(tag_and_remove, StringType()) 
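
Note that pos_tag comes from NLTK, so NLTK and its tagger model have to be available on every executor, not just the driver, or the UDF will fail inside the workers. A minimal sketch of wiring this up, assuming NLTK is installed cluster-wide (the download call and the "cleaned" column name are illustrative):

import nltk

# The POS tagger model must also exist on each worker machine;
# downloading it once per machine (e.g. in a bootstrap step) is one option.
# The exact model name depends on the NLTK version.
nltk.download('averaged_perceptron_tagger')

tag_and_remove_udf = udf(tag_and_remove, StringType())
df = training_df.withColumn("cleaned", tag_and_remove_udf(training_df['tweet']))
df.show(5)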

Are you sure l.split('\t') returns more than one entry? The index error is probably from training = parts.map(...). What does your data look like? Are you sure there are tabs everywhere? – AChampion


Yes, I can confirm the data has two columns. I cleaned the data of all whitespace other than spaces before writing the flat file. I'll put a small sample at the top. – dreyco676


You're not splitting on whitespace, only on tabs; l.split() would split on any whitespace. – AChampion
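
To make the difference concrete, a quick illustration (the input strings here are made up):

# split("\t") only breaks on tab characters; split() breaks on any whitespace.
line = "some tweet text\tpython"        # hypothetical well-formed line
line.split("\t")                        # ['some tweet text', 'python']
line.split()                            # ['some', 'tweet', 'text', 'python']

# A line without any tab yields a single-element list,
# so p[1] raises IndexError: list index out of range.
"no tab in this line".split("\t")       # ['no tab in this line']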

Answers


The dataset isn't clean. For 985 lines, split('\t') yields only a single value:

>>> from operator import add 
>>> lines = sc.textFile("classified_tweets.txt") 
>>> parts = lines.map(lambda l: l.split("\t")) 
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect() 
[(2, 149195), (1, 985)] 
>>> parts.filter(lambda l: len(l) == 1).take(5) 
[['"show me the money!” at what point do you start trying to monetize your #startup? tweet us with #startuplife.'], 
['a good pitch can mean money in the bank for your #startup. see how body language plays a key role: (via: ajalumnify)'], 
['100+ apps in five years? @2359media did it using microsoft #azure: #azureapps'], 
['does buying better coffee make you a better leader? little things can make a big difference: (via: @jmbrandonbb)'], 
['[email protected] graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']] 

So change the code to:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip())) 
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
>>> df.show(5) 
+--------------------+--------------+---------+ 
|               tweet|classification|    dummy| 
+--------------------+--------------+---------+ 
|rt @jiffyclub: wi...|        python|dummyData| 
|rt @arnicas: ipyt...|        python|dummyData| 
|rt @treycausey: i...|        python|dummyData| 
|what's my best op...|        python|dummyData| 
|rt @raymondh: #py...|        python|dummyData| 
+--------------------+--------------+---------+ 
only showing top 5 rows 
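
As a variant (not part of the original answer), the malformed lines could be kept instead of dropped by padding them with a placeholder label; "unknown" below is a made-up value:

# Keep single-field lines by giving them a placeholder classification
# rather than filtering them out.
padded = parts.map(lambda p: (p[0], p[1].strip()) if len(p) >= 2 else (p[0], "unknown"))
training_df = sqlContext.createDataFrame(padded, ["tweet", "classification"])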

Thanks. I've learned that show() doesn't necessarily trigger the full evaluation if it isn't needed for the specified N. – dreyco676
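
To surface such parse errors eagerly instead of waiting for a later action, one option (a sketch, not from the thread) is to force every row to be evaluated:

# Forces the whole pipeline to run, so any bad row fails immediately
# rather than only the partitions needed for show(5).
training_df.count()

# Or inspect the raw RDD before building the DataFrame:
parts.filter(lambda l: len(l) != 2).count()   # number of malformed lines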


I think you're misdefining the problem, and perhaps simplifying your lambda for the purposes of this question, but that hides the real problem.

Your stack trace reads:

File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

When I run this code it works fine:

import pyspark 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import udf 

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification") 

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show() 

+-----+--------------+---------+ 
|tweet|classification|    dummy| 
+-----+--------------+---------+ 
|  foo|           bar|dummyData| 
+-----+--------------+---------+ 

Are you sure there isn't some other bug in dummy_function_udf? What is the 'real' udf you're using, apart from this example version?


Thanks so much for answering. It seems text data is always nasty and will break your parsers. I expected any parsing error to show up with training_df.show(5), but it seems it only parses the first N records if you don't run other transformations. – dreyco676


Thanks for the answer. I had a similar problem. May I follow up and ask what "udf" means here? I copied the code into my shell only to get the following error: 'Traceback (most recent call last): File "", line 1, in NameError: name 'udf' is not defined' – yuqli


User Defined Function. –
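
For completeness, the NameError reported two comments above usually just means the udf helper was never imported; importing it, together with the return type, makes the snippet self-contained:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def dummy_function(data_str):
    return 'dummyData'

dummy_function_udf = udf(dummy_function, StringType())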
