2015-12-28 10 views
8

Kiedy wykonać poniżej polecenia:Domyślny układ partycji w Spark

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist() 
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22 

scala> rdd.partitions.size 
res9: Int = 10 

scala> rdd.partitioner.isDefined 
res10: Boolean = true 


scala> rdd.partitioner.get 
res11: org.apache.spark.Partitioner = [email protected] 

Mówi, że istnieje 10 partycje i podział odbywa się za pomocą HashPartitioner. Ale kiedy wykonuję poniższe polecenie:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4) 
... 
scala> rdd.partitions.size 
res6: Int = 4 
scala> rdd.partitioner.isDefined 
res8: Boolean = false 

Mówi, że istnieją 4 partycje i partycja nie jest zdefiniowana. Co to jest domyślny schemat partycji w Spark?/W jaki sposób dane są partycjonowane w drugim przypadku?

Odpowiedz

11

Trzeba rozróżnić dwie różne rzeczy:

  • partycjonowanie jako dystrybucję danych między partycjami w zależności od wartości klucza, który jest ograniczony tylko do PairwiseRDDs (RDD[(T, U)]). Tworzy to relację między partycją a zestawem kluczy, które można znaleźć na danej partycji.
  • Partycjonowanie jako dzielenie danych wejściowych na wiele partycji, gdzie dane są po prostu dzielone na porcje zawierające kolejne rekordy, aby umożliwić obliczenia rozproszone. Dokładna logika zależy od konkretnego źródła, ale jest to albo liczba rekordów, albo wielkość porcji.

    W przypadku danych parallelize dane są równomiernie rozdzielane między partycjami za pomocą indeksów. W przypadku HadoopInputFormats (jak textFile) zależy to od właściwości takich jak mapreduce.input.fileinputformat.split.minsize/mapreduce.input.fileinputformat.split.maxsize.

Domyślny schemat partycjonowania jest po prostu żaden, ponieważ partycjonowanie nie ma zastosowania do wszystkich RDD. W przypadku operacji wymagających partycjonowania na domyślnej metodzie PairwiseRDD (aggregateByKey, reduceByKey) jest używanie partycji hash.

+0

OK !! Dziękuję za wyjaśnienia. Sprawdziłem, czy dane są krojone za pomocą indeksów w późniejszym przypadku (lub obliczaj początek i koniec, dzieląc go na liczbę partycji) –

Powiązane problemy