2015-02-03 9 views
8

Mam bazę danych Cassandra, z której analizowałem dane przy użyciu SparkSQL za pomocą Apache Spark. Teraz chcę wstawić te analizowane dane do PostgreSQL. Czy jest jakiś sposób, aby to osiągnąć bezpośrednio poza używaniem sterownika PostgreSQL (osiągnąłem to za pomocą PostREST i sterownika, chcę wiedzieć, czy istnieją jakieś metody, takie jak saveToCassandra())?Wstawianie danych analitycznych ze Sparka do PostgreStu

Odpowiedz

13

W tej chwili nie ma natywnej implementacji zapisu RDD do dowolnego systemu DBMS. Oto linki do pokrewnych dyskusji na liście użytkowników Spark: one, two

Generalnie, najbardziej wydajnych podejście byłoby następujące:

  1. potwierdzenia liczby partycji w RDD, nie powinno być za nisko i za wysoko. 20-50 partycji powinno być w porządku, jeśli liczba jest niższa - wywołanie repartition z 20 partycjami, jeśli wyższe - wywołanie coalesce do 50 partycji
  2. Wywołanie transformacji mapPartition, w jej obrębie wywołanie funkcji do wstawiania rekordów do DBMS przy użyciu JDBC. W tej funkcji otwarciu połączenia z bazą danych i użyj polecenia COPY z this API, to pozwala wyeliminować konieczność stosowania osobnego polecenia dla każdego rekordu - w ten sposób wkładka byłyby przetwarzane znacznie szybciej

ten sposób wstawiania danych do PostgreSQL w sposób równoległy, wykorzystując do 50 połączeń równoległych (zależy od rozmiaru klastra Sparka i jego konfiguracji). Całe podejście może być zaimplementowane jako funkcja Java/Scala akceptująca RDD, a ciąg połączenia

1

Odpowiedź od 0x0FFF jest dobra. Oto dodatkowy punkt, który byłby przydatny.

Używam foreachPartition do utrzymywania w magazynie zewnętrznym. Jest to również inline z wzorca projektowego Design Patterns for using foreachRDD podanej w dokumentacji Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

Przykład:

dstream.foreachRDD { rdd => 
    rdd.foreachPartition { partitionOfRecords => 
    // ConnectionPool is a static, lazily initialized pool of connections 
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach(record => connection.send(record)) 
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse 
    } 
} 
1

Można używać Postgres skopiować api to napisać, jej znacznie szybciej w ten sposób. Zobacz następujące dwie metody - jedna iteruje nad RDD, aby wypełnić bufor, który może zostać zapisany przez api kopiowania. Jedyną rzeczą, którą musisz się zająć, jest utworzenie poprawnej instrukcji w formacie csv, która będzie używana przez api kopiowania.

def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = { 
     val sb = mutable.StringBuilder.newBuilder 
     val now = System.currentTimeMillis() 

     rdd.collect().foreach(itr => { 
      itr.foreach(_.createCSV(sb, now).append("\n")) 
     }) 

     copyIn("myTable", new StringReader(sb.toString), "statement") 
     sb.clear 
    } 


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = { 
     val conn = connectionPool.getConnection() 
     try { 
      conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader) 
     } catch { 
      case se: SQLException => logWarning(se.getMessage) 
      case t: Throwable => logWarning(t.getMessage) 
     } finally { 
      conn.close() 
     } 
    } 
+0

Czy bufor SBB StringBuilder nie zwiększy się bez powiązania, jak na liczbę rekordów w RDD EventModel? dlaczego nie zabraknie Ci pamięci? – nont

+0

Używam tego rozwiązania, które działa już od wielu miesięcy i nie widziałem jak do tej pory brakowało mu pamięci. Ilość danych, które posiadam, jest dość spora - 100 000/s. Co więcej, jeśli obawiasz się o to, zawsze możesz mieć inną kontrolę, na podstawie której wywołasz copyIn i wyczyścisz bufor. – smishra

Powiązane problemy