7

mam iskrę 2,0 dataframe example o następującej strukturze:Spark DataFrame: czy groupBy po zamówieniu Czy utrzymujesz tę kolejność?

id, hour, count 
id1, 0, 12 
id1, 1, 55 
.. 
id1, 23, 44 
id2, 0, 12 
id2, 1, 89 
.. 
id2, 23, 34 
etc. 

24 zawiera dane dla każdego identyfikatora (po jednej dla każdej porze dnia) i jest sortowana według id godzinę przy użyciu funkcji orderby.

Stworzyłem agregatora groupConcat:

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable { 
    override def zero: String = "" 

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat) 

    override def merge(b1: String, b2: String) = b1 + b2 

    override def finish(b: String) = b.substring(1) 

    override def bufferEncoder: Encoder[String] = Encoders.STRING 

    override def outputEncoder: Encoder[String] = Encoders.STRING 
    }.toColumn 

Pomaga mi złączyć kolumny do strun do uzyskania tej ostatecznej dataframe:

id, hourly_count 
id1, 12:55:..:44 
id2, 12:89:..:34 
etc. 

Moje pytanie brzmi, czy mogę zrobić example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), czy to gwarancję czy liczniki godzinowe zostaną prawidłowo zamówione w odpowiednich zasobnikach?

Przeczytałem, że nie jest to konieczne w przypadku RDD (patrz Spark sort by key and then group by to get ordered iterable?), ale może jest inaczej w DataFrames?

Jeśli nie, jak mogę obejść ten problem?

Odpowiedz

3

Krótka odpowiedź brzmi Tak, liczba godzinowa utrzymuje tę samą kolejność.

Aby uogólnić, ważne jest, aby sortować przed grupowaniem. Również sortowanie musi być takie samo jak grupa + kolumna, dla której chcesz sortować.

Przykładem może być tak:

employees 
    .sort("company_id", "department_id", "employee_role") 
    .groupBy("company_id", "department_id") 
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role") 
+1

Czy masz jakieś referencje stwierdzające, że groupBy utrzymuje kolejność? Nie mogłem znaleźć niczego w oficjalnych dokumentach. –

+0

Nie mam oficjalnych dokumentów, ale mam ten artykuł, który wyjaśnia nieco lepszy mechanizm https://bzhangusc.wordpress.com/2015/05/28/groupby-on -dataframe-is-not-the-groupby-on-rdd /. Komentarze również są interesujące. – Interfector

+1

Co ciekawe, nawet sam Sean Owen twierdzi, że zamówienie może nie zostać zachowane (https://issues.apache.org/jira/browse/SPARK-16207?focusedCommentId=15356725&page=com.atlassian.jira.plugin.system.issuetabpanels% 3Acomment-tabpanel # comment-15356725) –

1

Mam przypadek, gdy tak nie jest zawsze: czasami tak, przeważnie nie.

Moja dataframe ma 200 partycje uruchomione Spark 1,6

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                F.sort_array(F.collect_list(times)), 
                F.collect_list(times) 
                  ) 

sprawdzić kolejność ja porównać wartości powrotów

F.sort_array(F.collect_list(times)) 

i

F.collect_list(times) 

dając przykład (Z lewej strony: sort_array (collect_list()); po prawej: collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000 
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000 
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000 
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000 
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000 
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000 
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000 
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000 
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000 
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000 
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000 
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000 
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000 
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000 

W lewej kolumnie zawsze sortowane, a prawa kolumna składa się tylko z sortowanych bloków. Dla różnych wykonań funkcji take() kolejność bloków w prawej kolumnie jest różna.

+0

Przyjęta odpowiedź stwierdziła, że ​​musisz posortować zarówno według kolumny, którą chcesz posortować, jak i kolumn, z którymi się grupujesz, tj. 'OrderBy (times, group_key) .groupBy (group_key)'. Próbowałeś tego? – Shaido

0

Zamówienie może być lub nie może być takie samo, w zależności od liczby partycji i dystrybucji danych. Możemy rozwiązać za pomocą samego rdd.

Na przykład ::

Uratowałem poniższych przykładowych danych w pliku i załadować go w HDFS.

1,type1,300 
2,type1,100 
3,type2,400 
4,type2,500 
5,type1,400 
6,type3,560 
7,type2,200 
8,type3,800 

i wykonany z poniższego polecenia:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect() 

wyjściowa:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4)) 

Oznacza to, że możemy pogrupować dane według typu, potem posortowane według ceny, a łączone identyfikatory z "~" jako separator. Powyższe polecenie może być uszkodzony, jak poniżej:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3) 

val groupedData=validData.groupBy(_(1)) //group data rdds 

val sortedJoinedData=groupedData.mapValues(x=>{ 
    val list=x.toList 
    val sortedList=list.sortBy(_(2)) 
    val idOnlyList=sortedList.map(_(0)) 
    idOnlyList.mkString("~") 
} 
) 
sortedJoinedData.collect() 

możemy następnie podjąć konkretnej grupy za pomocą polecenia

sortedJoinedData.filter(_._1=="type1").collect() 

wyjściowa:

Array[(String, String)] = Array((type1,2~1~5)) 
Powiązane problemy