2015-07-08 16 views
5

Próbuję zapisać niektóre dane do naszej bazy danych Oracle przy użyciu funkcji Spark 1.4.0 DataFrame.write.jdbc().Pisanie do bazy danych Oracle przy użyciu Apache Spark 1.4.0

Funkcja symmetric read.jdbc() do odczytu danych z bazy danych Oracle do obiektów DataFrame działa dobrze. Jednak podczas gdy piszę dataframe back (Próbowałem też napisać dokładnie ten sam obiekt, który dostałam od zachodzącego CverWrite true bazy danych) daje następujący wyjątek:

Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp 

    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450) 
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399) 
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017) 
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655) 
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249) 
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566) 
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215) 
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58) 
    at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943) 
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075) 
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820) 
    at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897) 
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361) 
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252) 
    at main3$.main(main3.scala:72) 
    at main3.main(main3.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 

tabela ma 2 podstawowe kolumny ciągów. Gdy są one Integer, może to również zapisać.

Właściwie kiedy głębiej, zdaję sobie sprawę, że odwzorowuje StringType do „TEKST”, który nie jest rozpoznawany przez Oracle (powinno być „VARCHAR” zamiast). Kod jest następujący od jdbc.scala które można znaleźć na GitHub:

def schemaString(df: DataFrame, url: String): String = { 
     val sb = new StringBuilder() 
     val dialect = JdbcDialects.get(url) 
     df.schema.fields foreach { field => { 
     val name = field.name 
     val typ: String = 
      dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
      field.dataType match { 
      case IntegerType => "INTEGER" 
      case LongType => "BIGINT" 
      case DoubleType => "DOUBLE PRECISION" 
      case FloatType => "REAL" 
      case ShortType => "INTEGER" 
      case ByteType => "BYTE" 
      case BooleanType => "BIT(1)" 
      case StringType => "TEXT" 
      case BinaryType => "BLOB" 
      case TimestampType => "TIMESTAMP" 
      case DateType => "DATE" 
      case DecimalType.Unlimited => "DECIMAL(40,20)" 
      case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC") 
      }) 
     val nullable = if (field.nullable) "" else "NOT NULL" 
     sb.append(s", $name $typ $nullable") 
     }} 
     if (sb.length < 2) "" else sb.substring(2) 
    } 

Więc pytanie jest gdzieś jestem w błędzie lub SparkSQL nie obsługuje Oracle i należy zainstalować wtyczkę aby skorzystać SparkSQL z Oracle?

Mój główny jest prosty:

val conf = new SparkConf().setAppName("Parser").setMaster("local[*]") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 



val reader = sqlContext.read 
val frame = reader.jdbc(url,"STUDENTS",connectionprop) 

frame.printSchema() 
frame.show() 


val row = Row("3","4") 


val struct = 
    StructType(
    StructField("ONE", StringType, true) :: 
     StructField("TWO", StringType, true) :: Nil) 

val arr = Array(row) 
val rddRow = sc.parallelize(arr) 
val dframe = sqlContext.createDataFrame(rddRow,struct 
) 
dframe.printSchema() 
dframe.show() 

dframe.write.jdbc(url,"STUDENTS",connectionprop) 

Odpowiedz

7

Rzeczywista odpowiedź - to nie jest możliwe, aby odpisać do Oracle przy użyciu istniejącego DataFrame.write.jdbc() w 1.4.0 Ale jeśli nie masz nic przeciwko temu, aby uaktualnić do Spark 1.5 jest trochę hackish sposób to zrobić. Jak opisano here istnieją dwa problemy:

łatwa - sposób iskra, by sprawdzić istnienie tabeli nie jest kompatybilny z wyroczni

SELECT 1 FROM $table LIMIT 1 

które można łatwo uniknąć metodą użytkowego bezpośredni zapisać tabeli

org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props) 

i twarda (jak można się domyślić) - nie ma żadnego dostępnego dialektu danych Oracle po wyjęciu z pudełka. Przyjęty z tego samego roztworu artykułu:

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} 
import org.apache.spark.sql.types._ 

    val OracleDialect = new JdbcDialect { 
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle") 

    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { 
     case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) 
     case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC)) 
     case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC)) 
     case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC)) 
     case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC)) 
     case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC)) 
     case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC)) 
     case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC)) 
     case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB)) 
     case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) 
     case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) 
//  case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) 
     case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC)) 
     case _ => None 
    } 
    } 

    JdbcDialects.registerDialect(OracleDialect) 

więc wreszcie przykład praca powinna wyglądać podobnie do

val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname" 
    val driver: String = "oracle.jdbc.OracleDriver" 
    val props = new java.util.Properties() 
    props.setProperty("user", "username") 
    props.setProperty("password", "userpassword") 
    org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props) 
+0

Niestety, API wewnętrznego zapłonowej. Więc kod może zostać złamany dla późniejszych wersji.Oto przypadek: czwarty parametr został zmieniony z Property na JDBCOptions w Spark 2.1. Zobacz https://issues.apache.org/jira/browse/SPARK-19296?filter=-2 –

-1

można użyć org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable. tak jak mówi Aerondir.

0

Aktualizacja: Począwszy od Spark 2.x

Jest to problem, w którym każdy columnName jest dwukrotnie cytowany w Spark podczas tworzenia jdbc stołu i stąd wszystkie columnNames tabeli Oracle staje się wielkość liter podczas próby ich zapytać przez sqlPlus.

select colA from myTable; => doesn't works anymore 
select "colA" from myTable; => works 

[Rozwiązanie] Dataframe to Oracle creates table with case sensitive column

Powiązane problemy