2013-08-16 16 views
7

Załóżmy, że reprezentuję wektor cech za pomocą słownika (dlaczego? Ponieważ wiem, że funkcje są rzadkie, ale więcej o tym później).Jak skutecznie obliczyć wewnętrzny produkt dwóch słowników?

Jak należy wdrożyć wewnętrzną iloczyn dwóch takich słowników (oznaczone A, B)

Próbowałem naiwne podejście:

for k in A: 
    if k in B: 
    sum += A[k] * B[k] 

ale okazuje się być powolne.

trochę więcej szczegółów:

  • używam słownika reprezentować funkcje ponieważ

    1. klawiszy właściwości są ciągi
    2. Istnieje ~ 20K możliwych kluczy
    3. Każdy wektor jest skąpe (powiedzmy, około 1000 niezerowych elementów).
  • Jestem bardzo zainteresowany obliczania parami produkt wewnętrzną N = 2000 różnych słowników (czyli ich jądro liniowy).

+0

można spróbować 'k, v w A.iteritems(): suma + = v * B.get (k, 0)' – val

+0

Może to być oczywiste, ale ja doszedłem do wniosku, że tak czy owak: czy upewniasz się, że "A" ma najmniejszy "len()" dwóch słowników? W zależności od okoliczności może to mieć ogromny wpływ na prędkość. (A ponieważ operacja jest przemienna, nie miałaby żadnego wpływu na odpowiedź.) – Noah

Odpowiedz

1

Oto moja odpowiedź (obserwuję na sugestię przez @ Valentin Clement):

Najpierw owinąć scipy.sparse dok_matrix. Chodzi o to, aby przypisać każdej z możliwych funkcji indeks.

import scipy.sparse as sps 
import numpy as np 

class MSK: 
    # DD is a dict of dict, whose values are of type float. 
    # features - the set of possible features keys 
    def __init__(self, DD, features): 
     self.features = {k: j for (j, k) in enumerate(features)} 
     self.strings = DD.keys() 
     n = len(self.strings) 
     d = len(self.features) 
     self.M = sps.dok_matrix((n, d), dtype=np.float64) 
     for (i, s) in enumerate(self.strings): 
      v = DD[s] 
      for k in v: 
       j = self.features[k] 
       self.M[i, j] = v[k] 

I badanie przy użyciu następującego kodu, gdzie liczba elementów jest 800, wymiarowości jest 800, ale sparsity 200 (dokładnie 200 elementy są non-zero).

np.random.seed(1) 
N = 800 
DD = dict() 
R = range(N) 
for i in xrange(N): 
    DD[i] = dict() 
    S = np.random.permutation(R) 
    S = S[:N/4] 
    for j in S: 
     DD[i][j] = np.random.randn(1)[0] 

K = MSK(DD, R) 
import cProfile 
cProfile.runctx("A = K.M * K.M.T", globals(), locals()) 
print A.todense() 

wyjście jest:

2080520 function calls (2080519 primitive calls) in 2.884 seconds 

Umożliwia przykładowo 3 sekund. Moja naiwna implementacja (która używa instrukcji @ Joowani) zajmuje około 19 sekund.

(MSK oznacza MatrixSparseKeys)

+0

FYI, zastępując dok_matrix() przez lil_matrix(), możemy uzyskać jeszcze lepszą wydajność. –

6

Nie jestem pewien szybciej, ale tutaj jest inne podejście:

keys = A.viewkeys() & B.viewkeys() 
the_sum = sum(a[k] * b[k] for k in keys) 
+2

Możesz użyć 'A.keys() i B.keys()' na Pythonie 3 i 'A.viewkeys() i B. viewkeys() 'na Pythonie 2.7. –

+0

W rzeczywistości 'set (A) .intersection (B)' pojawia się szybciej w niektórych benchmarkach właśnie zrobiłem. –

1

Należy spróbować użyć namedtuples zamiast dict.

from collections import namedtuple 
A = dict 
B = dict 
_A = namedtuple('_A', A.keys()) 
_B = namedtuple('_B', B.keys()) 
DictA = _A(**A) 
DictB = _B(**B) 

