2015-04-20 17 views
6

ho un set di dati e voglio estrarre coloro (recensione/testo) hanno (recensione/ora) tra X e Y, per esempio (1183,3344 milioni < tempo < 1.185.926,4 mila),filtro RDD in scala scintilla

qui sono campioni dei miei dati:

product/productId: B000278ADA 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A17KXW1PCUAIIN 
review/profileName: Mark Anthony "Mark" 
review/helpfulness: 4/4 
review/score: 5.0 
review/time: 1174435200 
review/summary: Jobst UltraSheer Knee High Stockings 
review/text: Does a very good job of relieving fatigue. 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A9Q3932GX4FX8 
review/profileName: Trina Wehle 
review/helpfulness: 1/1 
review/score: 3.0 
review/time: 1352505600 
review/summary: Delivery was very long wait..... 
review/text: It took almost 3 weeks to recieve the two pairs of stockings . 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: AUIZ1GNBTG5OB 
review/profileName: dgodoy 
review/helpfulness: 1/1 
review/score: 2.0 
review/time: 1287014400 
review/summary: sizes recomended in the size chart are not real 
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!. 

mio Spark-Scala codice:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
import org.apache.spark.{SparkConf, SparkContext} 

object test1 { 
    def main(args: Array[String]): Unit = { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 
    val conf: Configuration = new Configuration 
    conf.set("textinputformat.record.delimiter", "product/title:") 
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
    val lines = input1.map { text => text._2} 
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate)))) 
    filt.saveAsTextFile("data/filter1") 
    } 
} 

ma il mio codice non funziona,

come posso filtrare queste linee?

+1

Non vedo la stringa del delimitatore "product/productId:" nel file di input. – ipoteka

+1

cosa ti aspetti come output e quale problema stai affrontando? – maasg

risposta

10

È molto più semplice di così. Prova questo:

object test1 
{ 
    def main(args: Array[String]): Unit = 
    { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 

    def extractDateAndCompare(line: String): Boolean= 
    { 
     val from = line.indexOf("/time: ") + 7 
     val to = line.indexOf("review/text: ") -1 
     val date = line.substring(from, to).toLong 
     date > startDate && date < endDate 
    } 

    sc.textFile("data/Electronics.txt") 
     .filter(extractDateAndCompare) 
     .saveAsTextFile("data/filter1") 
    } 
} 

Di solito trovo quei metodi intermedi ausiliari per rendere le cose molto più chiare. Ovviamente, ciò presuppone che le date di confine siano definite da qualche parte e che il file di input contenga problemi di formato. L'ho fatto intenzionalmente per mantenerlo semplice, ma aggiungendo una prova, la restituzione di una clausola Option e l'uso di flatMap() possono aiutarti a evitare errori se li hai.

Inoltre, il tuo testo non elaborato è un po 'macchinoso, potresti voler esplorare Json, i file TSV o qualche altro formato alternativo più semplice.

+0

Nota, l'ho codificato qui da zero, potrebbero esserci piccoli dettagli sugli indici, ecc. Ma spero che tu abbia capito l'idea. –

+0

Caro Daniel, ho 1 gigabyte di dati di recensioni (testo) Ecco un esempio il mio set di dati: prodotto/productId: B000278ADA prodotto/title: Jobst Ultr prodotto/prezzo: 46.34 recensione/userId: A1ZJAH4 recensione/profileName: jud doolitt review/helpfulness: 0/0 recensione/punteggio: 5.0 recensione/tempo: 1359936000 recensione/riepilogo: Shopping One recensione/testo: È meraviglioso trovare esattamente quello che stai cercando. Voglio estrarre recensioni/testi che si trovano in un periodo di tempo, ad esempio voglio estrarre recensione/testo nell'anno 2002, per questo lavoro scrivo sopra il codice che considera un intero dato di revisione come Record di RDD –

+0

Oh , Vedo che hai aggiornato il testo di esempio. Quindi questo significa che ogni "record" genera più righe? –

Problemi correlati