2012-06-18 8 views
23

Rozważ kilka instancji serwera WWW działających równolegle. Każdy serwer zawiera odniesienie do jednego udostępnionego "Strażnika statusu", którego rolą jest utrzymywanie ostatnich żądań ze wszystkich serwerów.Tematyczny bufor okrągły w języku Java

Na przykład (N=3):

Server a: "Request id = ABCD"  Status keeper=["ABCD"] 
Server b: "Request id = XYZZ"  Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"  Status keeper=["ABCD", "XYZZ", "1234"] 
Server b: "Request id = FOO"   Status keeper=["XYZZ", "1234", "FOO"] 
Server a: "Request id = BAR"   Status keeper=["1234", "FOO", "BAR"] 

W dowolnym momencie w czasie, „opiekun Stan” można nazwać z aplikacji monitorującej, który czyta te ostatnie N żądania raportu SLA.

Jaki jest najlepszy sposób wdrożenia tego scenariusza producent-konsument w Javie, co daje serwerom internetowym wyższy priorytet niż raport SLA?

CircularFifoBuffer wydaje się być odpowiednią strukturą danych do przechowywania żądań, ale nie jestem pewien, jaki jest optymalny sposób wdrożenia wydajnej współbieżności.

+0

Zdefiniuj "wyższy priorytet". Co jeśli raport zaczął czytać bufor? Czy powinien się złamać i zacząć od nowa, jeśli ktoś chce to napisać? Czy to z kolei może prowadzić do głodu? –

+0

Nie powinien nigdy głodować i nigdy nie powinien być zatrzymany, ale może poczekać trochę dłużej - co oznacza, że ​​jego priorytet powinien powoli rosnąć wraz z upływem czasu. –

+0

Ilu producentów i ilu konsumentów ma bufor pierścieniowy, usunę trochę kodu, gdy podasz dane. – bestsss

Odpowiedz

16
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer()); 
+0

+1 Czy "lot" nie powinien być lotny? –

+2

Naprawdę nie ma znaczenia, o ile kod inicjujący nie nadaje się do wyścigu. – MahdeTo

+0

Skąd pochodzi BufferUtils? Próbowałem użyć tego z Apache, w gradle file: "compile" org.apache.commons: commons-collections4: 4.1 '", ale go tam nie ma ... –

2

Chciałbym rzucić okiem na ArrayDeque, lub dla bardziej jednoczesnej implementacji zajrzyj do biblioteki Disruptor, która jest jednym z najbardziej zaawansowanych/złożonych buforów pierścieniowych w Javie.

Alternatywą jest użycie nieokreślonej kolejki, która jest bardziej równoległa, ponieważ producent nigdy nie musi czekać na klienta. Java Chronicle

Jeśli twoje potrzeby nie usprawiedliwiają złożoności, ArrayDeque może być wszystkim, czego potrzebujesz.

+0

Jedna ważna kwestia: "ArrayDeque" nie jest ograniczone do rozmiaru. Używa on okrągłej tablicy, prawda, ale zmieni rozmiar, aby pomieścić więcej elementów w razie potrzeby. OP musiałby ręcznie "pop()" element przed wstawieniem nowego po pewnym czasie, jednocześnie jednocześnie jawnie utrzymując bezpieczeństwo wątków ... – thkala

+1

Jeśli potrzebujesz go mieć ograniczony rozmiar, możesz użyć ArrayBlockingQueue. –

+1

'ArrayBlockingQueue' ogranicza swój rozmiar, blokując aż do usunięcia elementu. O ile mogę powiedzieć, OP chce, aby kolejka domyślnie upuszczała/nadpisywała najstarszy element, zachowując tylko najnowsze elementy "N". – thkala

7

Oto pozbawiona blokady implementacja bufora pierścieniowego. Implementuje bufor o ustalonym rozmiarze - nie ma funkcji FIFO. Sugerowałbym zamiast tego przechowywanie Collection żądań dla każdego serwera. W ten sposób raport może wykonać filtrowanie zamiast filtrować strukturę danych.

/** 
* Container 
* --------- 
* 
* A lock-free container that offers a close-to O(1) add/remove performance. 
* 
*/ 
public class Container<T> implements Iterable<T> { 

