2016-03-29 25 views
7

Jeśli publikuję kilka wiadomości z rzędu do klastra Kafka (przy użyciu new Producer API), otrzymuję od producenta Future dla każdej wiadomości.Gwarantowana dostawa wielu wiadomości do klastra Kafka

Teraz, zakładając mam skonfigurowany mój producent mieć max.in.flight.requests.per.connection = 1 i retries > 0 mogę tylko czekać na ostatni przyszłości i mieć pewność, że wszystkie poprzednie zostały również dostarczone (w kolejności)? Czy muszę czekać na wszystkie kontrakty futures? W kodzie mogę zrobić:

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
Future<?> f = null; 
for(MessageType message : messages){ 
    f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()); 
} 
try { 
    f.get(); 
} catch(ExecutionException e) { 
    //handle exception 
} 

zamiast tego:

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
List<Future<?>> futureList = new ArrayList<>(); 
for(MessageType message : messages){ 
    futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue())); 
} 
try { 
    for(Future<?> f : futureList) { 
    f.get(); 
    } 
} catch(ExecutionException e) { 
    //handle exception 
} 

i mieć pewność, że jeśli nic nie zostanie złapany tutaj (od pierwszego fragmentu):

try { 
    f.get(); 
} catch(ExecutionException e) { 

następnie wszystkie moje wiadomości zostały zapisane w klastrze w zamów (bez względu na to, czy producent wykonał jakiekolwiek próby pod maską), a jeśli coś pójdzie nie tak, to dostanę tam wyjątek, nawet jeśli nie była to ostatnia przyszłość (na którą czekam), która pierwsza napotkała problem?

Czy są jeszcze jakieś dziwne przypadki narożne?

+0

Jeśli przejrzysz tylko ostatnią przyszłość, nic nie powiesz o poprzednich żądaniach. Włączenie ponownych prób może spowodować zmianę kolejności komunikatów w przypadku awarii. – Sebastian

+0

Jeśli mam max.in.flight.requests.per.connection = 1 to nie powinno zapobiegać zmianie kolejności wiadomości? Jeśli poprzednia partia zakończyła się niepowodzeniem, należy ją powtórzyć przed kontynuowaniem następnego, przynajmniej zgodnie z moją interpretacją. – Manjabes

+0

Spowoduje to ponowienie, ale jeśli nadal nie powiedzie się 'n' razy (' n = numberOfRetries'), przechodzi do następnej wiadomości i potencjalnie pomyślnie ją wstawia. Aby poradzić sobie z tą sytuacją, musisz sprawdzić wynik każdej indywidualnej przyszłości. – Sebastian

Odpowiedz

1

Można to zrobić, ale tylko jeśli a) ustalić ponownych prób być nieskończona (lub skutecznie nieskończona) ib) są OK dane odrzutów w przypadku napotkania non-retriable wyjątek.

Aby wyjaśnić nieco więcej, Kafka ma dwie klasy wyjątków. Odwracalne wyjątki to awarie, w których możesz odnieść sukces, jeśli uruchomisz go ponownie. Na przykład kod NotEnoughReplicasException oznacza, że ​​jest mniej replik, niż jest to wymagane, więc żądanie zostanie odrzucone. Ale jeśli nieudany broker wróci do trybu online, możesz mieć wystarczającą liczbę replik, być z powrotem w dobrej formie, a żądanie się powiedzie, jeśli wyślesz je ponownie. W przeciwieństwie do tego, SerializationException nie można odzyskać, ponieważ nie mamy powodu, aby sądzić, że jeśli spróbujesz ponownie szeregować, wynik będzie inny.

Ponowna próba producenta ma zastosowanie tylko do momentu, w którym trafił wyjątek niepodlegający odpowiadanie. Jeśli więc nigdy nie trafisz żadnego z nich, użyjesz nieskończonych prób i użyjesz innych ustawień, o których wspomniałeś, zamówienie i pomyślna dostawa są gwarantowane po ostatecznej przyszłości. Ponieważ jednak możesz napotkać wyjątki, których nie można odzyskać, zdecydowanie lepiej jest obsłużyć każdą przyszłość (lub wywołanie zwrotne) i zapewnić, że przynajmniej zalogujesz się, jeśli żądanie się nie powiedzie.

1

Oprócz tego, co powiedział Ewen, możesz również zadzwonić pod numer flush() po zakończeniu wysyłania wszystkich wiadomości w pętli. To połączenie będzie blokowane do momentu zakończenia wszystkich kontraktów futures, więc po tym czasie możesz sprawdzić futures dla jakichkolwiek wyjątków. Musiałbyś trzymać się wszystkich przyszłości, aby móc to zrobić.

Alternatywnym sposobem byłoby użycie wywołania zwrotnego z wysyłanymi i przechowywanie wszelkich zwróconych wyjątków, jak pokazano poniżej. Ponowne użycie koloru powoduje, że wszystkie wysyłania zostały zakończone przed sprawdzeniem wyjątków.

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
final ArrayList<Exception> exceptionList = new ArrayList<>(); 

for(MessageType message : messages){ 
    producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     if (exception != null) { 
     exceptionList.add(exception); 
     } 
    } 
    }); 
} 

producer.flush(); 

if (!exceptionList.isEmpty()) { 
    // do stuff 
} 
Powiązane problemy