2015-12-28 14 views
8

Quando eseguire sotto comando:predefinito Schema di partizionamento in Spark

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist() 
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22 

scala> rdd.partitions.size 
res9: Int = 10 

scala> rdd.partitioner.isDefined 
res10: Boolean = true 


scala> rdd.partitioner.get 
res11: org.apache.spark.Partitioner = [email protected] 

Si dice che ci sono 10 le partizioni e partizionamento avviene tramite HashPartitioner. Ma quando eseguo sotto il comando:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4) 
... 
scala> rdd.partitions.size 
res6: Int = 4 
scala> rdd.partitioner.isDefined 
res8: Boolean = false 

Si dice che ci siano 4 partizioni e il partizionatore non sia definito. Quindi, Qual è lo schema di partizionamento predefinito in Spark?/Come vengono suddivisi i dati nel secondo caso?

risposta

11

Bisogna distinguere tra due cose diverse:

  • partizionamento come la distribuzione dei dati tra le partizioni in funzione di un valore della chiave che è limitata solo al PairwiseRDDs (RDD[(T, U)]). Ciò crea una relazione tra la partizione e il set di chiavi che possono essere trovate in una determinata partizione.
  • partizionamento come input di suddivisione in partizioni multiple in cui i dati vengono semplicemente suddivisi in blocchi contenenti record consecutivi per consentire il calcolo distribuito. La logica esatta dipende da una fonte specifica ma è o il numero di record o la dimensione di un blocco.

    In caso di parallelize i dati sono equamente distribuiti tra le partizioni utilizzando gli indici. In caso di HadoopInputFormats (come textFile) dipende da proprietà come mapreduce.input.fileinputformat.split.minsize/mapreduce.input.fileinputformat.split.maxsize.

Quindi lo schema di partizionamento predefinito è semplicemente nessuno perché il partizionamento non è applicabile a tutti gli RDD. Per le operazioni che richiedono il partizionamento su un PairwiseRDD (aggregateByKey, reduceByKey ecc.) Il metodo predefinito è utilizzare il partizionamento hash.

+0

OK !! Grazie per le spiegazioni .. Ho controllato che i dati vengano tagliati usando gli indici in un caso successivo (o calcolando l'inizio e la fine dividendolo con il numero di partizioni) –

Problemi correlati