2015-10-15 19 views
5

Wykonuję kod równoległy w R za pomocą pakietu parallel i mclapply, który przyjmuje wstępnie określoną liczbę rdzeni jako parametr.Zmiana liczby rdzeni podczas przetwarzania równoległego w R

Jeśli mam pracę, która będzie działać przez kilka dni, czy istnieje sposób, aby napisać (lub zawinąć) moją funkcję mclapply, aby używać mniejszej liczby rdzeni w godzinach szczytu serwera i zwiększyć wykorzystanie w godzinach szczytu?

+2

Chyba trzeba dodać warstwę do mclapply. Zasadniczo podziel listę zadań w partiach. Każda partia zostanie wykonana przez mclapply. Przed uruchomieniem nowej partii można sprawdzić stan serwera, a następnie wybrać odpowiednią liczbę rdzeni do uruchomienia następnej partii. –

+0

@KarlForner Tak, tak też bym to zrobił. – cryo111

Odpowiedz

3

Domyślam się, że najłatwiejszym rozwiązaniem byłoby podzielenie danych na mniejsze porcje i uruchomienie mclapply oddzielnie na tych porcjach. Następnie możesz ustawić liczbę rdzeni dla każdego przebiegu mclapply. Działa to prawdopodobnie lepiej przy obliczeniach, które mają małą wariancję w.r.t. czas pracy.

Stworzyłem mały szybki i brzydka makiety, jak to może wyglądać następująco:

library(parallel) 
library(lubridate) 

#you would have to come up with your own function 
#for the number of cores to be used 
determine_cores=function(hh) { 
    #hh will be the hour of the day 
    if (hh>17|hh<9) { 
    return(4) 
    } else { 
    return(2) 
    } 
} 

#prepare some sample data 
set.seed(1234) 
myData=lapply(seq(1e-1,1,1e-1),function(x) rnorm(1e7,0,x)) 

#calculate SD with mclapply WITHOUT splitting of data into chunks 
#we need this for comparison 
compRes=mclapply(myData,function(x) sd(x),mc.cores=4) 

set.seed(1234) 
#this will hold the results of the separate mclapply calls 
res=list() 
#starting position within myData 
chunk_start_pos=1 
calc_flag=TRUE 

while(calc_flag) { 
    #use the function defined above to determine how many cores we may use 
    core_num=determine_cores(lubridate::hour(Sys.time())) 
    #determine end position of data chunk 
    chunk_end_pos=chunk_start_pos+core_num-1 
    if (chunk_end_pos>=length(myData)) { 
    chunk_end_pos=length(myData) 
    calc_flag=FALSE 
    } 
    message("Calculating elements ",chunk_start_pos," to ",chunk_end_pos) 
    #mclapply call on data chunk 
    #store data in res 
    res[[length(res)+1]]=mclapply(myData[chunk_start_pos:(chunk_start_pos+core_num-1)], 
           function(x) sd(x), 
           mc.preschedule=FALSE, 
           mc.cores=core_num) 
    #calculate new start position 
    chunk_start_pos=chunk_start_pos+core_num 
} 

#let's compare the results 
all.equal(compRes,unlist(res,recursive=FALSE)) 
#TRUE 
Powiązane problemy