Mam bazę danych Cassandra, z której analizowałem dane przy użyciu SparkSQL za pomocą Apache Spark. Teraz chcę wstawić te analizowane dane do PostgreSQL. Czy jest jakiś sposób, aby to osiągnąć bezpośrednio poza używaniem sterownika PostgreSQL (osiągnąłem to za pomocą PostREST i sterownika, chcę wiedzieć, czy istnieją jakieś metody, takie jak saveToCassandra()
)?Wstawianie danych analitycznych ze Sparka do PostgreStu
Odpowiedz
W tej chwili nie ma natywnej implementacji zapisu RDD do dowolnego systemu DBMS. Oto linki do pokrewnych dyskusji na liście użytkowników Spark: one, two
Generalnie, najbardziej wydajnych podejście byłoby następujące:
- potwierdzenia liczby partycji w RDD, nie powinno być za nisko i za wysoko. 20-50 partycji powinno być w porządku, jeśli liczba jest niższa - wywołanie
repartition
z 20 partycjami, jeśli wyższe - wywołaniecoalesce
do 50 partycji - Wywołanie transformacji
mapPartition
, w jej obrębie wywołanie funkcji do wstawiania rekordów do DBMS przy użyciu JDBC. W tej funkcji otwarciu połączenia z bazą danych i użyj polecenia COPY z this API, to pozwala wyeliminować konieczność stosowania osobnego polecenia dla każdego rekordu - w ten sposób wkładka byłyby przetwarzane znacznie szybciej
ten sposób wstawiania danych do PostgreSQL w sposób równoległy, wykorzystując do 50 połączeń równoległych (zależy od rozmiaru klastra Sparka i jego konfiguracji). Całe podejście może być zaimplementowane jako funkcja Java/Scala akceptująca RDD, a ciąg połączenia
Odpowiedź od 0x0FFF jest dobra. Oto dodatkowy punkt, który byłby przydatny.
Używam foreachPartition
do utrzymywania w magazynie zewnętrznym. Jest to również inline z wzorca projektowego Design Patterns for using foreachRDD
podanej w dokumentacji Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams
Przykład:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Można używać Postgres skopiować api to napisać, jej znacznie szybciej w ten sposób. Zobacz następujące dwie metody - jedna iteruje nad RDD, aby wypełnić bufor, który może zostać zapisany przez api kopiowania. Jedyną rzeczą, którą musisz się zająć, jest utworzenie poprawnej instrukcji w formacie csv, która będzie używana przez api kopiowania.
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
- 1. Podłącz pyodbc do PostgreStu
- 2. Wstawianie do bazy danych
- 3. Wstawianie obrazu do bazy danych
- 4. Wstawianie danych binarnych do MySQL (bez PreparedStatement)
- 5. Programowe wstawianie danych do dokumentu InDesign
- 6. Oracle: Wstawianie danych rowtype do innej tabeli
- 7. Wstawianie danych do MongoDB z mgo
- 8. Wstawianie danych z jednego serwera do drugiego?
- 9. Czy są jakieś api dla danych analitycznych pulpitów?
- 10. Wstawianie rekordów zbioru danych w bazie danych
- 11. Wykresy na zapas zastosowań analitycznych
- 12. Ręczne wstawianie danych w Firebase
- 13. Wstawianie danych za pomocą Node.js
- 14. Wstawianie danych do tabel mysql przy użyciu ansible
- 15. Entity Framework Wstawianie danych początkowych podczas przebudowy
- 16. Wstawianie obrazu do ggplot2
- 17. Hibernate powolny, aby uzyskać połączenie PostgreStu
- 18. Wstawianie plików do ponownego załadowania
- 19. Wstawianie daty do db2
- 20. Wstawianie wideo do Mediastore
- 21. Wstawianie do dwóch tabel
- 22. Wstawianie obiektów do MongoDB
- 23. Wstawianie danych do kolumn json w postgresql przy użyciu clojure.java.jdbc
- 24. Wstawianie do tabeli mysql i nadpisywanie bieżących danych
- 25. wstawianie danych z jednej tabeli do drugiej w mysql
- 26. Wstawianie w wektorze w odniesieniu do danych tego samego wektora
- 27. Wstawianie tablicy do bazy danych Postgresql w Golang
- 28. Wstawianie nazwy pliku do bazy
- 29. Raport nie jest aktualizowana w jeżyna analitycznych
- 30. wstawianie danych do zestawu danych przy użyciu tabeli danych w języku C# .net
Czy bufor SBB StringBuilder nie zwiększy się bez powiązania, jak na liczbę rekordów w RDD EventModel? dlaczego nie zabraknie Ci pamięci? – nont
Używam tego rozwiązania, które działa już od wielu miesięcy i nie widziałem jak do tej pory brakowało mu pamięci. Ilość danych, które posiadam, jest dość spora - 100 000/s. Co więcej, jeśli obawiasz się o to, zawsze możesz mieć inną kontrolę, na podstawie której wywołasz copyIn i wyczyścisz bufor. – smishra