2017-07-30 17 views
6

Mam następującą ramkę danych, a moim zamiarem jest znalezienie wszystkich identyfikatorów, które mają różne UŻYCIE ale ten sam TYP.multidplyr i group_by() i filter()

ID <- rep(1:4, each=3) 
USAGE <- c("private","private","private","private", 
"taxi","private","taxi","taxi","taxi","taxi","private","taxi") 
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW") 
df <- data.frame(ID,USAGE,TYPE) 

Jeśli biegnę

df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 

uzyskać zamierzony efekt. Ale moja oryginalna ramka danych ma> 2 miliony wierszy. Dlatego chciałbym użyć wszystkich moich rdzeni podczas uruchamiania tej operacji.

Próbowałem tego kodu z multidplyr:

f1 <- partition(df, ID) 
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 
f3 <- collect(f2) 

Ale wtedy pojawia się następujący komunikat:

Warning message: group_indices_.grouped_df ignores extra arguments 

po

f1 <- partition(df, ID) 

i

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
    4 nodes produced errors; first error: Evaluation error: object 'f1' not found. 

po

f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1) 

Jaki byłby poprawny sposób wdrożyć całą operację do multidplyr? Wielkie dzięki.

Odpowiedz

2

Powinieneś uwzględnić wszystkie zmienne grupujące w swoim połączeniu z numerem partition(). W ten sposób każdy rdzeń ma wszystkie dane potrzebne do przeprowadzenia obliczeń dla danej grupy.

library(tidyverse) 
library(multidplyr) 

fast <- df %>% 
    partition(ID, TYPE) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 

Weryfikacja

Nadal będziesz dostać ostrzeżenie o group_indices, ale wyniki są takie same jak oryginalne dplyr metody.

slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 

fast == slow 
     ID USAGE TYPE 
#[1,] TRUE TRUE TRUE 
#[2,] TRUE TRUE TRUE 
#[3,] TRUE TRUE TRUE 

Benchmarking

Teraz najważniejsze pytanie: czy jest to szybciej? Definiowanie cluster pozwala nam upewnić się, że używamy wszystkich rdzeni.

library(microbenchmark) 
library(parallel) 

cluster <- create_cluster(cores = detectCores()) 

fast_func <- function(df) { 
    df %>% 
    partition(ID, TYPE, cluster = cluster) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 
} 

slow_func <- function(df) { 
    slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 
} 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: milliseconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b 
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a 

Użycie przetwarzania równoległego faktycznie wolniej w tym przypadku. Średni bieg dla fast_func zajmuje 56 milisekund zamiast 9. Wynika to z narzutu związanego z zarządzaniem przepływem danych między klastrami. Ale powiedziałeś, że twoje dane mają miliony wierszy, więc spróbujmy.

# Embiggen the data 
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df() 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: seconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b 
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a 

Z gigantycznym zbiorem danych, fast_func jest wciąż wolniejszy! Są chwile, w których równoległe działanie pozwoli zaoszczędzić ogromną ilość czasu, ale prosty zgrupowany filtr niekoniecznie jest jednym z nich.