2013-09-24 13 views
6

mam rzadki tablicę a (głównie zera):kompresji tablicy Rzadki pomocą SIMD (AVX2)

unsigned char a[1000000]; 

i am utworzyć tablicę b indeksów niezerową elementów a wykorzystaniem instrukcji SIMD na architekturę Intel x64 z AVX2. Szukam wskazówek, jak zrobić to skutecznie. W szczególności, czy istnieją instrukcje SIMD, aby uzyskać pozycje kolejnych niezerowych elementów w rejestrze SIMD, rozmieszczone w sposób ciągły?

+1

Nie bezpośrednio, ale można "pcmpeqb" przeciw zeru, potem 'pmovmskb', że do normalnego rejestru, i wyodrębnić pierwszy indeks z' bsf' (a następnie drugi i tak dalej, mam nadzieję, że nie za dużo) – harold

+1

Musisz być bardziej konkretny niż "SIMD" - jaką architekturę wybierasz?X86, ARM, PowerPC, POWER i niektóre GPGPU mają różne rozszerzenia SIMD. Również x86 ma wiele rozszerzeń SIMD: MMX, SSE, SSE2, SSE3, SSSE3, SSE4, AVX, AVX2, itp. (Zwróć uwagę, że AVX2 ma instrukcje SIMD, które mogą być przydatne w tym kontekście). –

+0

@Paul R Przepraszam za to. Poprawiłem moje pytanie - AVX2 jest akceptowalny. –

Odpowiedz

0

Chociaż zestaw instrukcji AVX2 ma wiele instrukcji GATHER, ale jego wydajność jest zbyt wolna. I najbardziej skuteczny sposób to zrobić - aby ręcznie przetworzyć tablicę.

1

Jeśli oczekujesz liczba elementów niezerowych być bardzo niska (czyli znacznie mniej niż 1%), następnie można po prostu sprawdzić każdy kawałek 16-bajtowy dla bycia niezerowe:

int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
    if (mask != 65535) { 
     //store zero bits of mask with scalar code 
    } 

Jeśli odsetek pozytywnych elementów jest wystarczająco mały, koszt nieprzewidzianych gałęzi i koszt powolnego kodu skalarnego wewnątrz "jeśli" byłby znikomy.


Jeśli chodzi o dobre ogólne rozwiązanie, najpierw należy rozważyć wdrożenie zagęszczania strumienia przez SSE. Usuwa wszystkie elementy z zerowych tablicy bajtów (Idea pochodzi z here):

__m128i shuf [65536]; //must be precomputed 
char cnt [65536]; //must be precomputed 

int compress(const char *src, int len, char *dst) { 
    char *ptr = dst; 
    for (int i = 0; i < len; i += 16) { 
     __m128i reg = _mm_load_si128((__m128i*)&src[i]); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]); 
     _mm_storeu_si128((__m128i*)ptr, compressed); 
     ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask); 
    } 
    return ptr - dst; 
} 

Jak widać, (_mm_shuffle_epi8 + lookup table) może czynić cuda. Nie znam innego sposobu wektoryzacji strukturalnie złożonego kodu, takiego jak kompresja strumienia.


Teraz jedyny pozostały problem z Twoją prośbą to fakt, że chcesz uzyskać indeksy. Każdy indeks musi być przechowywany w 4-bajtowej wartości, więc fragment 16 bajtów wejściowych może wytworzyć do 64 bajtów danych wyjściowych, które nie mieszczą się w jednym rejestrze SSE.

Jednym ze sposobów radzenia sobie z tym jest uczciwe rozpakowanie wyjścia do 64 bajtów. Więc zamieniasz reg na stałą (0,1,2,3,4, ..., 15) w kodzie, następnie rozpakowujesz rejestr SSE do 4 rejestrów i dodajesz rejestr z czterema wartościami i. Wymagałoby to znacznie więcej instrukcji: 6 instrukcji rozpakowywania, 4 dodaje i 3 sklepy (jeden już tam jest). Co do mnie, jest to ogromne obciążenie, zwłaszcza jeśli spodziewasz się mniej niż 25% elementów niezerowych.

Można również ograniczyć liczbę niezerowych bajtów przetwarzanych przez iterację pojedynczej pętli o 4, tak aby jeden rejestr był zawsze wystarczający do wydrukowania. Oto przykładowy kod:

__m128i shufMask [65536]; //must be precomputed 
char srcMove [65536]; //must be precomputed 
char dstMove [65536]; //must be precomputed 

int compress_ids(const char *src, int len, int *dst) { 
    const char *ptrSrc = src; 
    int *ptrDst = dst; 
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15); 
    __m128i base = _mm_setzero_si128(); 
    while (ptrSrc < src + len) { 
     __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]); 
     __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128()); 
     ids32 = _mm_add_epi32(ids32, base); 
     _mm_storeu_si128((__m128i*)ptrDst, ids32); 
     ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4); 
     ptrSrc += srcMove[mask]; //no alternative without LUT 
     base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask])); 
    } 
    return ptrDst - dst; 
} 

