2016-02-05 12 views
5

Di seguito è riportato il codice di esempio che sto utilizzando. quando viene eseguito questo processo spark, i join di Dataframe si stanno verificando utilizzando sortmergejoin anziché broadcastjoin.Trasmissione non avvenuta durante l'adesione a dataframes in Spark 1.6

Il broadcastjoin non sta accadendo anche quando si specifica un suggerimento broadcast() nell'istruzione join.

L'ottimizzatore sta eseguendo il partizionamento del dataframe e sta causando l'inclinazione dei dati.

Qualcuno ha visto questo comportamento?

L'ho eseguito su filato utilizzando Spark 1.6 e HiveContext come SQLContext. Il lavoro spark funziona su 200 esecutori. e la dimensione dei dati di txnTable è 240 GB e il datasize di countriesDf è 5mb.

risposta

7

Sia il modo in cui si trasmette DataFrame e come si accede a esso non sono corretti.

  • Le trasmissioni standard non possono essere utilizzate per gestire strutture dati distribuite. Se si desidera eseguire la trasmissione di unirsi a un DataFrame si dovrebbe usare broadcast funzioni che segna dato DataFrame per la trasmissione:

    import org.apache.spark.sql.functions.broadcast 
    
    val countriesDf: DataFrame = ??? 
    val tmp: DataFrame = broadcast(
        countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries") 
    ) 
    
    txnTable.as("df1").join(
        broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
    

    Internamente lo farà collecttmp senza conversione da interno e broadcast in seguito.

  • gli argomenti di join vengono valutati con entusiasmo. Anche è stato possibile utilizzare SparkContext.broadcast con il valore di trasmissione della struttura dati distribuita localmente prima che venga chiamato join. Questo è il motivo per cui la funzione funziona, ma non esegue il broadcast join.

+0

Ora, sto vedendo BroadcastHashJoin in una corsa e SortMergeJoin in un'altra corsa. (stesso codice, set di dati diversi). –

+0

Suppongo che superi la soglia di dimensione per i join di trasmissione. – zero323

+0

Ho uno spark.sql.autoBroadcastJoinThreshold molto alto. Circa. 1 GB. E il file che viene trasmesso è di circa 5 MB. Tuttavia, in altri termini, la raccomandazione sopra funziona alla grande. –

Problemi correlati