5

Mam scenariusz, w którym będę otrzymywać dane strumieniowe, które są przetwarzane przez mój program strumieniowania iskier, a dane wyjściowe dla każdego interwału są dołączane do mojego istniejący stół z kandży.java.lang.UnsupportedOperationException: "Zapisywanie do pustej tabeli Cassandra jest niedozwolone

Obecnie mój program do strumieniowania iskier generuje ramkę danych, którą muszę zapisać w moim stole z kassandra. Problem Jestem obecnie stoi to nie jestem w stanie dołączyć dane/wierszy do mojego istniejącej tabeli Cassandra kiedy używam poniżej polecenia

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

Czytałem w poniższy link http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/ gdzie zdał mode = „dołączyć” do metody save ale jego błąd składni rzucanie

również byłem nt stanie zrozumieć, gdzie muszę ustalić z linku poniżej https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

potrzebujesz pomocy tak jak rozwiązać ten issue.I'm pisać moją iskrę przesyłanie strumieniowe zadań w scala

Odpowiedz

8

Myślę, że trzeba to zrobić w następujący sposób:

dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

Sposób Cassandra obsługuje siły danych, aby zrobić tak zwanych „upserts” - trzeba pamiętać, że wkładka może zastąpić niektóre z wierszy gdzie klucz podstawowy już zapisanego rekordu jest taki sam jak klucz podstawowy wstawionego rekordu. Cassandra jest bazą do szybkiego zapisu, więc nie sprawdza istnienia danych przed ich zapisaniem.

Powiązane problemy