2014-09-03 30 views
8

Sono abbastanza nuovo da accendere e sto cercando di ricevere un DStream strutturato come un json da un argomento di kafka e voglio analizzare il contenuto di ogni json. JSON che ricevo è qualcosa di simile:Parsing json in spark-streaming

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "} 

Sto cercando di estrarre il campo Ident unica, almeno per ora e sto utilizzando la libreria lift-JSON per analizzare de dati. Il mio programma si presenta così:

ma mi genera l'eccezione di seguito:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest 
    at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300) 
    at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) 
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560) 
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

Il fatto è che se una corsa lo stesso senza l'utilizzo di scintilla (la lettura da un file) che funziona perfettamente. Il problema inizia quando provo a inserirlo in un programma spark. Inoltre, se cambio la funzione parser in qualcosa del genere:

def parser(json: String): JValue = { 
    val parsedJson = parse(json) 
    return (parsedJson \\ "ident") 
} 

Funziona anche. Ma quando provo ad estrarre la stringa vera e propria, ottengo lo stesso errore.

Grazie per il vostro aiuto. Spero di averlo spiegato bene.

+1

Probabilmente è una mancata corrispondenza nella versione scala che stai utilizzando. –

+0

Posso supporre che "paso1.extract [PlaneInfo]" debba essere parsedJson.extract [PlaneInfo]? – Gillespie

risposta

2

questo accade perché manca una scala riflettono la dipendenza necessaria per serializzare/deserializzare il record. Prova ad aggiungere il vaso scala riflette che corrisponde alla versione scintilla.

Suggerimento: "org.scala-lang" % "scala-riflettere" % Version.scala

0

Oh, un buon vecchio problema :-)

Fondamentalmente questo indica un problema di versione: uno dei le tue dipendenze non sono compatibili con il compilatore Scala che stai attualmente utilizzando. Sei su 2.10?

Prova a cercare su Google la frase "NoClassDefFoundError: scala/reflect/ClassManifest", sono sicuro che troverai una descrizione più che sufficiente sul problema.