2017-09-30 11 views
5

Grałem z operatorem Java Flow offer, ale po przeczytaniu dokumentacji i wykonaniu testu nie rozumiem.Java 9 Behavior of Flow SubmissionPublisher offer method

Oto moja próba

@Test 
public void offer() throws InterruptedException { 
    //Create Publisher for expected items Strings 
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
    //Register Subscriber 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.offer("item", (subscriber, value) -> false); 
    Thread.sleep(500); 
} 

Operator oferta odebrać element być emitowane i funkcję BiPredicate i ile mi zrozumieć czytając dokumentację, tylko w przypadku, gdy funkcja predykat jest prawdziwa pozycja to zostanie wyemitowany.

Bur po zdać test wynik jest

Subscription done: 
Subscription done: 
Subscription done: 
Got : item --> onNext() callback 
Got : item --> onNext() callback 
Got : item --> onNext() callback 

There's nie zmieni się w wyniku jeśli zamiast false Wracam prawdziwe.

Ktoś może wyjaśnić mi tę operację nieco lepiej, proszę.

Odpowiedz

5

Nie, funkcja orzeczenie jest wykorzystywany do określenia, czy ponawiania operacja Publishing, jak wspomniano w docs:

onDrop - o ile nie zerowa, uchwyt wykorzystane po kropli do abonenta z argumentami subskrybenta i elementu; jeśli zwróci wartość true, oferta zostanie ponownie podjęta (raz).

Nie ma wpływu na to, czy przedmiot ma zostać wysłany początkowo.

EDIT: Przykład jak krople mogą wystąpić podczas korzystania z metody

wymyśliłem przykład jak krople mogą wystąpić podczas wywoływania metody offeroffer. Nie sądzę, że wynik jest w 100% deterministyczny, ale jest wyraźna różnica, gdy jest uruchamiana kilka razy. Możesz po prostu zmienić program obsługi, aby zwracał wartość true zamiast false, aby zobaczyć, jak próba zmniejsza liczbę kropli z powodu nasyconych buforów. W tym przykładzie spadek zazwyczaj występuje, ponieważ maksymalna pojemność bufora jest wyraźnie mała (przekazywana do konstruktora z SubmissionPublisher). Ale gdy ponawiania jest włączona po małym okresie uśpienia, krople są usuwane:

public class SubmissionPubliserDropTest { 

    public static void main(String[] args) throws InterruptedException { 
     // Create Publisher for expected items Strings 
     // Note the small buffer max capacity to be able to cause drops 
     SubmissionPublisher<String> publisher = 
           new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2); 
     // Register Subscriber 
     publisher.subscribe(new CustomSubscriber<>()); 
     publisher.subscribe(new CustomSubscriber<>()); 
     publisher.subscribe(new CustomSubscriber<>()); 
     // publish 3 items for each subscriber 
     for(int i = 0; i < 3; i++) { 
      int result = publisher.offer("item" + i, (subscriber, value) -> { 
       // sleep for a small period before deciding whether to retry or not 
       try { 
        Thread.sleep(200); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       return false; // you can switch to true to see that drops are reduced 
      }); 
      // show the number of dropped items 
      if(result < 0) { 
       System.err.println("dropped: " + result); 
      } 
     } 
     Thread.sleep(3000); 
     publisher.close(); 
    } 
} 

class CustomSubscriber<T> implements Flow.Subscriber<T> { 

    private Subscription sub; 

    @Override 
    public void onComplete() { 
     System.out.println("onComplete"); 
    } 

    @Override 
    public void onError(Throwable th) { 
     th.printStackTrace(); 
     sub.cancel(); 
    } 

    @Override 
    public void onNext(T arg0) { 
     System.out.println("Got : " + arg0 + " --> onNext() callback"); 
     sub.request(1); 
    } 

    @Override 
    public void onSubscribe(Subscription sub) { 
     System.out.println("Subscription done"); 
     this.sub = sub; 
     sub.request(1); 
    } 

} 
+0

Mogę być głupi, ale po zmodyfikowaniu testu i ponownym przeczytaniu dokumentu nadal go nie dostaję. Czy myślisz, że mógłbyś opracować bardzo prosty przykład? Nie martw się, jeśli nie możesz, wymyślę. – paul

+1

Zgodnie z dokumentacją "Element może zostać upuszczony przez jednego lub więcej subskrybentów". Jak mogę odtworzyć, który się zachowuje ?. Próbowałem po raz pierwszy w trybieNext i naSubskrybuj po raz pierwszy, po prostu wykonaj subskrypcję.cancel(), ale nadal nie widzę żadnych zmian w wyniku – paul

+0

@paul Jestem również zainteresowany takim przykładem (wypróbowanie go teraz) . Uaktualni odpowiedź, jeśli ją wymyślę. – manouti

4

SubmissionPublisher.offer stwierdza, że ​​

Element mogą zostać usunięte przez jednego lub więcej abonentów, jeśli limity zasobów są przekroczone, w którym to przypadku dany program obsługi (jeśli nie jest pusty) jest wywoływany, a jeśli zwraca true, ponowna próba jednokrotnego użycia.

Wystarczy, aby zrozumieć, w obu połączeń

publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked 

publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked 

Ale wciąż publisher publikuje daną pozycję, aby każdy z jego obecnej abonenta. co dzieje się w bieżącym scenariuszu.


Scenariusz do sprawdzania, czy obsługi, które zostały przewidziane jest uzyskiwanie wywołana lub nie próbując odtworzyć to trudne pod względem ograniczenia zasobów, jak doc proponuje:

rzecz może zostać usunięty przez jednego lub więcej subskrybentów , jeśli limity zasobów są przekroczone, w którym to przypadku dany program obsługi (jeśli nie jest pusty) jest wywoływany , a jeśli zwraca wartość true, ponowna próba jednokrotna.

Jeszcze można spróbować upuszczanie elementów z limity czasu ustawiony na minimum podstawowej z wykorzystaniem przeciążonej metoda offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

timeout - jak długo czekać na zasoby dla każdego abonenta przed rezygnację, w jednostkach jednostki

unit - TIMEUNIT określania sposobu zinterpretować parametr timeout

Od metod offer może spaść elementy (albo natychmiast, albo z ograniczonego limitu czasu), który będzie okazją do wtrącić do obsługi i ponownie.

+0

Dzięki za informację;) – paul