2011-01-03 11 views
24

Używam BlockingQueue: s (próbuję obu ArrayBlockingQueue i LinkedBlockingQueue) do przekazywania obiektów między różnymi wątkami w aplikacji, nad którą obecnie pracuję. Wydajność i opóźnienie są stosunkowo ważne w tej aplikacji, więc byłem ciekawy, ile czasu zajmuje przesłanie obiektów między dwoma wątkami za pomocą BlockingQueue. Aby to zmierzyć, napisałem prosty program z dwoma wątkami (jeden konsument i jeden producent), w którym pozwoliłem producentowi przekazać znacznik czasu (wzięty przy użyciu System.nanoTime()) do konsumenta, patrz kod poniżej.Java BlockingQueue opóźnienie wysokie w systemie Linux

Przypominam sobie, że przeczytałem gdzieś na jakimś forum, że zajęło to około 10 mikrosekund dla kogoś, kto próbował tego (nie wiem na jakim systemie operacyjnym i sprzęcie, który był włączony), więc nie byłem zbyt zaskoczony, gdy zajęło to ~ 30 mikrosekund dla mnie na moim pudełku Windows 7 (procesor Intel E7500 core 2 duo, 2,93 GHz), podczas gdy w tle jest wiele innych aplikacji. Byłem jednak bardzo zaskoczony, gdy wykonałem ten sam test na znacznie szybszym serwerze Linux (dwa czterordzeniowe procesory Intel X5677 3,46 GHz, z systemem Debian 5 z jądrem 2.6.26-2-amd64). Spodziewałem się, że opóźnienie będzie mniejsze niż w moim oknie z oknami, ale przeciwnie było znacznie wyższe - ~ 75 - 100 mikrosekund! Oba testy zostały wykonane przy użyciu Sun Hotspot JVM w wersji 1.6.0-23.

Czy ktoś inny wykonał podobne testy z podobnymi wynikami w systemie Linux? Czy ktoś może wiedzieć, dlaczego jest tak wolniejszy w Linuksie (z lepszym sprzętem), czy to możliwe, że przełączanie wątków jest po prostu znacznie wolniejsze w Linuksie w porównaniu z oknami? Jeśli tak jest, wygląda na to, że okna są lepiej dostosowane do niektórych aplikacji. Jakąkolwiek pomoc w zrozumieniu względnie wysokich wartości jest bardzo cenna.

Edit:
Po Komentarz od DaveC, ja też zrobiłem test, gdzie ograniczony JVM (na maszynie Linux) z pojedynczym rdzeniem (to znaczy wszystkie wątki uruchomione na tym samym rdzeniu). To znacznie zmieniło wyniki - opóźnienie spadło poniżej 20 mikrosekund, czyli było lepsze niż wyniki na komputerze z systemem Windows. Zrobiłem też kilka testów, w których ograniczyłem wątek producenta do jednego rdzenia i wątek konsumencki do drugiego (próbując oba mieć je na tym samym gnieździe i na różnych gniazdach), ale to nie pomagało - opóźnienie było wciąż ~ 75 mikrosekundy. Przy okazji, ta aplikacja testowa jest praktycznie wszystkim, co używam na maszynie podczas wykonywania testu.

Czy ktoś wie, czy te wyniki mają sens? Czy naprawdę powinno być o wiele wolniej, jeśli producent i konsument działają na różnych rdzeniach? Wszelkie dane wejściowe są naprawdę doceniane.

Edited ponownie (6 stycznia):
eksperymentowałem z różnymi zmianami w kodzie i uruchomiony środowiska:

  1. uaktualnieniu jądra do 2.6.36.2 (od 2.6.26.2). Po aktualizacji jądra zmierzony czas zmienił się na 60 mikrosekund przy bardzo małych zmianach, od 75-100 przed aktualizacją. Ustawienie powinowactwa procesora dla wątków producenta i konsumenta nie miało żadnego skutku, z wyjątkiem ograniczania ich do tego samego rdzenia. Podczas pracy na tym samym rdzeniu zmierzone opóźnienie wynosiło 13 mikrosekund.

  2. W oryginalnym kodzie kazałem producentowi iść spać na 1 sekundę pomiędzy każdą iteracją, aby dać konsumentowi wystarczająco dużo czasu na obliczenie upływu czasu i wydrukowanie go na konsoli. Jeśli usuniemy połączenie do Thread.sleep() i zamiast tego pozwól zarówno producentowi, jak i konsumentowi wywołać barrier.await() w każdej iteracji (konsument wywoła to po wydrukowaniu czasu, który upłynął do konsoli), zmierzone opóźnienie zostanie zmniejszone z 60 mikrosekund do poniżej 10 mikrosekund. W przypadku uruchamiania wątków na tym samym rdzeniu opóźnienie wynosi poniżej 1 mikrosekundy. Czy ktoś może wyjaśnić, dlaczego tak znacznie zmniejszyło to opóźnienie?Moim pierwszym przypuszczeniem było to, że zmiana spowodowała, że ​​producent wywołał funkcję queue.put() przed konsumentem o nazwie queue.take(), więc konsument nigdy nie musiał blokować, ale po zabawie ze zmodyfikowaną wersją ArrayBlockingQueue, znalazłem to przypuszczenie jest fałszywe - konsument faktycznie blokował. Jeśli masz inne domysły, daj mi znać. (Btw, jeśli pozwolę producentowi wywołać zarówno Thread.sleep() jak i barrier.await(), opóźnienie pozostaje na poziomie 60 mikrosekund).

  3. Próbowałem także inne podejście - zamiast wywoływać queue.take(), zadzwoniłem do kolejki.poll() z limitem czasu 100 mikropów. Zmniejszyło to średnie opóźnienie do poniżej 10 mikrosekund, ale jest oczywiście o wiele bardziej obciążające procesor (ale prawdopodobnie mniej intensywnego procesora, który jest zajęty czekaniem?).

