2009-12-01 9 views
15

Prowadzę wiele "zawstydzająco równoległych" projektów, które chciałabym zrównoważyć z modułem multiprocessing. Jednak często wymagają one czytania dużych plików (większych niż 2 GB), przetwarzania ich wiersz po wierszu, wykonywania podstawowych obliczeń, a następnie zapisywania wyników. Jaki jest najlepszy sposób na podzielenie pliku i przetworzenie go za pomocą modułu wieloprocesorowego Pythona? Czy należy używać Queue lub JoinableQueue w multiprocessing? A może sam moduł Queue? Lub, czy powinienem mapować plik iterowalny nad pulą procesów przy użyciu multiprocessing? Eksperymentowałem z tymi podejściami, ale koszty ogólne są ogromne w dystrybucji danych po linii. Oparłem się na lekkim projekcie filtrów rurowych, używając cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2, który przekazuje pewien procent danych wejściowych pierwszego procesu bezpośrednio do drugiego wejścia (patrz this post), ale chciałbym mieć rozwiązanie zawarte w całości w Pythonie.Jaki jest najlepszy sposób na dzielenie dużych plików w Pythonie na potrzeby wieloprocesowości?

Co zaskakujące, dokumentacja w języku Python nie sugeruje kanonicznego sposobu wykonania tej czynności (pomimo obszernej sekcji dotyczącej wytycznych programowania w dokumentacji multiprocessing).

Dzięki, Vince

Informacje dodatkowe: Czas przetwarzania jednej linii zmienia. Niektóre problemy są szybkie i ledwo nie są związane z I/O, niektóre są związane z CPU. Zależne od CPU, niezależne zadania uzyskają pozycję zrównoleglania, tak że nawet nieefektywne sposoby przypisywania danych do funkcji przetwarzania będą nadal korzystne pod względem czasu zegara ściennego.

Dobrym przykładem jest skrypt, który wyodrębnia pola z linii, sprawdza różne flagi bitowe i zapisuje linie z pewnymi flagami do nowego pliku w całkowicie nowym formacie. To wydaje się być problemem związanym z wejściem/wyjściem, ale kiedy uruchomiłem go z moją tanią, współbieżną wersją z rurkami, było to około 20% szybsze. Po uruchomieniu go z pulą i mapą lub kolejką w trybie multiprocessing jest zawsze o ponad 100% wolniejszy.

+0

To jest dla mnie wielki zarzut o niezbyt eleganckim języku skryptowym - proste obliczenia równoczesne to ból bez wątków . Oczywiście, możesz to zrobić, ale niektóre zadania są o wiele prostsze w przypadku modelu z wątkami i zamkami. –

+0

Gwintowana wersja "równoległa" (chyba) nigdy nie będzie szybsza, z wyjątkiem faktu, że wątki są szybciej tworzone niż procesy.GIL jest ogromnym wąskim gardłem dla programów wielowątkowych związanych z procesorem. Ponadto nie ma żadnych zmiennych obiektów, które muszą być współdzielone między procesami/wątkami, więc wielowątkowość nie jest tak naprawdę potrzebna w przypadku przetwarzania wieloprocesowego. – Vince

+0

@Jeśli tak, to wszystko zależy od dokładnej okoliczności. W twoim, może nigdy nie być. W innych może. Chodzi mi o to, że dla większości współbieżnych operacji, które musiałem wykonać (w C), rzadko istniało uzasadnienie dla używania dodatkowego, potrzebnego do prawidłowego IPC, gdy wątki i blokady dają o wiele prostszy model. W przypadku większych problemów, które wymagają skalowania na różnych komputerach, jest to inna historia. –

Odpowiedz

8

Jedna z najlepszych architektur jest już częścią systemu operacyjnego Linux. Nie są wymagane żadne specjalne biblioteki.

Chcesz zaprojektować "fan-out".

  1. Program "główny" tworzy wiele podprocesów połączonych rurami.

  2. Główny program odczytuje plik, zapisując linie do rur, wykonując minimalne filtrowanie wymagane do obsłużenia linii odpowiednimi podprocesami.

Każdy podproces powinien być prawdopodobnie potokiem różnych procesów, które odczytują i zapisują ze standardowego wejścia.

Nie potrzebujesz struktury danych kolejki, to jest dokładnie to, czym jest potok w pamięci - kolejka bajtów między dwoma współbieżnymi procesami.

+0

Spojrzę na wdrożenie tego podejścia w Pythonie, ponieważ moduł wieloprocesowy ma rury. Jak widzisz w oryginalnym poście, używam tego podejścia w powłoce, z wielkim sukcesem. Naiwnie myślałem, że nigdy nie uda mi się osiągnąć paralelizmu danych z rurami. – Vince

+0

Proste rurki muszlowe to idealna forma równoległości. Właśnie to Linux robi najlepiej. Często jest to idealne rozwiązanie. –

+0

Oto wynik: http://github.com/vsbuffalo i wyniki na komputerze z 32 procesorami http://paste.pocoo.org/show/154252/. Dzięki S.Lott! – Vince

1

To zależy w dużym stopniu od formatu pliku.

Czy ma sens podzielenie go w dowolnym miejscu? Czy musisz podzielić go na nową linię? Czy musisz się upewnić, że podzielisz go na końcu definicji obiektu?

Zamiast dzielić plik, należy użyć wielu czytników tego samego pliku, używając os.lseek, aby przeskoczyć do odpowiedniej części pliku.

Aktualizacja: Dodano plakat, że chce podzielić na nowe linie. Następnie proponuję następujące:

