2017-11-01 70 views
9

Przypuśćmy, że mam bardzo duży plik tekstowy składający się z wielu linii, które chciałbym cofnąć. I nie dbam o ostateczne zamówienie. Plik wejściowy zawiera symbole cyrylicy. Używam multiprocessing do przetwarzania na kilku rdzeniach.Dlaczego funkcja multiprocessing.Lock() nie blokuje udostępnionego zasobu w języku Python?

napisałem taki program:

# task.py 

import multiprocessing as mp 


POOL_NUMBER = 2 


lock_read = mp.Lock() 
lock_write = mp.Lock() 

fi = open('input.txt', 'r') 
fo = open('output.txt', 'w') 

def handle(line): 
    # In the future I want to do 
    # some more complicated operations over the line 
    return line.strip()[::-1] # Reversing 

def target(): 
    while True: 
     try: 
      with lock_read: 
       line = next(fi) 
     except StopIteration: 
      break 

     line = handle(line) 

     with lock_write: 
      print(line, file=fo) 

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)] 
for p in pool: 
    p.start() 
for p in pool: 
    p.join() 

fi.close() 
fo.close() 

Ten program nie powiedzie się z powodu błędu:

Process Process-2: 
Process Process-1: 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte 

Z drugiej strony, wszystko działa dobrze, jeśli mogę ustawić POOL_NUMBER = 1. Ale to nie ma sensu, jeśli chcę uzyskać całkowitą wydajność.

Dlaczego ten błąd się zdarza? I jak mogę to naprawić?

Używam Python 3.5.2.

I generowane dane za pomocą tego skryptu:

# gen_file.py 

from random import randint 


LENGTH = 100 
SIZE = 100000 


def gen_word(length): 
    return ''.join(
     chr(randint(ord('а'), ord('я'))) 
     for _ in range(length) 
    ) 


if __name__ == "__main__": 
    with open('input.txt', 'w') as f: 
     for _ in range(SIZE): 
      print(gen_word(LENGTH), file=f) 
+0

można odnieść do odpowiedzi na temat [tworzenie pojedynczego pliku z wielu procesów w python] (https : //stackoverflow.com/a/11196615/4662041) – Sheshnath

+0

Czy próbowałeś odczytać ten plik i wydrukować jego dane? jeśli znowu złapiesz ten błąd! to znaczy, że powinieneś go przeczytać, jako tryb binarny z "rb" ... – DRPK

+0

@DRPK Zrobiłem. Jeśli usuniemy 'line = handle (line)' z mojego skryptu, pojawi się ten sam błąd. – Fomalhaut

Odpowiedz

3

Chodzi tu czyta plik z wielu procesów nie działa, jak myślisz, nie można dzielić przedmiotu open między procesami.

Można utworzyć globalną zmienną current_line, a za każdym razem odczytać plik i przetworzyć bieżącą linię, a nie idealnie.

Oto inne podejście, przy zastosowaniu procesów basen i map metoda, jestem iteracji po pliku, a dla każdej linii I enqueue metodę docelowa:

from multiprocessing import Lock 
from multiprocessing import Pool 
import time 
import os 

POOL_NUMBER = 8 

def target(line): 
    # Really need some processing here 
    for _ in range(2**10): 
     pass 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    processed_lines = pool.map(target, fi.readlines()) 
    print('Total time', time.time() - t0) 

    with open('output.txt', 'w') as fo: 
     for processed_line in processed_lines: 
      fo.writelines(processed_line) 

z 8 procesu na moim komputerze: Total time 1.3367934226989746

a z 1 procesu: Total time 4.324501991271973

To działa najlepiej, jeśli funkcja docelowa jest związana CPU, inny ap zalecane byłoby podzielenie pliku na fragmenty POOL_NUMBER i sprawienie, aby każdy proces zapisał przetworzony fragment danych (z blokadą!) do pliku wyjściowego.

Innym podejściem jest stworzenie procesu głównego, który wykonuje zadanie zapisu dla pozostałych procesów, na przykład here.

EDIT

Po skomentować pomyślałem, że nie zmieści się plik do pamięci. W tym celu można po prostu powtórzyć nad obiektem pliku, który będzie czytał wiersz po wierszu w pamięci. Ale nie musimy modyfikować kod trochę duży:

POOL_NUMBER = 8 
CHUNK_SIZE = 50000 

def target(line): 
    # This is not a measurable task, since most of the time wil spent on writing the data 
    # if you have a CPU bound task, this code will make sense 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
processed_lines = [] 

with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    for line in fi: 
     processed_lines.append(pool.apply_async(target, (line,))) # Keep a refernce to this task, but don't 

     if len(processed_lines) == CHUNK_SIZE: 
      with open('output.txt', 'w') as fo: # reading the file line by line 
       for processed_line in processed_lines: 
        fo.writelines(processed_line.get()) 
      processed_lines = [] # truncate the result list, and let the garbage collector collect the unused memory, if we don't clear the list we will ran out of memory! 
    print('Total time', time.time() - t0) 

Należy pamiętać, że można grać ze zmienną CHUNK_SIZE kontrolowania ile pamięci używasz. Dla mnie 5000 to około 10 KB dla każdego procesu.

P.S

Myślę, że najlepiej byłoby podzielić duży plik na mniejsze pliki, w ten sposób rozwiązać blokadę odczytu/zapisu w pliku, a także umożliwić skalowanie do przetwarzania (nawet na innej maszynie!)

+0

Dziękujemy za podane rozwiązanie. Ale niestety ma to wielką wadę. Cały plik wejściowy przechodzi do pamięci RAM (ponieważ 'fi.readlines()' działa w ten sposób), a także 'przetworzone_linii' również zajmuje dużo pamięci. Innymi słowy, twój skrypt pochłania zbyt dużo pamięci i byłby niewydajny w przypadku naprawdę dużego 'input.txt' (na przykład 100 milionów linii). Czy można zaktualizować skrypt, aby rozwiązać ten problem? – Fomalhaut

+0

@Fomalhaut Zaktualizowałem moją odpowiedź, weź pętlę :) –

