2015-10-13 22 views
18

Ho una connessione JDBC con Apache Spark e PostgreSQL e voglio inserire alcuni dati nel mio database. Quando utilizzo la modalità append, devo specificare id per ogni DataFrame.Row. C'è un modo per Spark di creare chiavi primarie?Chiavi primarie con Apache Spark

+0

Avete esigenze particolari? Tipo di dati, valori consecutivi, qualcos'altro? – zero323

+0

no, solo i vecchi interi unici validi – Nhor

risposta

30

Scala:

Se tutto ciò che serve è numeri univoci è possibile utilizzare zipWithUniqueId e ricreare dataframe. In primo luogo alcune importazioni e dati fittizi:

import sqlContext.implicits._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType, StructField, LongType} 

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar") 

schema estratto per un ulteriore utilizzo:

val schema = df.schema 

campo id Aggiungi:

val rows = df.rdd.zipWithUniqueId.map{ 
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 

Crea dataframe:

val dfWithPK = sqlContext.createDataFrame(
    rows, StructType(StructField("id", LongType, false) +: schema.fields)) 

Lo stesso cosa in Python:

from pyspark.sql import Row 
from pyspark.sql.types import StructField, StructType, LongType 

row = Row("foo", "bar") 
row_with_index = Row(*["id"] + df.columns) 

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() 

def make_row(columns): 
    def _make_row(row, uid): 
     row_dict = row.asDict() 
     return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) 
    return _make_row 

f = make_row(df.columns) 

df_with_pk = (df.rdd 
    .zipWithUniqueId() 
    .map(lambda x: f(*x)) 
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields))) 

Se si preferisce il tuo numero progressivo può sostituire zipWithUniqueId con zipWithIndex ma è un po 'più costoso.

Direttamente con DataFrame API:

(universale Scala, Python, Java, R con più o meno la stessa sintassi)

In precedenza ho perso monotonicallyIncreasingId funzione che dovrebbe funzionare bene come fino a quando non si richiedono numeri consecutivi:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 

df.withColumn("id", monotonicallyIncreasingId).show() 
// +---+----+-----------+ 
// |foo| bar|   id| 
// +---+----+-----------+ 
// | a|-1.0|17179869184| 
// | b|-2.0|42949672960| 
// | c|-3.0|60129542144| 
// +---+----+-----------+ 

Mentre utile monotonicallyIncreasingId non è deterministico. Non solo gli ID possono essere diversi dall'esecuzione all'esecuzione, ma senza trucchi aggiuntivi non possono essere utilizzati per identificare le righe quando le operazioni successive contengono filtri.

Nota:

E 'anche possibile utilizzare rowNumber funzione finestra:

from pyspark.sql.window import Window 
from pyspark.sql.functions import rowNumber 

w = Window().orderBy() 
df.withColumn("id", rowNumber().over(w)).show() 

Sfortunatamente:

WARN Window: No partizione definita per il funzionamento della finestra! Spostando tutti i dati su una singola partizione, ciò può causare un serio peggioramento delle prestazioni.

Quindi, a meno che non si abbia un modo naturale di partizionare i dati e garantire l'unicità non è particolarmente utile in questo momento.

+0

funzioneranno solo con R? so che hai usato scala sopra, ma tutto quello che posso trovare su questo 'zipWithUniqueId' è solo nei documenti SparkR – Nhor

+0

In realtà è Scala. Hai bisogno di una soluzione Python? SQL semplice? – zero323

+0

no no, posso capire il tuo codice, stavo solo chiedendo se c'è qualcosa nei documenti di pyspark su 'zipWithUniqueId', ma sembra che fossi solo pigro, perché alla fine l'ho trovato, grazie mille per la tua soluzione! – Nhor

7
from pyspark.sql.functions import monotonically_increasing_id 

df.withColumn("id", monotonically_increasing_id()).show() 

Nota che il secondo argomento della df.withColumn è monotonically_increasing_id() non monotonically_increasing_id.

3

Ho trovato la seguente soluzione relativamente semplice per il caso in cui zipWithIndex() è il comportamento desiderato, vale a dire per quelli che desiderano numeri interi consecutivi.

In questo caso, utilizziamo pyspark e ci basiamo sulla comprensione del dizionario per mappare l'oggetto riga originale in un nuovo dizionario che si adatta a un nuovo schema incluso l'indice univoco.

# read the initial dataframe without index 
dfNoIndex = sqlContext.read.parquet(dataframePath) 
# Need to zip together with a unique integer 

# First create a new schema with uuid field appended 
newSchema = StructType([StructField("uuid", IntegerType(), False)] 
         + dfNoIndex.schema.fields) 
# zip with the index, map it to a dictionary which includes new field 
df = dfNoIndex.rdd.zipWithIndex()\ 
         .map(lambda (row, id): {k:v 
               for k, v 
               in row.asDict().items() + [("uuid", id)]})\ 
         .toDF(newSchema)