2015-01-23 6 views
8

Sto provando a creare una riga (org.apache.spark.sql.catalyst.expressions.Row) in base all'input dell'utente. Non sono in grado di creare una riga casualmente.Come creare una riga da un elenco o matrice in Spark utilizzando Scala

C'è qualche funzionalità per creare una Linea List o Array.

Per per esempio., Se ho un file .csv con il seguente formato,

"91xxxxxxxxxx,21.31,15,0,0" 

Se l'input dell'utente [1, 2] poi ho bisogno di prendere solo 2 ° colonna e 3 ° colonna insieme al customer_id che è la prima colonna

cerco di analizzarlo con il codice:

val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input,r(0)))) ` 

dove foo è defi ned come

def f(n: List[Int], s: String) : Row = { 
    val n = input.length 
    var out = new Array[Any](n+1) 
    var r = s.split(",") 
    out(0) = r(0) 
    for (i <- 1 to n) 
     out(i) = r(input(i-1)).toDouble 
    Row(out) 
} 

e l'ingresso è un elenco dire

val input = List(1,2) 

L'esecuzione di questo codice ottengo l3 come:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916]) 

Ma quello che voglio è:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])` 

Questo deve essere passato per creare creare uno schema in Spark SQL

risposta

13

qualcosa come il seguente dovrebbe funzionare:

import org.apache.spark.sql._ 

def f(n: List[Int], s: String) : Row = 
    Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq) 
+5

Questo funziona correttamente, se voglio analizzarlo come una singola riga di 3 valori di stringa. Ma come usarlo, se il primo valore è una stringa, il 2 ° e il 3 ° valore sono di tipo Double? È possibile? – Anju

2

Ti manca creazione del StructField e StructType. Fare riferimento alla guida ufficiale http://spark.apache.org/docs/latest/sql-programming-guide.html, parte programmazione Definizione dello schema

Io non sono uno specialista di Scala, ma in Python che sarebbe simile a questa:

from pyspark.sql import * 
sqlContext = SQLContext(sc) 

input = [1,2] 

def parse(line): 
    global input 
    l = line.split(',') 
    res = [l[0]] 
    for ind in input: 
     res.append(l[ind]) 
    return res 

csv = sc.textFile("file:///tmp/inputfile.csv") 
rows = csv.map(lambda x: parse(x)) 

fieldnum = len(input) + 1 
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)] 
schema = StructType(fields) 

csvWithSchema = sqlContext.applySchema(rows, schema) 
csvWithSchema.registerTempTable("test") 
sqlContext.sql("SELECT * FROM test").collect() 

In breve, non si dovrebbe convertire direttamente i loro oggetti a remare, basta lasciare come RDD e applicare lo schema ad essa con applySchema

0

potete anche provare:

Row.fromSeq(line(0).toString ++ line(1).toDouble ++ line(2).toDouble ++ line.slice(2, line.size).map(value => value.toString)) 
Problemi correlati