2016-11-22 19 views
5

Obecnie próbuję odczytać duży plik (80 milionów linii), w którym muszę wykonać intensywne obliczeniowo mnożenie macierzy dla każdego wpisu. Po obliczeniu tego chcę wstawić wynik do bazy danych. Ze względu na czasochłonność tego procesu, chcę podzielić plik na wiele rdzeni, aby przyspieszyć ten proces.Python: Plik procesu używający wielu rdzeni

Po zbadaniu znalazłem tę obiecującą próbę, która podzieliła plik na części n.

def file_block(fp, number_of_blocks, block): 
    ''' 
    A generator that splits a file into blocks and iterates 
    over the lines of one of the blocks. 

    ''' 

    assert 0 <= block and block < number_of_blocks 
    assert 0 < number_of_blocks 

    fp.seek(0,2) 
    file_size = fp.tell() 

    ini = file_size * block/number_of_blocks 
    end = file_size * (1 + block)/number_of_blocks 

    if ini <= 0: 
     fp.seek(0) 
    else: 
     fp.seek(ini-1) 
     fp.readline() 

    while fp.tell() < end: 
     yield fp.readline() 

Iteracyjnie, można wywołać funkcję tak:

if __name__ == '__main__': 
    fp = open(filename) 
    number_of_chunks = 4 
    for chunk_number in range(number_of_chunks): 
     print chunk_number, 100 * '=' 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

Chociaż to działa, ja napotkasz problemy, parallelizing to przy użyciu wieloprocesorowe:

fp = open(filename) 
number_of_chunks = 4 
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)] 

p = Pool(cpu_count() - 1) 
p.map(processChunk,li) 

Z istoty błędu, że generatory nie mogą być marynowane.

Podczas gdy rozumiem ten błąd, jest za drogie, aby najpierw przetworzyć cały plik, aby umieścić wszystkie linie na liście.

Ponadto chcę używać bloków linii na rdzeniu jednej iteracji, ponieważ jest bardziej wydajny, aby wstawić wiele wierszy do bazy danych na raz (zamiast 1 za 1, jeśli przy użyciu typowego podejścia mapie)

Thanks za pomoc.

+3

Można wykonać wstępne przejście dużego pliku, aby zanotować wartości współrzędnych wyszukiwania i liczbę linii do odczytania z tej pozycji. Następnie możesz wywołać proces wieloprocesowy za pomocą tych dwóch liczb i pozostawić generator schowany w każdym procesie. – kezzos

+0

Czy można najpierw podzielić plik na cztery pliki? – cwallenpoole

+0

Przenieś otwarcie pliku i kod 'file_block' w każdym wątku, zamiast próbować zainicjować go przed rozpoczęciem wątku. Nie będzie problemu z otwieraniem pliku 4 razy zamiast raz, o ile jest on tylko do odczytu. –

Odpowiedz

3

Zamiast tworzyć generatory z przodu i przekazywać je do każdego wątku, należy pozostawić to kodowi nici.

def processChunk(params): 
    filename, chunk_number, number_of_chunks = params 
    with open(filename, 'r') as fp: 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)] 
p.map(processChunk, li)