    // The capacity of the container. 
    final int capacity; 
    // The list. 
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); 
    // TESTING { 
    AtomicLong totalAdded = new AtomicLong(0); 
    AtomicLong totalFreed = new AtomicLong(0); 
    AtomicLong totalSkipped = new AtomicLong(0); 

    private void resetStats() { 
    totalAdded.set(0); 
    totalFreed.set(0); 
    totalSkipped.set(0); 
    } 
    // TESTING } 

    // Constructor 
    public Container(int capacity) { 
    this.capacity = capacity; 
    // Construct the list. 
    Node<T> h = new Node<T>(); 
    Node<T> it = h; 
    // One created, now add (capacity - 1) more 
    for (int i = 0; i < capacity - 1; i++) { 
     // Add it. 
     it.next = new Node<T>(); 
     // Step on to it. 
     it = it.next; 
    } 
    // Make it a ring. 
    it.next = h; 
    // Install it. 
    head.set(h); 
    } 

    // Empty ... NOT thread safe. 
    public void clear() { 
    Node<T> it = head.get(); 
    for (int i = 0; i < capacity; i++) { 
     // Trash the element 
     it.element = null; 
     // Mark it free. 
     it.free.set(true); 
     it = it.next; 
    } 
    // Clear stats. 
    resetStats(); 
    } 

    // Add a new one. 
    public Node<T> add(T element) { 
    // Get a free node and attach the element. 
    totalAdded.incrementAndGet(); 
    return getFree().attach(element); 
    } 

    // Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    // Keep count of skipped. 
    totalSkipped.addAndGet(skipped); 
    if (skipped < capacity) { 
     // Put the head as next. 
     // Doesn't matter if it fails. That would just mean someone else was doing the same. 
     head.set(freeNode.next); 
    } else { 
     // We hit the end! No more free nodes. 
     throw new IllegalStateException("Capacity exhausted."); 
    } 
    return freeNode; 
    } 

    // Mark it free. 
    public void remove(Node<T> it, T element) { 
    totalFreed.incrementAndGet(); 
    // Remove the element first. 
    it.detach(element); 
    // Mark it as free. 
    if (!it.free.compareAndSet(false, true)) { 
     throw new IllegalStateException("Freeing a freed node."); 
    } 
    } 

    // The Node class. It is static so needs the <T> repeated. 
    public static class Node<T> { 

    // The element in the node. 
    private T element; 
    // Are we free? 
    private AtomicBoolean free = new AtomicBoolean(true); 
    // The next reference in whatever list I am in. 
    private Node<T> next; 

    // Construct a node of the list 
    private Node() { 
     // Start empty. 
     element = null; 
    } 

    // Attach the element. 
    public Node<T> attach(T element) { 
     // Sanity check. 
     if (this.element == null) { 
     this.element = element; 
     } else { 
     throw new IllegalArgumentException("There is already an element attached."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    // Detach the element. 
    public Node<T> detach(T element) { 
     // Sanity check. 
     if (this.element == element) { 
     this.element = null; 
     } else { 
     throw new IllegalArgumentException("Removal of wrong element."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    public T get() { 
     return element; 
    } 

    @Override 
    public String toString() { 
     return element != null ? element.toString() : "null"; 
    } 
    } 

    // Provides an iterator across all items in the container. 
    public Iterator<T> iterator() { 
    return new UsedNodesIterator<T>(this); 
    } 

    // Iterates across used nodes. 
    private static class UsedNodesIterator<T> implements Iterator<T> { 
    // Where next to look for the next used node. 

    Node<T> it; 
    int limit = 0; 
    T next = null; 

    public UsedNodesIterator(Container<T> c) { 
     // Snapshot the head node at this time. 
     it = c.head.get(); 
     limit = c.capacity; 
    } 

    public boolean hasNext() { 
     // Made into a `while` loop to fix issue reported by @Nim in code review 
     while (next == null && limit > 0) { 
     // Scan to the next non-free node. 
     while (limit > 0 && it.free.get() == true) { 
      it = it.next; 
      // Step down 1. 
      limit -= 1; 
     } 
     if (limit != 0) { 
      next = it.element; 
     } 
     } 
     return next != null; 
    } 

    public T next() { 
     T n = null; 
     if (hasNext()) { 
     // Give it to them. 
     n = next; 
     next = null; 
     // Step forward. 
     it = it.next; 
     limit -= 1; 
     } else { 
     // Not there!! 
     throw new NoSuchElementException(); 
     } 
     return n; 
    } 

    public void remove() { 
     throw new UnsupportedOperationException("Not supported."); 
    } 
    } 

    @Override 
    public String toString() { 
    StringBuilder s = new StringBuilder(); 
    Separator comma = new Separator(","); 
    // Keep counts too. 
    int usedCount = 0; 
    int freeCount = 0; 
    // I will iterate the list myself as I want to count free nodes too. 
    Node<T> it = head.get(); 
    int count = 0; 
    s.append("["); 
    // Scan to the end. 
    while (count < capacity) { 
     // Is it in-use? 
     if (it.free.get() == false) { 
     // Grab its element. 
     T e = it.element; 
     // Is it null? 
     if (e != null) { 
      // Good element. 
      s.append(comma.sep()).append(e.toString()); 
      // Count them. 
      usedCount += 1; 
     } else { 
      // Probably became free while I was traversing. 
      // Because the element is detached before the entry is marked free. 
      freeCount += 1; 
     } 
     } else { 
     // Free one. 
     freeCount += 1; 
     } 
     // Next 
     it = it.next; 
     count += 1; 
    } 
    // Decorate with counts "]used+free". 
    s.append("]").append(usedCount).append("+").append(freeCount); 
    if (usedCount + freeCount != capacity) { 
     // Perhaps something was added/freed while we were iterating. 
     s.append("?"); 
    } 
    return s.toString(); 
    } 
} 

Zauważ, że jest to zbliżone do zakładania O1. A Separator po raz pierwszy emituje "", a następnie jego parametr od tego momentu.

Edytuj: Dodano metody testowania.

// ***** Following only needed for testing. ***** 
private static boolean Debug = false; 
private final static String logName = "Container.log"; 
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\"); 

private static synchronized void log(boolean toStdoutToo, String s) { 
    if (Debug) { 
    if (toStdoutToo) { 
     System.out.println(s); 
    } 
    log(s); 
    } 
} 

private static synchronized void log(String s) { 
    if (Debug) { 
    try { 
     log.writeLn(logName, s); 
    } catch (IOException ex) { 
     ex.printStackTrace(); 
    } 
    } 
} 
static volatile boolean testing = true; 

// Tester object to exercise the container. 
static class Tester<T> implements Runnable { 
    // My name. 

    T me; 
    // The container I am testing. 
    Container<T> c; 

    public Tester(Container<T> container, T name) { 
    c = container; 
    me = name; 
    } 

    private void pause() { 
    try { 
     Thread.sleep(0); 
    } catch (InterruptedException ex) { 
     testing = false; 
    } 
    } 

    public void run() { 
    // Spin on add/remove until stopped. 
    while (testing) { 
     // Add it. 
     Node<T> n = c.add(me); 
     log("Added " + me + ": " + c.toString()); 
     pause(); 
     // Remove it. 
     c.remove(n, me); 
     log("Removed " + me + ": " + c.toString()); 
     pause(); 
    } 
    } 
} 
static final String[] strings = { 
    "One", "Two", "Three", "Four", "Five", 
    "Six", "Seven", "Eight", "Nine", "Ten" 
}; 
static final int TEST_THREADS = Math.min(10, strings.length); 

public static void main(String[] args) throws InterruptedException { 
    Debug = true; 
    log.delete(logName); 
    Container<String> c = new Container<String>(10); 

    // Simple add/remove 
    log(true, "Simple test"); 
    Node<String> it = c.add(strings[0]); 
    log("Added " + c.toString()); 
    c.remove(it, strings[0]); 
    log("Removed " + c.toString()); 

    // Capacity test. 
    log(true, "Capacity test"); 
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length); 
    // Fill it. 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    // Add one more. 
    try { 
    c.add("Wafer thin mint!"); 
    } catch (IllegalStateException ise) { 
    log("Full!"); 
    } 
    c.clear(); 
    log("Empty: " + c.toString()); 

    // Iterate test. 
    log(true, "Iterator test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    } 
    StringBuilder all = new StringBuilder(); 
    Separator sep = new Separator(","); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("All: "+all); 
    for (int i = 0; i < strings.length; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    } 
    sep.reset(); 
    all.setLength(0); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("None: " + all.toString()); 

    // Multiple add/remove 
    log(true, "Multi test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    log("Filled " + c.toString()); 
    for (int i = 0; i < strings.length - 1; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    log("Removed " + strings[i] + " " + c.toString()); 
    } 
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]); 
    log("Empty " + c.toString()); 

    // Multi-threaded add/remove 
    log(true, "Threads test"); 
    c.clear(); 
    for (int i = 0; i < TEST_THREADS; i++) { 
    Thread t = new Thread(new Tester<String>(c, strings[i])); 
    t.setName("Tester " + strings[i]); 
    log("Starting " + t.getName()); 
    t.start(); 
    } 
    // Wait for 10 seconds. 
    long stop = System.currentTimeMillis() + 10 * 1000; 
    while (System.currentTimeMillis() < stop) { 
    Thread.sleep(100); 
    } 
    // Stop the testers. 
    testing = false; 
    // Wait some more. 
    Thread.sleep(1 * 100); 
    // Get stats. 
    double added = c.totalAdded.doubleValue(); 
    double skipped = c.totalSkipped.doubleValue(); 
    //double freed = c.freed.doubleValue(); 
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped)/added) + ")"); 
} 
+0

Czy masz formalną weryfikację poprawności tego algorytmu? Niezabezpieczone struktury danych są bardzo trudne do uzyskania, chyba że unikniesz ponownego wykorzystywania węzłów ... – thkala

+0

@thkala - jak "formalnie" potrzebujesz? Podstawowym algorytmem jest metoda 'getFree', która wybiera wolny węzeł i zaznacza go do użycia. Jest to dość proste, a jego poprawność powinna być oczywista. Dodałem moje metody testowe. Być może oni ci pomogą. – OldCurmudgeon

+0

To rodzaj "formalnych", które zostały opublikowane i recenzowane przez algorytmy.Pracowałem z niezabezpieczonymi strukturami danych w znacznym stopniu i mogą być one niezwykle trudne do uzyskania. Jest zbyt wiele narożnych przypadków ... – thkala

1

Również spojrzeć na java.util.concurrent.

blokujące kolejek blokuje dopóki jest o co konsumpcji lub (opcjonalnie) przestrzeń do wytwarzania:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

współbieżne połączone kolejka nie jest blokowanie i wykorzystuje zręczny algorytmu, który umożliwia wytwórcy do odbiorcy do być aktywne jednocześnie:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

+0

CLQ jest niezwiązane, to nie działa. – bestsss

1

Hazelcast na Queue oferuje niemal wszystko można zapytać, ale nie suppor t circularity. Ale z twojego opisu nie jestem pewien, czy naprawdę tego potrzebujesz.

0

Jeśli to ja, użyłbym CircularFIFOBuffer, jak wskazano, i synchronizować wokół bufora podczas pisania (dodawania). Gdy aplikacja monitorująca chce odczytać bufor, zsynchronizuj go z buforem, a następnie skopiuj lub sklonuj, aby użyć go do raportowania.

Ta sugestia jest oparta na założeniu, że opóźnienie jest minimalne, aby skopiować/sklonować bufor do nowego obiektu. Jeśli istnieje duża liczba elementów, a czas kopiowania jest długi, nie jest to dobry pomysł.

przykład Pseudo-Code:

public void writeRequest(String requestID) { 
    synchronized(buffer) { 
     buffer.add(requestID); 
    } 
} 

public Collection<String> getRequests() { 
    synchronized(buffer) { 
     return buffer.clone(); 
    } 
} 
2

Może chcesz spojrzeć na Disruptor - Concurrent Programming Framework.

  • Znajdź referat opisujący alternatyw, projektowanie a także comparement wydajności do java.util.concurrent.ArrayBlockingQueue tutaj: pdf
  • Rozważmy czytać pierwsze trzy artykuły z BlogsAndArticles

Jeśli biblioteka jest zbyt dużo, kij do java.util.concurrent.ArrayBlockingQueue