2015-04-24 13 views
15

Ho un CSV in cui un campo è datetime in un formato specifico. Non posso importarlo direttamente nel mio Dataframe perché deve essere un timestamp. Così ho importarlo come stringa e convertirlo in un Timestamp come questoUn modo migliore per convertire un campo stringa in data e ora in Spark

import java.sql.Timestamp 
import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.Row 

def getTimestamp(x:Any) : Timestamp = { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    if (x.toString() == "") 
    return null 
    else { 
     val d = format.parse(x.toString()); 
     val t = new Timestamp(d.getTime()); 
     return t 
    } 
} 

def convert(row : Row) : Row = { 
    val d1 = getTimestamp(row(3)) 
    return Row(row(0),row(1),row(2),d1) 
} 

C'è una migliore, modo più conciso per fare questo, con l'API dataframe o scintille sql? Il metodo sopra richiede la creazione di un RDD e di fornire nuovamente lo schema per il Dataframe.

risposta

6

non ho giocato con Spark SQL ancora, ma credo che questo sarebbe scala più idiomatica (utilizzo nullo non è considerata una buona pratica):

def getTimestamp(s: String) : Option[Timestamp] = s match { 
    case "" => None 
    case _ => { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    Try(new Timestamp(format.parse(s).getTime)) match { 
     case Success(t) => Some(t) 
     case Failure(_) => None 
    }  
    } 
} 

Si prega di notare Suppongo che tu sai elementi tipi anticipo (se lo leggi da un file CSV, tutti sono String), ecco perché uso un tipo corretto come String e non Any (tutto è sottotipo di Any).

Dipende anche da come si desidera gestire le eccezioni di analisi. In questo caso, se si verifica un'eccezione di parsing, viene semplicemente restituito None.

Si può usare più avanti con:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3)) 
+0

L'ho già fatto. Ho sentito che dovrei affrontare il problema principale prima di passare a simili sottigliezze. Se c'è una soluzione migliore, potrebbe non doverlo fare affatto. Il problema riguarda la file rows.map che restituisce rdd e dovrà essere convertito in ddf. Quindi potrebbe essere che manca ddf api o non so come farlo. – user568109

+0

Non so se c'è un altro modo, ma è possibile convertire qualsiasi RDD in un DF senza problemi. In questo esempio concreto con 'sqlContext.createDataFrame (rowRDD, schema)'. Per me spark sql è bello interrogare i tuoi dati in un modo simile a SQL, non analizzare i dati stessi (per esempio, usa semplici RDD). – jarandaf

+0

Prova (nuovo Timestamp (format.parse (s) .getTime)). ToOption – nont

1

Vorrei spostare il metodo getTimeStamp scritto da voi in mapPartitions di RDD e riutilizzare GenericMutableRow tra le righe in un iteratore:

val strRdd = sc.textFile("hdfs://path/to/cvs-file") 
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => 
    new Iterator[Row] { 
    val row = new GenericMutableRow(4) 
    var current: Array[String] = _ 

    def hasNext = iter.hasNext 
    def next() = { 
     current = iter.next() 
     row(0) = current(0) 
     row(1) = current(1) 
     row(2) = current(2) 

     val ts = getTimestamp(current(3)) 
     if(ts != null) { 
     row.update(3, ts) 
     } else { 
     row.setNullAt(3) 
     } 
     row 
    } 
    } 
} 

E dovresti comunque utilizzare lo schema per generare un DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema) 

L'utilizzo di GenericMutableRow all'interno un'implementazione iteratore potrebbe essere trovare in Aggregate Operator, InMemoryColumnarTableScan, ParquetTableOperations ecc

+0

È molto vicino al mio codice attuale. Inoltre, se vuoi analizzare il file csv, dovresti probabilmente usare spark-csv invece di split. Il punto che volevo fare è aggiungere e mutare colonne ti restituirà un rdd che dovrà essere nuovamente convertito in ddf dando uno schema. C'è un percorso più breve. – user568109

+0

@ user568109, non credo che ce ne sia uno. Dal momento che spark-sql avrebbe bisogno di uno schema, deve averne uno in qualche modo. Se si utilizza RDD [CaseClassX], spark-sql inferirebbe automaticamente lo schema per te, dalla definizione della classe del case. Ma tu usi qui una riga (Array [Any]), nessuna inferenza DataType potrebbe andare lì, quindi basta passarne una. –

+0

Penso che usare un riferimento, mutarlo ogni volta e restituirlo come riferimento sia una ricetta per il disastro. Hai effettivamente utilizzato questo approccio con successo? – maasg

1

ho timestamp ISO8601 nel mio set di dati e avevo bisogno di convertirlo in formato "AAAA-MM-dd". Questo è quello che ho fatto:

import org.joda.time.{DateTime, DateTimeZone} 
object DateUtils extends Serializable { 
    def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) 
    def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) 
} 

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd")) 

e si può solo usare l'UDF nella query SQL scintilla.

31

Spark> = 2,2

import org.apache.spark.sql.functions.to_timestamp 

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") 
df.withColumn("ts", ts).show(2, false) 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+-------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+-------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| 
// |2 |#[email protected]#@#    |null    | 
// +---+-------------------+-------------------+ 

Spark> = 1.6, < 2,2

È possibile utilizzare le funzioni di data di elaborazione che sono state introdotte in Spark 1.5. Supponendo di aver seguenti dati:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts") 

È possibile utilizzare unix_timestamp per analizzare le stringhe e gettarlo ai timestamp

import org.apache.spark.sql.functions.unix_timestamp 

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+---------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+---------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| 
// |2 |#[email protected]#@#    |null     | 
// +---+-------------------+---------------------+ 

Come si può vedere che copre sia la gestione di analisi e l'errore.

Spark> = 1.5, < 1,6

dovrete usare l'utilizzo o meno così:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

o

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

a causa di SPARK-11724.

Spark < 1,5

si dovrebbe essere in grado di utilizzare questi con expr e HiveContext.

0

userei https://github.com/databricks/spark-csv

Ciò dedurre timestamp per voi.

import com.databricks.spark.csv._ 
val rdd: RDD[String] = sc.textFile("csvfile.csv") 

val df : DataFrame = new CsvParser().withDelimiter('|') 
     .withInferSchema(true) 
     .withParseMode("DROPMALFORMED") 
     .csvRdd(sqlContext, rdd) 
Problemi correlati