2017-08-07 29 views
6

Od sierpnia 2017 r. Panda DataFame.apply() jest nadal ograniczona do pracy z jednym rdzeniem, co oznacza, że ​​wielordzeniowa maszyna będzie marnować większość czasu obliczeniowego po uruchomieniu df.apply(myfunc, axis=1).W jaki sposób można zrównoleglić apply() na Pandas Dataframes wykorzystujących wszystkie rdzenie na jednym komputerze?

W jaki sposób można wykorzystać wszystkie swoje rdzenie do uruchomienia, aby równolegle stosować ramkę danych?

Odpowiedz

7

Najprostszym sposobem jest użycie Dask's map_partitions. Trzeba te importu (trzeba będzie pip install dask):

import pandas as pd 
import dask.dataframe as dd 
from dask.multiprocessing import get 

i składnia jest

data = <your_pandas_dataframe> 
ddata = dd.from_pandas(data, npartitions=30) 

def myfunc(x,y,z, ...): return <whatever> 

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get) 

(wierzę, że 30 jest odpowiednia liczba partycji, jeśli masz 16 rdzeni). Tylko dla kompletności, I timed różnicę na moim komputerze (16 rdzeni):

data = pd.DataFrame() 
data['col1'] = np.random.normal(size = 1500000) 
data['col2'] = np.random.normal(size = 1500000) 

ddata = dd.from_pandas(data, npartitions=30) 
def myfunc(x,y): return y*(x**2+1) 
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) 
def pandas_apply(): return apply_myfunc_to_DF(data) 
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) 
def vectorized(): return myfunc(data['col1'], data['col2'] ) 

t_pds = timeit.Timer(lambda: pandas_apply()) 
print(t_pds.timeit(number=1)) 

28,16970546543598

t_dsk = timeit.Timer(lambda: dask_apply()) 
print(t_dsk.timeit(number=1)) 

2,708152851089835

t_vec = timeit.Timer(lambda: vectorized()) 
print(t_vec.timeit(number=1)) 

0,010668013244867325

dając czynnikiem 10 SpeedUp dzieje z pand dotyczą dask stosować na partycje. Oczywiście, jeśli masz funkcję, którą możesz wektoryzować, powinieneś - w tym przypadku funkcja (y*(x**2+1)) jest wektoryzowana w sposób trywialny, ale jest wiele rzeczy, których nie można wektoryzować.

+2

Dobrze wiedzieć, dzięki za wysyłkę. Czy możesz wyjaśnić, dlaczego wybrałeś 30 partycji? Czy wydajność zmienia się po zmianie tej wartości? –

+0

@AndrewL Zakładam, że każda partycja jest obsługiwana przez oddzielny proces, a przy 16 rdzeniach przyjmuję, że 16 lub 32 procesy mogą działać jednocześnie. Próbowałem go, a wydajność wydaje się poprawić do 32 partycji, ale dalsze wzrosty nie mają korzystnego efektu. Zakładam, że na czterordzeniowym komputerze chciałbyś 8 partycji, itp. Zwróć uwagę, że zauważyłem poprawę w zakresie od 16 do 32, więc myślę, że naprawdę chcesz 2x $ NUM_PROCESSORS –

Powiązane problemy