, a następnie użyj ich jako dyktanda. Więcej informacji na temat namedtuples tutaj: What are "named tuples" in Python?

+0

W jaki sposób przyspieszy to obliczenia? –

+0

faktycznie nazwane krotki są szybsze i wydajniejsze niż dyktują (http://stackoverflow.com/questions/1336791/dictionary-vs-object-which-is-more-efficient-and-why), używając tego zamiast dyktowania mogą poprawić szybkość obliczeń –

+0

Jeśli spojrzysz na pola po nazwie, to nadal jest wyszukiwanie słownika pod maską. –

7

Hmm, wydaje się Twoje podejście jest rzeczywiście najlepszy dla gęstych wektorów:

>>> # Eric's answer 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.4360210521285808 

>>> # My comment 
>>> timeit.timeit('for k,v in A.iteritems(): sum += v*B.get(k,0)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.4082838999682963 

# My comment, more compact 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.38053266868496394 

>>> #Your approach 
>>> timeit.timeit('for k in A: sum += A[k]*B[k] if k in B else 0.', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.35574231962510794 

>>> # Your approach, more compact 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.3400850549682559 

Dla rzadszy tych, odpowiedź Erica działa lepiej, ale twój jest wciąż najszybciej:

# Mine 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.1390782696843189 

# Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.11702822992151596 

# Yours 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.07878250570843193 

EDIT

Po bawić f lub nieco, wydaje się, że sum([x for x ...]) jest znacznie szybszy niż sum(x for x in ...). Rebenchmarking z tym i uwagą Janne za pomocą klawiszy w odpowiedzi Erica, twoje jest nadal na górze (z Joowani dając lekką poprawę):

>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.items()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
1.1604375791416714 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.9234189571552633 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5411289579401455 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5198972138696263 

skalowanie do bardzo dużych rozmiarach, widać dokładnie taki sam wzór:

>>> #Mine 
>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.iteritems()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
45.328807250833506 

>>> #Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
28.042937058640973 

>>> #Yours 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
16.55080344861699 

>>> #Joowani's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
15.485236119691308 

Myślę, że sztuczka Joowan nie poprawia tu znacznie, ponieważ wektory są mniej więcej tej samej wielkości, ale w zależności od twojego problemu (jeśli niektóre wektory są śmiesznie mniejsze od innych), może to być bardziej znaczące ...

Edycja ponownie

Ups, wydaje się, że powinienem podjęły kolejną kawę przed wysłaniem ... Jak Eric wskazał (choć zupełnie nie zauważył ...), definiowanie tablicy w setup utrzymuje ją na to samo dla wszystkich prób, co nie jest najlepszym sposobem na porównanie. z właściwego losowych wektorów testowanego wyniki nie różnią się znacząco, ale w trosce o kompletność:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.294158102577967 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.068052507449011 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.745110704570834 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.737499445367575 

Aby skalować:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
5.0510995368395015 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.350612399185138 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.15619379016789 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.185129374341159 

myślę Najważniejsze jest to, że nie można oczekiwać znacznego przyspieszenia sprytnie zmieniając swoje wyrażenia dla tego rodzaju rzeczy ... Może mógłbyś spróbować zrobić numeryczną część w C/Cython lub używając pakietu Scipy's Sparse?

+0

Właśnie zaktualizowałem kopalnię z 'set (d.keys())' do 'd.viewkeys()', co może dać zwiększenie prędkości – Eric

+0

Ponadto, nie jestem pewien, czy ten test jest znaczący, jeśli początkowy słownik jest inny dla każdy fragment kodu: – Eric

+0

To prawda, ale sądzę, że może to nam dać przybliżone przybliżenie szybkości każdego z nich. – val

2

W przypadku, gdy A jest znacznie dłuższy niż B, może to pomóc?

if len(A) > len(B): 
    A, B = B, A 

for k in A: 
    if k in B: 
     the_sum += A[k] * B[k] 
+0

Wow, to była szybka edycja Eric, dzięki! – Joohwan

Powiązane problemy