5

Sto preparando un esempio di giocattolo spark.ml. Spark version 1.6.0, in esecuzione su Oracle JDK version 1.8.0_65, pyspark, notebook ipython.spark.ml StringIndexer genera l'etichetta "Non visibile" su misura()

In primo luogo, non ha praticamente nulla a che fare con Spark, ML, StringIndexer: handling unseen labels. L'eccezione viene generata mentre si adatta una pipeline a un set di dati, non a trasformarla. E sopprimere l'eccezione potrebbe non essere una soluzione qui, dal momento che, temo, il set di dati in questo caso si incasina piuttosto male.

Il mio set di dati è di circa 800 Mb non compresso, quindi potrebbe essere difficile da riprodurre (sottoinsiemi più piccoli sembrano evitare questo problema).

Il dataset assomiglia a questo:

+--------------------+-----------+-----+-------+-----+--------------------+ 
|     url|   ip| rs| lang|label|     txt| 
+--------------------+-----------+-----+-------+-----+--------------------+ 
|http://3d-detmold...|217.160.215|378.0|  de| 0.0|homwillkommskip c...| 
| http://3davto.ru/| 188.225.16|891.0|  id| 1.0|оформить заказ пе...| 
| http://404.szm.com/| 85.248.42| 58.0|  cs| 0.0|kliknite tu alebo...| 
| http://404.xls.hu/| 212.52.166|168.0|  hu| 0.0|honlapkészítés404...| 
|http://a--m--a--t...| 66.6.43|462.0|  en| 0.0|back top archiv r...| 
|http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0|     | 
|http://a-wrf.ru/s...| 78.108.80|214.0|  ru| 1.0|установк фаркопна...| 
+--------------------+-----------+-----+-------+-----+--------------------+ 

Il valore viene previsto è label. L'intero gasdotto ad esso applicata:

from pyspark.ml import Pipeline 
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF 
from pyspark.ml.classification import LogisticRegression 

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) 

pipe_stages = [ 
    StringIndexer(inputCol='lang', outputCol='lang_idx'), 
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'), 
    Tokenizer(inputCol='ip', outputCol='ip_tokens'), 
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'), 
    Tokenizer(inputCol='txt', outputCol='txt_tokens'), 
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'), 
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'), 
    LogisticRegression(labelCol='label', featuresCol='features') 
] 

pipe = Pipeline(stages=pipe_stages) 
pipemodel = pipe.fit(train) 

Ed ecco lo stacktrace:

Py4JJavaError: An error occurred while calling o10793.fit. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

La linea più interessante è:

org.apache.spark.SparkException: Unseen label: pl-PL. 

Non ne ho idea, come pl-PL che è un valore dalla colonna lang potrebbe essere stato confuso nella colonna label, che è un float, non string modificato: alcuni coclusions affrettate, corretti grazie a @ zero323

Ho guardato più al suo interno e ha scoperto, che pl-PL è un valore da parte di test del set di dati, non è la formazione. Così ora non so nemmeno dove cercare il colpevole: potrebbe essere facilmente il codice randomSplit, non StringIndexer, e chissà cos'altro.

Come posso indagare su questo?

risposta

7

Unseen labelis a generic message which doesn't correspond to a specific column. Molto probabilmente problema è con una fase successiva:

StringIndexer(inputCol='lang', outputCol='lang_idx') 

con pl-PL presente in train("lang") e non presenti in test("lang").

È possibile correggerlo usando setHandleInvalid con skip:

from pyspark.ml.feature import StringIndexer 

train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) 
test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) 

indexer = StringIndexer(inputCol="v", outputCol="vi") 
indexer.fit(train).transform(test).show() 

## Py4JJavaError: An error occurred while calling o112.showString. 
## : org.apache.spark.SparkException: Job aborted due to stage failure: 
## ... 
## org.apache.spark.SparkException: Unseen label: foobar. 

indexer.setHandleInvalid("skip").fit(train).transform(test).show() 

## +---+---+---+ 
## | k| v| vi| 
## +---+---+---+ 
## | 3|foo|1.0| 
## +---+---+---+ 
+0

Dove posizioneresti setHandleInvalid ("salta") in una pipeline? – mikeL

+0

@mikeL Ovunque si definisca 'StringIndexer'. È un 'Param' di indicizzatore, non' Pipeline'. – zero323

2

Va bene penso che ho ottenuto questo. Almeno ho funzionato.

La memorizzazione nella cache del dataframe (comprese le partizioni treno/test) risolve il problema. Questo è quello che ho trovato in questo numero di JIRA: https://issues.apache.org/jira/browse/SPARK-12590.

Quindi non è un bug, solo il fatto che randomSample potrebbe produrre un risultato diverso sullo stesso set di dati, ma partizionato in modo diverso. E apparentemente, alcune delle mie funzioni di munging (o Pipeline) implicano la ripartizione, quindi i risultati del ricalcolo del convoglio dalla sua definizione potrebbero divergere.

Ciò che ancora mi interessa - è la riproducibilità: è sempre la riga "pl-PL" che viene mescolata nella parte errata del set di dati, cioè non è una ripartizione casuale. È deterministico, solo incoerente. Mi chiedo come accade esattamente.

Problemi correlati