2013-04-25 13 views
7

Mam program w Pythonie, który w zasadzie wykonuje następujące operacje:dane zapisywać na dysk w Pythonie jako proces w tle

for j in xrange(200): 
    # 1) Compute a bunch of data 
    # 2) Write data to disk 

1) trwa około 2-5 minut
2) wykonuje około ~ 1 minuta

Należy pamiętać, że w pamięci znajduje się za dużo danych.

Idealnie chciałbym zapisać dane na dysku w sposób pozwalający uniknąć bezczynności procesora. Czy to możliwe w Pythonie? Dzięki!

Odpowiedz

9

można spróbować using multiple processes takiego:

import multiprocessing as mp 

def compute(j): 
    # compute a bunch of data 
    return data 

def write(data): 
    # write data to disk 

if __name__ == '__main__': 
    pool = mp.Pool() 
    for j in xrange(200): 
     pool.apply_async(compute, args=(j,), callback=write) 
    pool.close() 
    pool.join() 

pool = mp.Pool() utworzy pulę procesów roboczych. Domyślnie liczba pracowników równa jest liczbie rdzeni procesora posiadanych przez urządzenie.

Każde wywołanie pool.apply_async kolejkuje zadanie uruchamiane przez pracownika w puli procesów roboczych. Gdy pracownik jest dostępny, działa pod numerem compute(j). Gdy pracownik zwraca wartość, data, wątek w procesie głównym uruchamia funkcję oddzwaniania write(data), przy czym data jest danymi zwracanymi przez proces roboczy.

Niektóre Ostrzeżenia:

  • Dane musi być picklable, ponieważ jest przekazywane z procesu pracownika z powrotem do głównego procesu poprzez Queue.
  • Nie ma gwarancji, że kolejność wykonywania zadań przez użytkownika jest taka sama jak kolejność, w jakiej zadania zostały wysłane do puli . Tak więc kolejność zapisywania danych na dysku może nie odpowiadać odpowiadającej j w zakresie od 0 do 199. Jednym ze sposobów obejścia tego problemu byłoby zapisanie danych do bazy danych sqlite (lub innego rodzaju) z j jako jednej pól danych. Następnie, gdy chcesz przeczytać dane w kolejności, możesz SELECT * FROM table ORDER BY j.
  • Korzystanie z wielu procesów zwiększa ilość wymaganej pamięci , ponieważ dane są generowane przez procesy robocze, a dane oczekujące na zapis na dysk gromadzą się w kolejce. Ty może być w stanie zmniejszyć ilość pamięci wymaganej przy użyciu tablic NumPy . Jeśli nie jest to możliwe, to może trzeba zmniejszyć liczbę procesów:

    pool = mp.Pool(processes=1) 
    

    który stworzy jeden proces roboczy (aby uruchomić compute), pozostawiając główny proces uruchomić write. Ponieważ compute trwa dłużej niż write, kolejka nie zostanie skopiowana z więcej niż jednym fragmentem danych do zapisania na dysku.Jednak nadal potrzebna jest wystarczająca ilość pamięci do obliczenia na jednym kawałku danych podczas zapisywania innego fragmentu danych na dysku.

    Jeśli nie masz wystarczającej ilości pamięci do jednoczesnego wykonania obu zadań, nie masz wyboru - oryginalny kod, który działa kolejno compute i write, jest jedynym sposobem.

+0

Dlaczego procesy użytkowania i zapisu do pliku jest po prostu IO i nie wpływają na GIL? –

+1

2-5 minut jest wydawane na obliczenia, w porównaniu do tylko 1 minuty IO. Jeśli maszyna ma wiele rdzeni, można przyspieszyć obliczenia, rozkładając pracę między rdzeniami. – unutbu

+0

W porządku, masz rację. Przepraszam. –

3

Można użyć czegoś takiego jak Queue.Queue (moduł jest tutaj: Queue) i threading.Thread (lub threading.start_new_thread jeśli chcesz tylko funkcję), moduł jest tutaj: threading - Jako zapisu pliku nie obciąża CPU i używać więcej IO. (i GIL na to nie wpływa).

2

Prostym sposobem byłoby użycie tylko wątków i kolejki. Z drugiej strony, jeśli część obliczeniowa nie zależy od stanu globalnej i masz maszynę z wieloma rdzeniami procesora, bardziej efektywny sposób byłoby użyć process pool

from multiprocessing import Pool 

def compute_data(x): 
    return some_calculation_with(x) 

if __name__ == '__main__': 
    pool = Pool(processes=4) # let's say you have quad-core, so start 4 workers 

    with open("output_file","w") as outfile: 
     for calculation_result in pool.imap(compute_data, range(200)): 
     # pool.imap returns results as they come from process pool  
      outfile.write(calculation_result) 
Powiązane problemy