Zmieniano ponownie (10 stycznia) - Problem rozwiązany:
ninjalj zasugerował, że opóźnienie ~ 60 mikrosekund było spowodowane procesor mający obudzić się z głębszych stanów uśpienia - i był całkowicie w porządku! Po wyłączeniu stanów C w systemie BIOS opóźnienie zostało zmniejszone do < 10 mikrosekund. To tłumaczy, dlaczego mam o wiele lepsze opóźnienie w punkcie 2 powyżej - kiedy wysyłałem obiekty częściej, procesor był wystarczająco zajęty, aby nie przejść do głębszych stanów snu. Wielkie podziękowania dla wszystkich, którzy poświęcili czas na przeczytanie mojego pytania i podzielili się tutaj swoimi przemyśleniami!

...

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.CyclicBarrier; 

public class QueueTest { 

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10); 
    Thread consumerThread; 
    CyclicBarrier barrier = new CyclicBarrier(2); 
    static final int RUNS = 500000; 
    volatile int sleep = 1000; 

    public void start() { 
     consumerThread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        barrier.await(); 
        for(int i = 0; i < RUNS; i++) { 
         consume(); 

        } 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
     consumerThread.start(); 

     try { 
      barrier.await(); 
     } catch (Exception e) { e.printStackTrace(); } 

     for(int i = 0; i < RUNS; i++) { 
      try { 
       if(sleep > 0) 
        Thread.sleep(sleep); 
       produce(); 

      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public void produce() { 
     try { 
      queue.put(System.nanoTime()); 
     } catch (InterruptedException e) { 
     } 
    } 

    public void consume() { 
     try { 
      long t = queue.take(); 
      long now = System.nanoTime(); 
      long time = (now - t)/1000; // Divide by 1000 to get result in microseconds 
      if(sleep > 0) { 
       System.out.println("Time: " + time); 
      } 

     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 

    public static void main(String[] args) { 
     QueueTest test = new QueueTest(); 
     System.out.println("Starting..."); 
     // Run first once, ignoring results 
     test.sleep = 0; 
     test.start(); 
     // Run again, printing the results 
     System.out.println("Starting again..."); 
     test.sleep = 1000; 
     test.start(); 
    } 
} 
+1

wypróbowałeś test na Linuksie ograniczając jvm do używania tylko jednego procesora? może pomóc określić, gdzie upływa czas. – DaveC

+0

Interesujące - próbowałem ograniczyć go do konkretnego procesora, uruchamiając aplikację za pomocą polecenia "taskset 0x00000001 java QueueTest", a opóźnienie zostało zredukowane z około 75-100 do ~ 20 mikrosekund! Nie jestem pewien, czy rozumiem, ale ... – Johan

+0

@Johan: Czy te razy raportujesz to samo w wielu iteracjach? CyklicBarrier służy do koordynowania wątków pracujących na niezależnych zadaniach. Twoje zadania nie są jednak niezależne. Masz zarówno producenta, jak i Konsument czeka na barierce, a następnie (gdy oba wątki osiągną punkt zapory) zasadniczo rozpoczynają synchronizację w kolejce blokującej. Można zobaczyć przeplot różnego rodzaju kombinacji harmonogramów raportujących różne opóźnienia. – Cratylus

Odpowiedz

3

użyłbym tylko ArrayBlockingQueue jeśli możesz. Kiedy go użyłem, opóźnienie wynosiło od 8 do 18 mikrosekund na Linuksie. Jakiś punkt uwagi.

  • Koszt to w dużej mierze czas potrzebny do wybudzenia wątku. Po przebudzeniu wątku jego dane/kod nie będą w pamięci podręcznej, więc odkryjesz, że jeśli zrobisz to, co się stanie po obudzeniu wątku, może to potrwać 2-5 razy dłużej, niż gdybyś wielokrotnie robił to samo.
  • Niektóre operacje używają wywołań systemu operacyjnego (takich jak blokady/cykliczne bariery), które często są droższe w scenariuszu z niskim opóźnieniem niż zajęte. Sugeruję próbę zajętego czekania na twojego producenta, zamiast używania CyclicBarrier. Możesz również zająć się oczekiwaniem na klienta, ale może to być nieuzasadnione kosztowo w prawdziwym systemie.
+0

Dziękuję za odpowiedź. Zdaję sobie sprawę, że większość czasu poświęca się na przebudzenie wątku konsumenckiego, ale myślałem, że zmiana kontekstu w Linuksie będzie znacznie tańsza niż te, które otrzymam. Nie jestem pewien, czy w pełni rozumiem twoją drugą kwestię - CyclicBarrier jest używany tylko raz tutaj (niekoniecznie konieczne), nie w każdej iteracji, kiedy wysyłany jest nowy znacznik czasu. – Johan

1

@Peter Lawrey

Niektóre operacje używać połączeń OS (takich jak Blokowanie/barier cyklicznych)

te nie są OS (jądro) wywołuje. Wdrożone za pomocą prostego CAS (który na X86 przychodzi w/wolnej przestrzeni pamięci także)

Jeszcze jedno: nie używaj ArrayBlockingQueue, chyba że wiesz dlaczego (ty go używasz).

@OP: Spójrz na ThreadPoolExecutor, oferuje doskonałe ramy producent/konsument.

Edit poniżej:

zmniejszyć latencję (obnażając zajęty czekać) zmienić kolejkę do SynchronousQueue dodać następujące jak przed rozpoczęciem konsumenta

... 
consumerThread.setPriority(Thread.MAX_PRIORITY); 
consumerThread.start(); 

Jest to najlepiej jak potrafisz otrzymać.


Edycja2: Tutaj w/synchronizacji. kolejka. I nie drukuje wyników.

package t1; 

import java.math.BigDecimal; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.SynchronousQueue; 

public class QueueTest { 

    static final int RUNS = 250000; 

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>(); 

    int sleep = 1000; 

    long[] results = new long[0]; 
    public void start(final int runs) throws Exception { 
     results = new long[runs]; 
     final CountDownLatch barrier = new CountDownLatch(1); 
     Thread consumerThread = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       barrier.countDown(); 
       try { 

        for(int i = 0; i < runs; i++) {       
         results[i] = consume(); 

        } 
       } catch (Exception e) { 
        return; 
       } 
      } 
     }); 
     consumerThread.setPriority(Thread.MAX_PRIORITY); 
     consumerThread.start(); 


     barrier.await(); 
     final long sleep = this.sleep; 
     for(int i = 0; i < runs; i++) { 
      try {     
       doProduce(sleep); 

      } catch (Exception e) { 
       return; 
      } 
     } 
    } 

    private void doProduce(final long sleep) throws InterruptedException { 
     produce(); 
    } 

    public void produce() throws InterruptedException { 
     queue.put(new Long(System.nanoTime()));//new Long() is faster than value of 
    } 

    public long consume() throws InterruptedException { 
     long t = queue.take(); 
     long now = System.nanoTime(); 
     return now-t; 
    } 

    public static void main(String[] args) throws Throwable {   
     QueueTest test = new QueueTest(); 
     System.out.println("Starting + warming up..."); 
     // Run first once, ignoring results 
     test.sleep = 0; 
     test.start(15000);//10k is the normal warm-up for -server hotspot 
     // Run again, printing the results 
     System.gc(); 
     System.out.println("Starting again..."); 
     test.sleep = 1000;//ignored now 
     Thread.yield(); 
     test.start(RUNS); 
     long sum = 0; 
     for (long elapsed: test.results){ 
      sum+=elapsed; 
     } 
     BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);   
     System.out.printf("Avg: %1.3f micros%n", elapsed); 
    } 
} 
6

