2016-03-15 29 views
7

Kiedy próbowałem uruchomić Kafka Consumer with Avro nad danymi z moim schematem, zwraca błąd "AvroRuntimeException: źle sformułowane dane. Długość jest ujemna: -40". Widzę, że inni mieli podobne problemy: coverting byte array to json, Avro write and read i Kafka Avro Binary *coder. Mam również odwoływać ten Consumer Group Example, z których wszystkie zostały pomocne, jednak bez pomocy z tego błędu tak daleko .. To działa aż do tej części kodu (linia 73)Kafka Avro Consumer z problemami dekodera

dekodera Dekoder = DecoderFactory.get(). BinaryDecoder (byteArrayInputStream, null);

Próbowałem innych dekoderów i wydrukowałem zawartość zmiennej byteArrayInputStream, która wygląda jak sądzę, że można oczekiwać serializowanych danych avro (w wiadomości widzę schemat, niektóre dane i niektóre zniekształcone dane) Mam wydrukowano Bajty dostępne za pomocą metody .available(), która zwraca 594. Mam problem ze zrozumieniem przyczyny tego błędu. Apache Nifi służy do tworzenia strumienia Kafki z tym samym schematem z hdfs. Byłbym wdzięczny za każdą pomoc.

Odpowiedz

13

Być może problemem jest niedopasowanie między tym, jak dane Avro są zapisywane (kodowane) przez Nifi w porównaniu do sposobu, w jaki aplikacja konsumencka odczytuje (dekoduje) dane.

W skrócie, Avro API udostępnia dwa różne podejścia do serializacji:

  1. Za stworzenie odpowiednich Avro plików: Aby zakodować rekordy danych, ale również osadzić schematu Avro w rodzaju preambuły (przez org.apache.avro.file.{DataFileWriter/DataFileReader}). Osadzenie schematu w plikach Avro ma wiele sensu, ponieważ (a) typowo "ładunek" plików Avro jest o rzędy wielkości większe od wbudowanego schematu Avro i (b) możesz wtedy kopiować lub przenosić te pliki w pobliżu treści twojego serca i nadal upewnij się, że możesz je przeczytać ponownie bez konieczności konsultowania się z kimś lub czymś.
  2. Aby zakodować tylko rekordy danych, tj. Nie osadzać schematu (przez org.apache.avro.io.{BinaryEncoder/BinaryDecoder}, należy zauważyć różnicę w nazwie pakietu: io tutaj powyżej file powyżej). Takie podejście jest często preferowane, gdy Avro-kodowanie wiadomości, które są zapisywane do tematu Kafki, na przykład, ponieważ w porównaniu do wariantu 1 powyżej nie ponosisz kosztów ponownego wmontowania schematu Avro do każdej wiadomości, zakładając, że (bardzo rozsądna) polityka jest taka, że ​​dla tego samego tematu Kafki wiadomości są formatowane/kodowane przy użyciu tego samego schematu Avro. Jest to istotną zaletą, ponieważ w kontekście danych strumieniowych dane są często mniejsze (zwykle od 100 bajtów do kilkuset KB) niż pliki Avro z danymi w reszcie, jak opisano powyżej (często setki lub tysiące MB); więc rozmiar schematu Avro jest stosunkowo duży, a więc nie chcesz go osadzać 2000x podczas zapisywania 2000 rekordów danych do Kafki. Wadą jest to, że musisz "w jakiś sposób" śledzić, jak schematy Avro mapują na tematy Kafki - a dokładniej, musisz w jakiś sposób śledzić, z jakim schematem Avro wiadomość została zakodowana, bez wchodzenia na ścieżkę bezpośredniego osadzania schematu. Dobrą wiadomością jest to, że jest tooling available in the Kafka ecosystem (Avro schema registry), aby robić to w sposób przejrzysty. Zatem w porównaniu do wariantu 1 wariant 2 zyskuje na wydajności kosztem wygody.

Efekt jest taki, że "format przewodu" dla zakodowanych danych Avro będzie wyglądał inaczej w zależności od tego, czy używasz (1) czy (2) powyżej.

Nie jestem zaznajomiony z Apache Nifi, ale szybkie spojrzenie na kod źródłowy (np. ConvertAvroToJSON.java) sugeruje mi, że używa wariantu 1, tj. Osadza schemat Avro obok rekordów Avro. Twój kod konsumenta jednak używa DecoderFactory.get().binaryDecoder(), a zatem wariantu 2 (bez wbudowanego schematu).

Być może to wyjaśnia błąd, na który natrafiłeś?

+1

DZIĘKUJEMY @miguno, to było dokładnie to! Kołyszę się i toczę za pomocą dekodera do DataFileReader z dwiema liniowymi zmianami. DatumReader datumReader = new SpecificDatumReader (schema); DataFileStream dataFileReader = new DataFileStream (inputStream, datumReader); – SparkleGoat

+0

Korekta * Kołyszę się i toczę teraz, gdy zmieniłem na DataFileReader z dwiema zmianami liniowymi. Masz rację binaryDecoder nie był prawidłowym wyborem dla zadania. – SparkleGoat

+1

Cieszę się, że się udało! –