Wadą tego podejścia jest to, że teraz każda kolejna iteracja pętli nie można uruchomić, dopóki linia ptrDst += dstMove[mask]; jest wykonywany na poprzedniej iteracji. Tak więc krytyczna ścieżka dramatycznie wzrosła. Sprzętu hiperwątkowości lub jego ręczna emulacja może usunąć tę karę.


Jak widać, istnieje wiele odmian tej podstawowej idei, z których wszystkie rozwiązują problem z różnym stopniem efektywności. Możesz także zmniejszyć rozmiar LUT, jeśli ci się nie podoba (ponownie, kosztem zmniejszenia wydajności przepustowości).

Tego podejścia nie można w pełni rozszerzyć na szersze rejestry (tj.AVX2 i AVX-512), ale można próbować łączyć instrukcje kilku kolejnych iteracji w jedną instrukcję AVX2 lub AVX-512, co nieco zwiększa przepustowość.

Uwaga: Nie testowałem żadnego kodu (ponieważ wstępne obliczenia LUT wymagają zauważalnego wysiłku).

+0

Byłoby miło zobaczyć, jak twoje podejście LUT porównuje się do mojej [odpowiedzi] (http://stackoverflow.com/a)/41958528/2439725) na podstawie instrukcji Bit Manipulation (BMI1 i BMI2). – wim

2

pięciu metod do obliczania wskaźników na nonzeros są:

  • Semi wektorowy pętla: Załaduj wektor SIMD ze znakami, porównaj z zera i zastosować movemask. Użyj małej pętli skalarnej, jeśli dowolny znak jest niezerowy (również sugerowany przez @stgatilov). Działa to dobrze w przypadku bardzo rzadkich tablic. Funkcja arr2ind_movmsk w poniższym kodzie używa instrukcji BMI1 dla pętli skalarnej.

  • Wektoryzacja: procesory Intel Haswell i nowsze obsługują zestawy instrukcji BMI1 i BMI2. BMI2 zawiera z instrukcją pext (Parallel bits extract, see wikipedia link), , która okazuje się przydatna tutaj. Zobacz kod arr2ind_pext w poniższym kodzie.

  • Klasyczna pętla skalarna z instrukcją: arr2ind_if.

  • Skalarna pętla bez rozgałęzień: arr2ind_cmov.

  • Lookup tabela: @stgatilov pokazuje, że możliwe jest użycie tabeli odnośników zamiast pdep i inne całkowitą instrukcje. To może dobrze działać, jednak tablica odnośników jest dość duża: nie mieści się w pamięci podręcznej L1. Nie testowano tutaj. Zobacz także dyskusję here.


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c 

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros: 
       ./a.out 20000 25 
*/ 

#include <stdio.h> 
#include <immintrin.h> 
#include <stdint.h> 
#include <omp.h> 
#include <string.h> 


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0, k; 
    __m256i msk; 
    m0=0; 
    for (i=0;i<n;i=i+32){        /* Load 32 bytes and compare with zero:   */ 
     msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256()); 
     k=_mm256_movemask_epi8(msk); 
     k=~k;           /* Search for nonzero bits instead of zero bits. */ 
     while (k){ 
     ind[m0]=i+_tzcnt_u32(k);      /* Count the number of trailing zero bits in k. */ 
     m0++; 
     k=_blsr_u32(k);        /* Clear the lowest set bit in k.     */ 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    uint64_t  cntr_const = 0xFEDCBA; 
    __m256i  shft  = _mm256_set_epi64x(0x04,0x00,0x04,0x00); 
    __m256i  vmsk  = _mm256_set1_epi8(0x0F); 
    __m256i  cnst16  = _mm256_set1_epi32(16); 
    __m256i  shf_lo  = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02, 
              0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00); 
    __m256i  shf_hi  = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06, 
              0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04); 
    __m128i  pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);            

    __m256i  i_vec  = _mm256_setzero_si256(); 
    m0=0; 
    for (i=0;i<n;i=i+16){ 
     __m128i v   = _mm_load_si128((__m128i *)&a[i]);      /* Load 16 bytes.                    */ 
     __m128i msk  = _mm_cmpeq_epi8(v,_mm_setzero_si128());    /* Generate 16x8 bit mask.                  */ 
       msk  = _mm_srli_epi64(msk,4);        /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_shuffle_epi8(msk,pshufbcnst);      /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_xor_si128(msk,_mm_set1_epi32(-1));    /* Invert 16x4 mask.                   */ 
     uint64_t msk64  = _mm_cvtsi128_si64x(msk);        /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/ 
     int  p   = _mm_popcnt_u64(msk64)>>2;        /* p is the number of nonzeros in 16 bytes of a.            */ 
     uint64_t cntr  = _pext_u64(cntr_const,msk64);       /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */ 
                        /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.     */ 
     __m256i cntr256 = _mm256_set1_epi64x(cntr); 
       cntr256 = _mm256_srlv_epi64(cntr256,shft); 
       cntr256 = _mm256_and_si256(cntr256,vmsk); 
     __m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo); 
     __m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi); 
       cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo); 
       cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi); 

          _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);  /* Note that the stores of iteration i and i+16 may overlap.               */ 
          _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
       m0   = m0+p; 
       i_vec  = _mm256_add_epi32(i_vec,cnst16); 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     if (a[i]!=0){ 
     ind[m0]=i; 
     m0=m0+1; 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     ind[m0]=i; 
     m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */ 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i; 
    for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u\n",i,ind[i],a[ind[i]]); 
    printf("\n"); fflush(stdout); 
    return 0; 
} 


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i;        /* Compute a hash to compare the results of different methods. */ 
    unsigned int chk=0; 
    for (i=0;i<m;i++){ 
     chk=((chk<<1)|(chk>>31))^(ind[i]); 
    } 
    printf("chk = %10X\n",chk); 
    return 0; 
} 



