2015-10-02 11 views
8

Użycie Sparka 1.4.0, próbuję wstawić dane z Spark DataFrame do bazy danych MemSQL (która powinna być dokładnie taka jak interakcja z bazą danych MySQL) przy użyciu metody insertIntoJdbc(). Jednak wciąż otrzymuję wyjątek Runtime TableAlreadyExists.Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception

Najpierw tworzę tabelę MemSQL tak:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT); 

Potem stworzyć prosty dataframe w Spark i spróbuj wstawić do MemSQL tak: docs

val df = sc.parallelize(Array(123,234)).toDF.toDF("val") 
//df: org.apache.spark.sql.DataFrame = [val: int] 

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false) 

java.lang.RuntimeException: Table table1 already exists. 

Odpowiedz

6

To rozwiązanie stosuje się do ogólnych połączeń JDBC, choć odpowiedź przez @wayne jest chyba lepszym rozwiązaniem dla memSQL specjalnie.

insertIntoJdbc wydaje się być przestarzałe od wersji 1.4.0, a używanie go w rzeczywistości wywołuje metodę write.jdbc().

write() zwraca obiekt DataFrameWriter. Jeśli chcesz dołączyć dane do tabeli, będziesz musiał zmienić tryb zapisywania obiektu na "append".

Kolejną kwestią związaną z przykładem w powyższym pytaniu jest fakt, że schemat DataFrame nie pasuje do schematu tabeli docelowej.

Poniższy kod podaje przykładowy przykład z powłoki Spark. Używam spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar, aby rozpocząć sesję iskrzenia powłoki.

import java.util.Properties 

val prop = new Properties() 
prop.put("user", "root") 
prop.put("password", "") 

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val") 
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
+2

Witam łokieć, używam iskry 1.5 i nadal otrzymuję tabelę już istnieje wyjątek nawet po powiedzeniu write.mode ("append") Czy chcesz komentarz na ten temat? Istnieje już obiekt o nazwie "customer_spark" w bazie danych –

+0

Hey @DJElbow, to samo tutaj, nadal uzyskanie wyjątku "Tabela" table1 "już istnieje". when write.mode (SaveMode.Append). Sprawdziłem i kiedy używam użytkownika "root", działa on świetnie, ale kiedy używam użytkownika z uprawnieniami CREATE/INSERT/UPDATE, otrzymuję ten błąd. – marnun

3

insertIntoJDBC są rzeczywiście nieprawidłowe ; mówią, że stół musi już istnieć, ale w rzeczywistości jeśli tak, to będzie rzucać błąd, jak widać powyżej:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

zalecamy korzystanie z naszego złącze MemSQL iskry, które można znaleźć tutaj:

https://github.com/memsql/memsql-spark-connector

Jeśli to tej biblioteki i import com.memsql.spark.connector._ w kodzie, można użyć df.saveToMemSQL (...), aby zapisać swój DataFrame do MemSQL. można znaleźć w dokumentacji dla naszej złącza tutaj:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

+0

Bardzo ładna. To upraszcza sprawy. Czy gdzieś dostępny jest skompilowany słoik? Nie możesz go znaleźć. – DJElbow

+1

Jeśli dodasz maven.memsql.com jako przelicznik, możesz uwzględnić go jako zależność w projekcie: https://github.com/memsql/memsql-spark-connector#using –

1

Miałem ten sam problem. Aktualizacja wersji iskry do wersji 1.6.2 działała dobrze

Powiązane problemy