testu nie jest dobrą miarą kolejka przełączania opóźnienia, ponieważ masz jeden wątek odczyt kolejkę, która pisze synchronicznie do System.out (doing ciąg i długą konkatenacji, gdy jest na nim) przed podjęciem ponownie . Aby zmierzyć to poprawnie, musisz przenieść tę aktywność z tego wątku i wykonać jak najmniej pracy w wykonywanym wątku.

Lepiej od razu wykonuj obliczenia (teraz-teraz) w odbiorniku i dodaj wynik do innej kolekcji, która jest okresowo odwadniana przez inny wątek, który generuje wyniki. Zwykle robię to poprzez dodanie do odpowiednio wyprzedanej struktury wspieranej przez macierz dostępnej poprzez AtomicReference (stąd wątek raportujący musi tylko pobraćAndSet na tym odnośniku z innym wystąpieniem tej struktury pamięci, aby pobrać najnowszą serię wyników, np. Make 2 list, ustaw jeden jako aktywny, każdy wątek xsa budzi się i zamienia aktywne i pasywne). Następnie możesz zgłosić jakąś dystrybucję zamiast każdego wyniku (np. Zakres decylowy), co oznacza, że ​​nie generujesz ogromnych plików dziennika przy każdym uruchomieniu i nie dostajesz przydatnych informacji dla Ciebie.