0

Wygląda na to, że line = next(fi) nie jest przetwarzany poprawnie pod różnymi Process.

Możliwe jest ominięcie potrzeby korzystania z next(fi) za pomocą tymczasowego bufora linii wypełnionych głównym wątkiem programu i odczytywanych przy każdym procesie. Do tej roli lepiej użyć multiprocessing.Queue.

Więc to jest mój skrypt:

from time import sleep, time 
import multiprocessing as mp 
import queue 


MAX_QUEUE_SIZE = 1000 
QUEUE_TIMEOUT = 0.000001 
POOL_NUMBER = 4 


def handle(line): 
    sleep(0.00001) # Some processing here that takes time 
    return line.strip()[::-1] 


def target(fout, write_lock, lines_queue): 
    while True: 
     try: 
      line = lines_queue.get(timeout=1.0) 
      line = handle(line) 
      with write_lock: 
       print(line, file=fout) 
       fout.flush() 
     except queue.Empty: 
      break 


if __name__ == "__main__": 
    time_begin = time() 

    with open('output.txt', 'w') as fout: 
     write_lock = mp.Lock() 
     lines_queue = mp.Queue() 

     processes = [ 
      mp.Process(target=target, args=(fout, write_lock, lines_queue)) 
      for _ in range(POOL_NUMBER) 
     ] 
     for p in processes: 
      p.start() 

     with open('input.txt', 'r') as fin: 
      while True: 
       try: 
        while lines_queue.qsize() < MAX_QUEUE_SIZE: 
         line = next(fin) 
         lines_queue.put(line) 
        sleep(QUEUE_TIMEOUT) 
       except StopIteration: 
        break 

     for p in processes: 
      p.join() 

    time_end = time() 
    print("Time:", time_end - time_begin) 

Na moim CPU Mam ten wynik:

POOL_NUMBER = 1 -> Time: 17.877086400985718 
POOL_NUMBER = 2 -> Time: 8.611438989639282 
POOL_NUMBER = 3 -> Time: 6.332395553588867 
POOL_NUMBER = 4 -> Time: 5.321753978729248 
Powiązane problemy