int main(int argc, char **argv){ 
int n, i, m; 
unsigned int j, k, d; 
unsigned char *a; 
int *ind; 
double t0,t1; 
int meth, nrep; 
char txt[30]; 

sscanf(argv[1],"%d",&n);   /* Length of array a.         */ 
n=n>>5;        /* Adjust n to a multiple of 32.       */ 
n=n<<5; 
sscanf(argv[2],"%u",&d);   /* The approximate fraction of nonzeros in a is: d/1024 */ 
printf("n=%d, d=%u\n",n,d); 

a=_mm_malloc(n*sizeof(char),32); 
ind=_mm_malloc(n*sizeof(int),32);  

            /* Generate a pseudo random array a.      */ 
j=73659343;     
for (i=0;i<n;i++){ 
    j=j*653+1; 
    k=(j & 0x3FF00)>>8;    /* k is a pseudo random number between 0 and 1023  */ 
    if (k<d){ 
     a[i] = (j&0xFE)+1;   /* Set a[i] to nonzero.         */ 
    }else{ 
     a[i] = 0; 
    } 

} 

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: "; 
char txt1[]="arr2ind_pext: "; 
char txt2[]="arr2ind_if:  "; 
char txt3[]="arr2ind_cmov: "; 

nrep=10000;         /* Repeat a function nrep times to make relatively accurate timings possible.       */ 
               /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519     */ 
               /* With nrep=10000:  ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513     */ 
printf("nrep = \%d \n\n",nrep); 
arr2ind_movmsk(a,n,ind,&m);     /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */ 
for (meth=0;meth<4;meth++){ 
    t0=omp_get_wtime(); 
    switch (meth){ 
     case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);   strcpy(txt,txt0); break; 
     case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);   strcpy(txt,txt1); break; 
     case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);    strcpy(txt,txt2); break; 
     case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);   strcpy(txt,txt3); break; 
     default: ; 
    } 
    t1=omp_get_wtime(); 
    printf("method = %s ",txt); 
    /* print_chk(a,ind,m); */ 
    printf(" elapsed time = %6.2f\n",t1-t0); 
} 
print_nonz(a, ind, 2);           /* Do something with the results     */ 
printf("density = %f %% \n\n",((double)m)/((double)n)*100);  /* Actual nonzero density of array a.   */ 

/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros.      */ 

return 0; 
} 

/* 
With nrep=1000000: 
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519  
With nrep=10000: 
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513  
*/ 


Kod badano rozmiaru tablicy n = 10016 (dane mieszczą się w pamięci podręcznej L1), a n = 1000000 z różnych niezerowych gęstościach około 0,5%, 5% i 50%. Aby uzyskać dokładną synchronizację, funkcje nazywane były odpowiednio 1000000 i 10000 razy.


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500 
        0.53%  5.1%  50.0% 
arr2ind_movmsk:  0.27  0.53  4.89 
arr2ind_pext:   1.44  1.59  1.45 
arr2ind_if:   5.93  8.95  33.82 
arr2ind_cmov:   6.82  6.83  6.82 

Time in seconds, size n=1000000, 1e4 function calls. 

        0.49%  5.1%  50.1% 
arr2ind_movmsk:  0.57  2.03  5.37 
arr2ind_pext:   1.47  1.47  1.46 
arr2ind_if:   5.88  8.98  38.59 
arr2ind_cmov:   6.82  6.81  6.81 


W tych przykładach vectorized pętle są szybsze niż skalarnych pętli. Wydajność arr2ind_movmsk zależy w dużej mierze od gęstości a. Jest tylko szybsza niż arr2ind_pext, jeśli gęstość jest wystarczająco mała. Równowartość progu zależy również od rozmiaru tablicy n. Funkcja "arr2ind_if" wyraźnie cierpi z powodu niepowodzenia prognozy rozgałęzień przy gęstości niezerowej równej 50%.