Próbuję zapisać niektóre dane do naszej bazy danych Oracle przy użyciu funkcji Spark 1.4.0 DataFrame.write.jdbc().Pisanie do bazy danych Oracle przy użyciu Apache Spark 1.4.0
Funkcja symmetric read.jdbc() do odczytu danych z bazy danych Oracle do obiektów DataFrame działa dobrze. Jednak podczas gdy piszę dataframe back (Próbowałem też napisać dokładnie ten sam obiekt, który dostałam od zachodzącego CverWrite true bazy danych) daje następujący wyjątek:
Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
at main3$.main(main3.scala:72)
at main3.main(main3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
tabela ma 2 podstawowe kolumny ciągów. Gdy są one Integer, może to również zapisać.
Właściwie kiedy głębiej, zdaję sobie sprawę, że odwzorowuje StringType do „TEKST”, który nie jest rozpoznawany przez Oracle (powinno być „VARCHAR” zamiast). Kod jest następujący od jdbc.scala które można znaleźć na GitHub:
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field => {
val name = field.name
val typ: String =
dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
case DoubleType => "DOUBLE PRECISION"
case FloatType => "REAL"
case ShortType => "INTEGER"
case ByteType => "BYTE"
case BooleanType => "BIT(1)"
case StringType => "TEXT"
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case DecimalType.Unlimited => "DECIMAL(40,20)"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
})
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
}}
if (sb.length < 2) "" else sb.substring(2)
}
Więc pytanie jest gdzieś jestem w błędzie lub SparkSQL nie obsługuje Oracle i należy zainstalować wtyczkę aby skorzystać SparkSQL z Oracle?
Mój główny jest prosty:
val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val reader = sqlContext.read
val frame = reader.jdbc(url,"STUDENTS",connectionprop)
frame.printSchema()
frame.show()
val row = Row("3","4")
val struct =
StructType(
StructField("ONE", StringType, true) ::
StructField("TWO", StringType, true) :: Nil)
val arr = Array(row)
val rddRow = sc.parallelize(arr)
val dframe = sqlContext.createDataFrame(rddRow,struct
)
dframe.printSchema()
dframe.show()
dframe.write.jdbc(url,"STUDENTS",connectionprop)
Niestety, API wewnętrznego zapłonowej. Więc kod może zostać złamany dla późniejszych wersji.Oto przypadek: czwarty parametr został zmieniony z Property na JDBCOptions w Spark 2.1. Zobacz https://issues.apache.org/jira/browse/SPARK-19296?filter=-2 –