2015-03-29 9 views
5

Di seguito è riportato il mio script spark sql che carica un file e utilizza SQL su di esso, voglio raccogliere l'output dalla query sql e scriverlo su un file, non so come aiutare qualcuno.come raccogliere l'output di spark sql in un file?

//import classes for sql 
 
import org.apache.spark.sql.SQLContext 
 
import org.apache.spark.{SparkConf, SparkContext} 
 

 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
 

 
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. 
 
import sqlContext.createSchemaRDD 
 

 

 
//hdfs paths 
 
val warehouse="hdfs://quickstart.cloudera/user/hive/warehouse/" 
 
val customers_path=warehouse+"people/people.txt" 
 
customers_path 
 

 
//create rdd file called file 
 
val file=sc.textFile(customers_path) 
 

 
val schemaString="name age" 
 

 
import org.apache.spark.sql._ 
 

 

 

 
val schema = 
 
    StructType(
 
    schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true))) 
 

 
val rowRDD=file.map(_.split(",")).map(p => Row(p(0),p(1).trim)) 
 

 
val peopleSchemRDD=sqlContext.applySchema(rowRDD, schema) 
 

 
// Register the SchemaRDD as a table. 
 
peopleSchemRDD.registerTempTable("people") 
 

 
// SQL statements can be run by using the sql methods provided by sqlContext. 
 
sqlContext.sql("select count(*) from people").collect().foreach(println) 
 
System.exit(0)

+0

Nel codice fornito il risultato di la query è solo un numero, giusto? Stai chiedendo come scrivere un numero in un file in Scala? –

+0

sì Voglio che il numero o l'output sia scritto su un file, c'è un modo per farlo? –

+0

op val = sqlContext.sql ("select count (*) dalla gente") val c = op.collect() val RDD = sc.parallelize (c) rdd.saveAsTextFile ("/ home/Cloudera/op" System.exit (0) –

risposta

4

Se si desidera solo per contare il numero di righe in un file di grandi dimensioni su HDFS e scrivere in un altro file:

import java.nio.file.{ Files, Paths } 
val path = "hdfs://quickstart.cloudera/user/hive/warehouse/people/people.txt" 
val rdd = sc.textFile(path) 
val linesCount = rdd.count 
Files.write(Paths.get("line_count.txt"), linesCount.toString.getBytes) 
0

//import classes for sql 
 
import sqlContext.implicits._ 
 
import org.apache.spark.sql.SQLContext 
 
import org.apache.spark.{SparkConf, SparkContext} 
 

 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
 

 
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. 
 
import sqlContext.createSchemaRDD 
 
import sqlContext.implicits._ 
 

 
//hdfs paths 
 
val warehouse="hdfs://quickstart.cloudera/user/hive/warehouse/" 
 
val customers_path=warehouse+"people/people.txt" 
 
customers_path 
 

 
//create rdd file called file 
 
val file=sc.textFile(customers_path) 
 

 
val schemaString="name age" 
 

 
import org.apache.spark.sql._ 
 

 

 

 
val schema = 
 
    StructType(
 
    schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true))) 
 

 
val rowRDD=file.map(_.split(",")).map(p => Row(p(0),p(1).trim)) 
 

 
val peopleSchemRDD=sqlContext.applySchema(rowRDD, schema) 
 

 
// Register the SchemaRDD as a table. 
 
peopleSchemRDD.registerTempTable("people") 
 

 
// SQL statements can be run by using the sql methods provided by sqlContext. 
 
val op=sqlContext.sql("select count(*) from people") 
 
val c=op.collect() 
 
val rdd=sc.parallelize(c) 
 
rdd.saveAsTextFile("/home/cloudera/op") 
 
System.exit(0)

+2

Nessun motivo per creare un RDD a 1 elemento solo per scrivere un file. –

Problemi correlati