2017-10-08 28 views
5

tworzę temat Kafka o właściwościach poniżejKafka nie usuwając klucz z nagrobka

min.cleanable.dirty.ratio = 0,01, delete.retention.ms = 100, segment.ms = 100, cleanup.policy = kompaktowy

Powiedzmy wstawić par kV w celu 1111: 1, 1111: 2, 1111: null, 2222: 1 Co się dzieje teraz jest wyjątkiem ostatniego komunikatu, zagęszczanie dziennika działa na resztę wiadomości i usuwa pierwsze dwa, ale zachowuje 1111: null

Zgodne z dokumentacją

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period. 

Więc mam nadzieję, że kiedy delete.retention.ms zostanie osiągnięty, null marker powinien usunąć wiadomość z kluczem

Mam dwa pytania - Dlaczego czy znacznik nagrobka nie działa? Dlaczego ostatnia wiadomość jest ignorowana podczas zagęszczania?

To co server.properties plik -

log.retention.ms=100 
log.retention.bytes=1073741824 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=100 
log.cleaner.delete.retention.ms=100 
log.cleaner.enable=true 
log.cleaner.min.cleanable.ratio=0.01 

Odpowiedz

3

rekordy Tombstone zachowały się już z projektem. Powodem jest to, że brokerzy nie śledzą konsumentów. Załóżmy, że konsument przejdzie do trybu offline przez pewien czas po przeczytaniu pierwszego rekordu. Podczas gdy konsument jest wyłączony, zrób kopię zagięć. Jeśli zagęszczenie dziennika usunie rekord nagrobka, konsument nigdy nie dowie się o tym, że rekord został usunięty. Jeśli konsument zaimplementuje pamięć podręczną, może się zdarzyć, że rekord nigdy nie zostanie usunięty. W ten sposób nagrobek jest przechowywany dłużej, aby umożliwić konsumentowi offline otrzymywanie wszystkich nagrobków do lokalnego czyszczenia.

Nagrobek zostanie usunięty dopiero po delete.retention.ms (wartość domyślna to 1 dzień). Uwaga: jest to konfiguracja poziomu tematycznego i nie ma dla niej konfiguracji na poziomie brokera. Dlatego musisz ustawić konfigurację dla każdego tematu, jeśli chcesz to zmienić.

+0

opisałem w moim poście, że temat został stworzony z delete.retention.ms = 100, co oznacza, że ​​klucz znacznik nagrobek byłby oczyszczone po 100 ms po wysłaniu. Jedno pytanie: czy może być związane z ustawieniem compact.policy? Czytałem gdzieś, że musimy ustawić go na zwarty, usuń, aby umożliwić również usuwanie. – Sam

+0

Jeśli włączysz opcję "kompaktuj, usuń", w zasadzie uzyskasz TTL, a rekordy, które są starsze, zostaną skasowane (nawet jeśli nie ma nagrobka). Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Włączanie + blogu + kompresja + i dekompresja+do+kojenia Czy możesz dwukrotnie sprawdzić konfigurację tematu, że jest naprawdę skonfigurowany za pomocą delete.retention.ms = 100? –

+0

Uruchomiłem - opis na ten temat, a wynik to Configs: min.cleanable.dirty.ratio = 0.01, delete.retention.ms = 100, cleanup.policy = compact, segment.ms = 100 – Sam

1

Algorytm usuwania nagrobka w zagęszczonej ma być następujący.

  1. Nagrobek nigdy nie jest usuwany, gdy nadal znajduje się w brudnej części dziennika.
  2. Po tym jak nagrobek znajduje się w oczyszczonej części kłody, dodatkowo opóźniamy usunięcie nagrobka przez delete.retention.ms od czasu, gdy nagrobek znajduje się w oczyszczonej części.

Jest możliwe, że nagrobki nadal znajdują się w brudnej części kłody, a tym samym nie są usuwane. Wywoływanie kilku wiadomości z różnych kluczy powinno popchnąć nagrobki do wyczyszczonej części dziennika i usunąć je.

0

zwartym wątek ma dwie części:

1) Oczyszczony porcji: Porcje log kafka czyszczenia kafka czyszczone przynajmniej raz.

2) Brudna porcja: Porcja kafka log nie jest czyszczona przez kafka, nawet raz do tej pory. Kafka utrzymuje brudne przesunięcie. Wszystkie wiadomości z offsetem> = brudne przesunięcie należą do brudnej części.

Uwaga: Czyścik Kafka czyści wszystkie segmenty (niezależnie od tego, czy segment jest w wyczyszczonej/zabrudzonej części) i ponownie je kopiuje za każdym razem, gdy brudny wskaźnik osiągnie min.cleanable.dirty.ratio.

Nagrobki są usuwane segmentowo. Nagrobki w segmencie są usuwane, jeśli spełnia segmentów poniżej warunków:

  1. Segment powinny być oczyszczone części dziennika.

  2. Ostatni czas modyfikacji segmentu powinien wynosić < = (czas ostatniej modyfikacji segmentu zawierający komunikat z offsetem = (przesunięcie brudne - 1)) - delete.retention.ms.

Trudno jest opracować drugi punkt, ale w prostych słowach, Drugi punkt zakłada => wielkość segmentu powinna być równa log.segment.bytes/segment.bytes (1GB domyślnie). Aby wielkość segmentu (w czystszej części) była równa 1 GB, musisz wygenerować dużą liczbę wiadomości z wyróżniającymi się kluczami. Ale wyprodukowałeś tylko 4 wiadomości z 3 wiadomościami mającymi ten sam klucz. Z tego powodu nagrobki nie są usuwane w segmencie zawierającym komunikat 1111: null (Segment nie spełnia drugiego punktu, o którym wspomniałem powyżej).

Masz dwie opcje, aby usunąć nagrobki z 4 komunikaty:

  1. sprawiają delete.retention.ms = 0 lub
  2. zrobić log.segment.bytes/segment.bytes = 50.

Source Code (Extra Reading): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

try { 
     // clean segments into the new destination segment 
     for (old <- segments) { 
     val retainDeletes = old.lastModified > deleteHorizonMs 
     info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." 
      .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) 
     cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats) 
     } 
Powiązane problemy