2015-05-17 17 views
14

Ho un dataframe Spark in che consiste in una serie di date:calcolare la durata sottraendo due colonne datetime in formato stringa

from pyspark.sql import SQLContext 
from pyspark.sql import Row 
from pyspark.sql.types import * 
sqlContext = SQLContext(sc) 
import pandas as pd 

rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'), 
            ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'), 
            ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'), 
            ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'), 
            ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')]) 
schema = StructType([StructField('ID', StringType(), True), 
        StructField('EndDateTime', StringType(), True), 
        StructField('StartDateTime', StringType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

Quello che voglio fare è trovare duration sottraendo EndDateTime e StartDateTime. Ho pensato di cercare di fare questo utilizzando una funzione:

# Function to calculate time delta 
def time_delta(y,x): 
    end = pd.to_datetime(y) 
    start = pd.to_datetime(x) 
    delta = (end-start) 
    return delta 

# create new RDD and add new column 'Duration' by applying time_delta function 
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime)) 

Tuttavia, questo mi dà:

>>> df2.show() 
ID EndDateTime   StartDateTime  ANI   Duration 
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null  
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null  
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null  
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null  
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null 

io non sono sicuro se il mio approccio è corretto o meno. In caso contrario, accetterei volentieri un altro modo suggerito per raggiungere questo obiettivo.

+0

Hai provato il debug nel REPL? – dskrvk

+0

@dskrvk Non ho molta esperienza nel debug poiché non sono uno sviluppatore. Tuttavia, sospetto che il problema riguardi il modo in cui Spark consegna i dati alle funzioni. Ad esempio, time_delta() funziona in puro Python. Per qualche ragione, alcune funzioni di Python/Pandas semplicemente non suonano bene. Per esempio. import re def extract_ani (x): extract = x.str.extract (r '(\ d {10})') restituisce extract Dates = Dates.withColumn ('Cell', extract_ani (Date.ANI)) anche errori con Spark DataFrames, ma funziona quando converto il dataframe in un RDD e uso la funzione come parte di un 'sc.map' – Jason

+0

In Scala vorrei usare TimestampType invece di StringType per contenere le date, e quindi creare una UDF per calcolare la differenza tra le due colonne. Non vedo da nessuna parte che tu dichiari time_delta come funzione definita dall'utente, ma è un passaggio obbligato in Scala per farlo fare ciò che stai cercando di fare. –

risposta

23

Come di Spark 1.5 è possibile utilizzare unix_timestamp:

from pyspark.sql import functions as F 
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS" 
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt) 
      - F.unix_timestamp('StartDateTime', format=timeFmt)) 
df = df.withColumn("Duration", timeDiff) 

Nota il formato dell'ora stile Java.

>>> df.show() 
+---+--------------------+--------------------+--------+ 
| ID|   EndDateTime|  StartDateTime|Duration| 
+---+--------------------+--------------------+--------+ 
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...|  258| 
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...|  204| 
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...|  228| 
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|  269| 
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|  202| 
+---+--------------------+--------------------+--------+ 
+0

Puoi dividere per 3600.0 per convertire in ore 'df.withColumn (" Durata_ora ", df.Durazione/3600.0)' –

13

Grazie a David Griffin. Ecco come fare questo per riferimento futuro.

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 
from pyspark.sql.types import StringType, IntegerType, StructType, StructField 
from pyspark.sql.functions import udf 

# Build sample data 
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'), 
         ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'), 
         ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'), 
         ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'), 
         ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')]) 
schema = StructType([StructField('ID', StringType(), True), 
        StructField('EndDateTime', StringType(), True), 
        StructField('StartDateTime', StringType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

# define timedelta function (obtain duration in seconds) 
def time_delta(y,x): 
    from datetime import datetime 
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f') 
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f') 
    delta = (end-start).total_seconds() 
    return delta 

# register as a UDF 
f = udf(time_delta, IntegerType()) 

# Apply function 
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime)) 

Applicando time_delta() vi darà durata in secondi:

>>> df2.show() 
ID EndDateTime   StartDateTime  Duration 
X01 2014-02-13T12:36:... 2014-02-13T12:31:... 258  
X02 2014-02-13T12:35:... 2014-02-13T12:32:... 204  
X03 2014-02-13T12:36:... 2014-02-13T12:32:... 228  
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... 268  
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... 202 
+2

Utilizzare (end-start) .total_seconds(). Altrimenti si ottengono brutte sorprese come questa: time_delta ('2014-02-13T12: 36: 14.000', '2014-02-13T12: 36: 15.900') restituisce 86398 invece di -1.9 – user2158166

+0

buona chiamata @ utente2158166. Ho aggiornato lo script sopra. – Jason

+2

Questo codice non funziona più. La durata risulta nullo. Usando zeppelin, spark 1.6 –

0

Ecco una versione funzionante scintilla 2.x derivato dal di answer

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession,SQLContext 
from pyspark.sql.types import StringType, StructType, StructField 

sc = SparkContext() 
sqlContext = SQLContext(sc) 
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate() 

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'), 
         ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'), 
         ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'), 
         ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'), 
         ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')]) 
schema = StructType([StructField('ID', StringType(), True), 
        StructField('EndDateTime', StringType(), True), 
        StructField('StartDateTime', StringType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

# register as a UDF 
from datetime import datetime 
sqlContext.registerFunction("time_delta", lambda y,x:(datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')-datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')).total_seconds()) 

df.createOrReplaceTempView("Test_table") 

spark.sql("SELECT ID,EndDateTime,StartDateTime,time_delta(EndDateTime,StartDateTime) as time_delta FROM Test_table").show() 

sc.stop() 
0

Questo può essere fatto in scintilla-SQL convertendo la data stringa timestamp e quindi ottenere la differenza jason .

1: Convertire a timestamp:

CAST(UNIX_TIMESTAMP(MY_COL_NAME,'dd-MMM-yy') as TIMESTAMP 

2: Prendi la differenza tra le date utilizzando datediff funzione.

Questo saranno combinati in una funzione annidata come:

spark.sql("select COL_1, COL_2, datediff(CAST(UNIX_TIMESTAMP(COL_1,'dd-MMM-yy') as TIMESTAMP), CAST(UNIX_TIMESTAMP(COL_2,'dd-MMM-yy') as TIMESTAMP)) as LAG_in_days from MyTable") 

seguenti è il risultato:

+---------+---------+-----------+ 
| COL_1| COL_2|LAG_in_days| 
+---------+---------+-----------+ 
|24-JAN-17|16-JAN-17|   8| 
|19-JAN-05|18-JAN-05|   1| 
|23-MAY-06|23-MAY-06|   0| 
|18-AUG-06|17-AUG-06|   1| 
+---------+---------+-----------+ 

Riferimento: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

Problemi correlati