Załóżmy, że masz 4 procesy. Wtedy prostym rozwiązaniem jest os.lseek do 0%, 25%, 50% i 75% pliku, a następnie odczytaj bajty, aż trafisz na pierwszą nową linię. To jest twój punkt wyjścia dla każdego procesu. Nie musisz dzielić pliku, aby to zrobić, po prostu szukaj odpowiedniej lokalizacji w dużym pliku w każdym procesie i zacznij czytać stamtąd.

+0

Podziel na newline. – Vince

+0

Zaktualizowałem komentarz, aby wyjaśnić, jak używać os.lseek w twoim przypadku. –

4

Nie wspomnisz, w jaki sposób przetwarzasz linie; prawdopodobnie najważniejsza informacja.

Czy każda linia jest niezależna? Czy obliczenia zależą od jednego wiersza przed następnym? Czy muszą być przetwarzane w blokach? Jak długo trwa przetwarzanie dla każdej linii? Czy istnieje etap przetwarzania, który musi zawierać "wszystkie" dane na końcu? A może wyniki pośrednie zostaną wyrzucone i utrzymywana będzie tylko całkowita suma? Czy plik może być początkowo podzielony przez podzielenie rozmiaru pliku przez liczbę wątków? A może rośnie w miarę przetwarzania?

Jeśli linie są niezależne, a plik nie rośnie, jedyną potrzebną koordynacją jest wydzielenie "początkowych adresów" i "długości" każdemu z pracowników; mogą niezależnie otwierać i szukać w aktach, a następnie po prostu koordynować ich wyniki; być może przez czekanie, aż wyniki N powrócą do kolejki.

Jeśli linie nie są niezależne, odpowiedź będzie bardzo zależała od struktury pliku.

+0

Niestety, każdy plik jest niezależny, nic nie jest zależne, nic nie jest udostępniane (z wyjątkiem opcjonalnych liczników). Klasycznym przykładem jest funkcja pobierająca linię, decyduje, czy chce ją zachować, czy nie, wykonuje drobne obliczenia na utrzymywanych liniach, formatuje te obliczenia, a następnie zapisuje te linie do pliku dla tego procesu. Wszystkie pliki można następnie połączyć w osobny proces. Odnośnie do wyszukiwania plików - wyszukiwanie odbywa się za pomocą liczby bajtów w języku Python, co może albo wprowadzić złożoność w dopasowywaniu linii do bajtów. Czy warto? – Vince

+0

PS: plik nie rośnie, wyniki pośrednie są dołączane do pliku (jeden plik na proces, aby zapobiec konfliktom zapisu we/wy). To naprawdę jest kłopotliwie równoległy problem. – Vince

1

Wiem, że konkretnie zapytałeś o Python, ale zachęcam Cię do spojrzenia na Hadoop (http://hadoop.apache.org/): implementuje algorytm Map and Reduce, który został specjalnie zaprojektowany, aby rozwiązać ten problem.

Powodzenia

+0

Nie masz jeszcze pojęcia, czy został zaprojektowany dla tego problemu. Jak zauważyli inni, nie wiemy wystarczająco dużo na temat problemu. –

+1

@ San Jacinto ... Wydaje mi się, że czytam: "często wymagają czytania dużych plików (większych niż 2 gb), przetwarzania ich wiersz po wierszu, wykonywania podstawowych obliczeń, a następnie pisania wyników" to jest dla mnie wystarczająco dobre, ponieważ nie jestem podając konkretny szczegół wdrożenia, ale ogólną obserwację. Wyluzuj, chłopie. – Escualo

+0

Użyłem hadoop i map/reduce before. Uwielbiam oba, a map/reduce można (i jest nieco) stosować tutaj. Hadoop rozwiązuje niektóre problemy z I/O z ich HFS (IIRC). Pytam o krok przed mapą/zmniejsz: jakie podejście należy podjąć, aby podzielić plik, aby funkcja była na nim mapowana. Kolejka? Plik jest iterowalny? – Vince

0

Jeżeli czas pracy jest długi, zamiast czytać każdy proces jego następną linię przez Queue mają procesy czytać partii linii. W ten sposób narzut jest amortyzowany w kilku liniach (na przykład w tysiącach lub więcej).

6

Jedną z strategii jest przypisanie każdemu pracownikowi przesunięcia, więc jeśli masz osiem procesów roboczych, które przypiszesz, to numery od 0 do 7. Pracownik numer 0 czyta pierwszy rekord, następnie pomija 7 i przechodzi do przetwarzania ósmego rekordu itp. , numer 1 pracownika odczytuje drugi rekord, a następnie pomija 7 i przetwarza 9. rekord ...

Istnieje wiele korzyści tego schematu. Nie ma znaczenia, jak duży jest plik, a praca jest zawsze dzielona równomiernie, procesy na tym samym komputerze będą przetwarzane z mniej więcej taką samą szybkością i będą używać tych samych obszarów bufora, aby nie ponosić nadmiernego obciążenia we/wy. Dopóki plik nie zostanie zaktualizowany, możesz ponownie uruchomić pojedyncze wątki w celu odzyskania po niepowodzeniach.

1

Fredrik Lundh to Some Notes on Tim Bray's Wide Finder Benchmark to interesująca lektura, o bardzo podobnym przypadku użycia, z wieloma dobrymi radami. Różni inni autorzy również zaimplementowali to samo, niektórzy są powiązani z tego artykułu, ale możesz spróbować google'ować do "python wide finder" lub czegoś, aby znaleźć coś więcej. (istniało również rozwiązanie oparte na module multiprocessing, ale to już nie jest dostępne)

+0

Szkoda, że ​​źródła wielu zgłoszeń trudno jest wyśledzić. Istnieje wiele przydatnych technik, które można nauczyć się od wpisów do wideofinder/widefinder2. – jmanning2k

Powiązane problemy