2015-07-22 12 views
20

Opis problemu:wspólna pamięć równolegle foreach w R

mam duży matrycy c, ładowane do pamięci RAM. Moim celem jest równoległe przetwarzanie, aby mieć do niego dostęp tylko do odczytu. Jednak kiedy tworzę połączenia albo używam doSNOW, doMPI, big.matrix, itd., Ilość użytych RAMów dramatycznie wzrasta.

Czy istnieje sposób na prawidłowe utworzenie pamięci współużytkowanej, z której mogą odczytać wszystkie procesy, bez tworzenia lokalnej kopii wszystkich danych?

Przykład:

libs<-function(libraries){# Installs missing libraries and then load them 
    for (lib in libraries){ 
    if(!is.element(lib, .packages(all.available = TRUE))) { 
     install.packages(lib) 
    } 
    library(lib,character.only = TRUE) 
    } 
} 

libra<-list("foreach","parallel","doSNOW","bigmemory") 
libs(libra) 

#create a matrix of size 1GB aproximatelly 
c<-matrix(runif(10000^2),10000,10000) 
#convert it to bigmatrix 
x<-as.big.matrix(c) 
# get a description of the matrix 
mdesc <- describe(x) 
# Create the required connections  
cl <- makeCluster(detectCores()) 
registerDoSNOW(cl) 
out<-foreach(linID = 1:10, .combine=c) %dopar% { 
    #load bigmemory 
    require(bigmemory) 
    # attach the matrix via shared memory?? 
    m <- attach.big.matrix(mdesc) 
    #dummy expression to test data aquisition 
    c<-m[1,1] 
} 
closeAllConnections() 

RAM: Ram usage during <code>foreach</code> na powyższym zdjęciu, może się okazać, że pamięć zwiększa sporo aż foreach końcach i jest zwolniona.

+1

Mam dokładnie ten sam problem i jestem bardzo zainteresowany rozwiązaniem. Zauważyłem również, że kopie są tworzone zamiast współdzielonej pamięci. – NoBackingDown

Odpowiedz

11

Myślę, że rozwiązanie tego problemu można znaleźć w poście Steve'a Westona, autora pakietu foreach, here. Tam on stwierdza:

Pakiet doParallel automatycznie wyeksportuje zmienne do pracowników, do których odwołuje się w pętli foreach.

Więc myślę, że problemem jest to, że w kodzie twój wielki matrix c odwołuje się cesji c<-m[1,1]. Po prostu spróbuj xyz <- m[1,1] i zobacz, co się stanie.

Oto przykład z plikiem oparciem big.matrix:

#create a matrix of size 1GB aproximatelly 
n <- 10000 
m <- 10000 
c <- matrix(runif(n*m),n,m) 
#convert it to bigmatrix 
x <- as.big.matrix(x = c, type = "double", 
       separated = FALSE, 
       backingfile = "example.bin", 
       descriptorfile = "example.desc") 
# get a description of the matrix 
mdesc <- describe(x) 
# Create the required connections  
cl <- makeCluster(detectCores()) 
registerDoSNOW(cl) 
## 1) No referencing 
out <- foreach(linID = 1:4, .combine=c) %dopar% { 
    t <- attach.big.matrix("example.desc") 
    for (i in seq_len(30L)) { 
    for (j in seq_len(m)) { 
     y <- t[i,j] 
    } 
    } 
    return(0L) 
} 

enter image description here

## 2) Referencing 
out <- foreach(linID = 1:4, .combine=c) %dopar% { 
    invisible(c) ## c is referenced and thus exported to workers 
    t <- attach.big.matrix("example.desc") 
    for (i in seq_len(30L)) { 
    for (j in seq_len(m)) { 
     y <- t[i,j] 
    } 
    } 
    return(0L) 
} 
closeAllConnections() 

enter image description here

+0

Nie widziałem, że 'c <-m [1,1]' faktycznie ładuje 'c', ponieważ spodziewałem się, że wygeneruje on nową zmienną zamiast, dobrze ją czyta. Oznacza to, że faktycznie pamięć jest udostępniana i tracę czas na zbadanie różnych opcji z powodu "c". Dziękuję bardzo za pomoc! PS: Nie sądzę, że kod poniżej niewidoczny jest kiedykolwiek wykonywany. – Stanislav

+1

@Stanislav Zgadzam się, że to trochę nieoczekiwane zachowanie. Jeśli moja odpowiedź rozwiąże Twój problem, byłbym zadowolony, gdybyś zaakceptował to. – NoBackingDown

+0

@Stanislav Ta odpowiedź jest poprawna, musisz mieć pewność, co faktycznie eksportujesz do pracowników. Zasadniczo dobrą praktyką jest, aby nazwy zmiennych nie były takie same wewnątrz i na zewnątrz pętli, chyba że faktycznie modyfikuje się ten sam obiekt. – cdeterman

3

Ewentualnie, jeśli jesteś na Linux/Mac i chcesz krowę wspólne pamięć, użyj wideł. Najpierw załaduj wszystkie dane do głównego wątku, a następnie uruchom wątki robocze (widły) o ogólnej funkcji mcparallel z pakietu parallel.

Można zbierać swoje wyniki z mccollect lub przy wykorzystaniu prawdziwie wspólnej pamięci przy użyciu biblioteki Rdsm coś takiego:

library(parallel) 
library(bigmemory) #for shared variables 
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double') 
shared[1]<-1 #Init shared memory with some number 

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread 
shared[1,1] #...and confirm that it gets changed 
# [1] 23 

można potwierdzić, że wartość naprawdę jest aktualizowany w backgruound, jeśli opóźnienie zapisu:

fn<-function() 
{ 
    Sys.sleep(1) #One second delay 
    shared[1]<-11 
} 

job<-mcparallel(fn()) 
shared[1] #Execute immediately after last command 
# [1] 23 
aaa[1,1] #Execute after one second 
# [1] 11 
mccollect() #To destroy all forked processes (and possibly collect their output) 

Aby sterować za współbieżność i uniknąć warunki wyścigowe używać blokad:

library(synchronicity) #for locks 
m<-boost.mutex() #Lets create a mutex "m" 

bad.incr<-function() #This function doesn't protect the shared resource with locks: 
{ 
    a<-shared[1] 
    Sys.sleep(1) 
    shared[1]<-a+1 
} 

good.incr<-function() 
{ 
    lock(m) 
    a<-shared[1] 
    Sys.sleep(1) 
    shared[1]<-a+1 
    unlock(m) 
} 

shared[1]<-1 
for (i in 1:5) job<-mcparallel(bad.incr()) 
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions 

mccollect() #To clear all threads, not to get the values 
shared[1]<-1 
for (i in 1:5) job<-mcparallel(good.incr()) 
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6 
#[1] 6 

mccollect() 

Edit:

I uproszczone Zależności trochę wymieniając Rdsm::mgrmakevar do bigmemory::big.matrix.mgrmakevar mimo wszystko wewnętrznie dzwoni pod numer big.matrix i nie potrzebujemy niczego więcej.