Ho un dataframe Spark in che consiste in una serie di date:calcolare la durata sottraendo due colonne datetime in formato stringa
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd
rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Quello che voglio fare è trovare duration
sottraendo EndDateTime
e StartDateTime
. Ho pensato di cercare di fare questo utilizzando una funzione:
# Function to calculate time delta
def time_delta(y,x):
end = pd.to_datetime(y)
start = pd.to_datetime(x)
delta = (end-start)
return delta
# create new RDD and add new column 'Duration' by applying time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime))
Tuttavia, questo mi dà:
>>> df2.show()
ID EndDateTime StartDateTime ANI Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null
io non sono sicuro se il mio approccio è corretto o meno. In caso contrario, accetterei volentieri un altro modo suggerito per raggiungere questo obiettivo.
Hai provato il debug nel REPL? – dskrvk
@dskrvk Non ho molta esperienza nel debug poiché non sono uno sviluppatore. Tuttavia, sospetto che il problema riguardi il modo in cui Spark consegna i dati alle funzioni. Ad esempio, time_delta() funziona in puro Python. Per qualche ragione, alcune funzioni di Python/Pandas semplicemente non suonano bene. Per esempio. import re def extract_ani (x): extract = x.str.extract (r '(\ d {10})') restituisce extract Dates = Dates.withColumn ('Cell', extract_ani (Date.ANI)) anche errori con Spark DataFrames, ma funziona quando converto il dataframe in un RDD e uso la funzione come parte di un 'sc.map' – Jason
In Scala vorrei usare TimestampType invece di StringType per contenere le date, e quindi creare una UDF per calcolare la differenza tra le due colonne. Non vedo da nessuna parte che tu dichiari time_delta come funzione definita dall'utente, ma è un passaggio obbligato in Scala per farlo fare ciò che stai cercando di fare. –