FWIW Zgadzam się z czasów Peter Lawrey podanych & jeśli opóźnienie jest naprawdę krytyczna to trzeba pomyśleć o zajęty czeka z odpowiednim powinowactwie CPU (tj poświęcają rdzeń do tego wątku)

EDIT po 6 stycznia

Jeśli usunąć wywołanie Thread.Sleep() i zamiast pozwolić zarówno barrier.await połączenia producentów i konsumentów() w każdej iteracji (konsument wzywa go po wydrukowaniu czas, który upłynął do konsoli), zmierzone l atencja jest zmniejszona z 60 mikrosekund do poniżej 10 mikrosekund. W przypadku uruchamiania wątków na tym samym rdzeniu opóźnienie wynosi poniżej 1 mikrosekundy. Czy ktoś może wyjaśnić, dlaczego tak znacznie zmniejszyło to opóźnienie?

Patrzysz na różnicy pomiędzy java.util.concurrent.locks.LockSupport#park (i odpowiadające unpark) i Thread#sleep. Większość j.u.c. materiał jest zbudowany na LockSupport (często przez AbstractQueuedSynchronizer, który zapewnia lub bezpośrednio) i to (w Hotspot) rozwiązuje się do sun.misc.Unsafe#park (i unpark), a to kończy się w rękach biblioteki pthread (posix threads). Zazwyczaj pthread_cond_broadcast obudzić i pthread_cond_wait lub pthread_cond_timedwait dla rzeczy takich jak BlockingQueue#take.

Nie mogę powiedzieć, że kiedykolwiek patrzyłem, jak faktycznie wdrożono Thread#sleep (ponieważ nigdy nie natknąłem się na coś niskiego opóźnienia, które nie jest oparte na warunku oczekiwania), ale wyobrażam sobie, że powoduje to zdegradowane przez program w sposób bardziej agresywny niż mechanizm sygnalizacji pthread i to właśnie stanowi przyczynę różnicy opóźnień.

+0

Dziękuję za twój wkład. W tym konkretnym teście nie uważam, że synchroniczny zapis do System.out powinien być problemem, ponieważ mam wątek producenta czekający 2 sekundy, zanim wstawi nowy znacznik czasu do kolejki .... chyba że mnie nie będzie. coś tutaj? Twoje rozwiązanie do rejestrowania znaczników czasu za pomocą dwóch list i AtomicReferences brzmi jak świetny sposób rejestrowania opóźnień w mojej "prawdziwej" aplikacji. – Johan

+0

tak, fair point, przegapiłem sen przed produkcją. W tym czasie czas jest zdominowany przez czas przebudzenia. Powinieneś być w stanie to zobaczyć, dodając możliwość zmiany szybkości, z jaką oferujesz znaczniki czasu do kolejki, opóźnienie będzie się zmniejszać wraz ze wzrostem stawki ofert, chyba że coś dziwnego dzieje się na pudełku. – Matt

+0

@Matt: Twoje punkty są bardzo dobre.Ale nigdy mniej, dlaczego ten sam kod jest tak różny w różnych systemach Windows i Linux jest wciąż niewyjaśniony – Cratylus

0

Jeśli opóźnienie jest krytyczne i nie potrzebujesz ścisłej semantyki FIFO, możesz rozważyć opcję LinkedTransferQueue JSR-166. Umożliwia eliminację, dzięki czemu przeciwstawne operacje mogą wymieniać wartości zamiast synchronizować w strukturze danych kolejek. Takie podejście pomaga zmniejszyć rywalizację, umożliwia równoległą wymianę i zapobiega karom snu/pobudki.

+0

Dziękuję, zagłębię się w LinkedTransferQueue i sprawdzę, czy jest odpowiedni dla mojej aplikacji. – Johan

Powiązane problemy