2015-05-04 19 views
7

Ho un tavolo cassandra con un campo di tipo testo denominato snapshot contenente oggetti JSON:Spark JSON campo di testo per RDD

[identifier, timestamp, snapshot] 

ho capito che per essere in grado di fare le trasformazioni su quel campo con Spark, Devo convertire quel campo di quel RDD in un altro RDD per realizzare trasformazioni sullo schema JSON.

È corretto? Come devo procedere per quello?

Edit: Per ora sono riuscito a creare un RDD da un unico campo di testo:

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val first = snapshots.first() 
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3))) 
firstJson.printSchema() 

Il che mi mostra lo schema JSON. Buona!

Come faccio a comunicare a Spark che questo schema deve essere applicato su tutte le righe della tabella Istantanee, per ottenere un RDD su quel campo di istantanea da ciascuna riga?

+0

Se ho capito bene, hai diversi oggetti JSON all'interno di ogni campo nella tabella cassandra e devi calcolare ogni oggetto in modo indipendente. –

+0

Sì, hai ragione, ma ho letto da qualche parte che Spark può comprendere quel campo di testo come json e che potrei fare delle trasformazioni su alcuni valori di quei jsons, è corretto? – galex

risposta

12

Ci siamo quasi, si vuole solo passare il vostro un RDD [String] con il JSON nel jsonRDD metodo

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly 
jsonSchemaRDD.registerTempTable("testjson") 
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

Un rapido esempio

val stringRDD = sc.parallelize(Seq(""" 
    { "isActive": false, 
    "balance": "$1,431.73", 
    "picture": "http://placehold.it/32x32", 
    "age": 35, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": true, 
    "balance": "$2,515.60", 
    "picture": "http://placehold.it/32x32", 
    "age": 34, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": false, 
    "balance": "$3,765.29", 
    "picture": "http://placehold.it/32x32", 
    "age": 26, 
    "eyeColor": "blue" 
    }""") 
) 
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson") 
csc.sql("SELECT age from testjson").collect 
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26]) 
+0

Perfetto, grazie! – galex

Problemi correlati