2016-01-17 11 views
8

Uso Spark 1.5 e Scala 2.10.6filtro dataframe scintilla con il campo di riga che è un array di stringhe

Sto cercando di filtrare un dataframe tramite un campo "tag", cioè un array di stringhe. Stai cercando tutte le righe che hanno il tag "privato".

val report = df.select("*") 
    .where(df("tags").contains("private")) 

ottenere:

Exception in thread "main" org.apache.spark.sql.AnalysisException: non permette di risolvere 'Contiene (tag, privati)' a causa del tipo di dati non corrispondente: argomento 1 richiede un tipo di stringa, tuttavia, "tag" è di tipo array .;

Il metodo di filtro è più adatto?

aggiornamento:

i dati proviene da adattatore cassandra, ma un esempio minimo che mostra quello che sto cercando di fare e ottiene anche l'errore di cui sopra è:

def testData (sc: SparkContext): DataFrame = { 
    val stringRDD = sc.parallelize(Seq(""" 
     { "name": "ed", 
     "tags": ["red", "private"] 
     }""", 
     """{ "name": "fred", 
     "tags": ["public", "blue"] 
     }""") 
    ) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 
    sqlContext.read.json(stringRDD) 
    } 
    def run(sc: SparkContext) { 
    val df1 = testData(sc) 
    df1.show() 
    val report = df1.select("*") 
     .where(df1("tags").contains("private")) 
    report.show() 
    } 

AGGIORNATO: l'array tag può essere di qualsiasi lunghezza e il tag 'privato' può essere in qualsiasi posizione

AGGIORNAMENTO: una soluzione che funziona: UDF

val filterPriv = udf {(tags: mutable.WrappedArray[String]) => tags.contains("private")} 
val report = df1.filter(filterPriv(df1("tags"))) 
+0

campione di posta dei dati e come creare il df –

+1

Un'opzione è creare un UDF. –

+1

Bene, dopo aver guardato il codice sorgente (poiché lo scaladoc per 'Column.contains' dice solo" Contiene l'altro elemento "che non è molto illuminante), vedo che' Column.contains' costruisce un'istanza di 'org.apache .spark.sql.catalyst.expressions.Contains' che dice "Una funzione che restituisce true se la stringa' left' contiene la stringa 'right'". Quindi sembra che 'df1 (" tags "). Contains' non possa fare ciò che vogliamo che faccia in questo caso. Non so quale alternativa suggerire. C'è un 'ArrayContains' anche in' ... espressioni' ma 'Column' non sembra farne uso. –

risposta

13

Penso che se si utilizza where(array_contains(...)) che funzionerà. Ecco il mio risultato:

scala> import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext 

scala> import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.DataFrame 

scala> def testData (sc: SparkContext): DataFrame = { 
    |  val stringRDD = sc.parallelize(Seq 
    |  ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""", 
    |  """{ "name": "albert", "tags": ["private", "lumpy"] }""", 
    |  """{ "name": "zed", "tags": ["big", "private", "square"] }""", 
    |  """{ "name": "jed", "tags": ["green", "small", "round"] }""", 
    |  """{ "name": "ed", "tags": ["red", "private"] }""", 
    |  """{ "name": "fred", "tags": ["public", "blue"] }""")) 
    |  val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    |  import sqlContext.implicits._ 
    |  sqlContext.read.json(stringRDD) 
    | } 
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame 

scala> 
    | val df = testData (sc) 
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>] 

scala> val report = df.select ("*").where (array_contains (df("tags"), "private")) 
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>] 

scala> report.show 
+------+--------------------+ 
| name|    tags| 
+------+--------------------+ 
| ned|[blue, big, private]| 
|albert| [private, lumpy]| 
| zed|[big, private, sq...| 
| ed|  [red, private]| 
+------+--------------------+ 

Nota che funziona se si scrive where(array_contains(df("tags"), "private")), ma se si scrive where(df("tags").array_contains("private")) (più direttamente analogo a quello che hai scritto in origine) non riesce con array_contains is not a member of org.apache.spark.sql.Column. Guardando il codice sorgente per Column, vedo che ci sono alcune cose per gestire contains (costruendo un'istanza Contains per quello) ma non array_contains. Forse è una svista.

0

È possibile utilizzare ordinale per fare riferimento a json array per es. nel tuo caso df("tags")(0). Ecco un esempio di lavoro

scala> val stringRDD = sc.parallelize(Seq(""" 
    |  { "name": "ed", 
    |   "tags": ["private"] 
    |  }""", 
    |  """{ "name": "fred", 
    |   "tags": ["public"] 
    |  }""") 
    | ) 
stringRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[87] at parallelize at <console>:22 

scala> import sqlContext.implicits._ 
import sqlContext.implicits._ 

scala> sqlContext.read.json(stringRDD) 
res28: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>] 

scala> val df=sqlContext.read.json(stringRDD) 
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>] 

scala> df.columns 
res29: Array[String] = Array(name, tags) 

scala> df.dtypes 
res30: Array[(String, String)] = Array((name,StringType), (tags,ArrayType(StringType,true))) 

scala> val report = df.select("*").where(df("tags")(0).contains("private")) 
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>] 

scala> report.show 
+----+-------------+ 
|name|   tags| 
+----+-------------+ 
| ed|List(private)| 
+----+-------------+ 
+0

grazie. funziona se pos è fisso ma non lo è.Avrei dovuto rendere i dati di test un po 'più complessi, ci può essere un numero qualsiasi di tag nell'array, la posizione è arbitraria. – navicore

+0

@navicore quindi il tuo codice dovrebbe funzionare. controlla il mio aggiornamento –

+0

interessante, mi manca qualcosa, sembra esattamente quello che stavo facendo, ma ottenendo l'errore per. versioni a doppia verifica scintilla ora ... – navicore

Problemi correlati