2016-08-19 9 views
5

Obecnie próbuję zbiorczo migrować zawartość bardzo dużej tabeli MySQL do pliku parkietu przez Spark SQL. Ale gdy tak się stanie, szybko zabraknie mi pamięci, nawet jeśli ustawię wyższy limit pamięci sterownika (używam iskry w trybie lokalnym). Przykładowy kod:Masowa migracja danych przez Spark SQL

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", "bigdatatable") 
    .option("user", "root") 
    .option("password", "foobar") 
    .load(); 

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable"); 

Wygląda Spark próbuje odczytać całą zawartość tabeli do pamięci, która nie będzie działać bardzo dobrze. Jakie jest najlepsze podejście do masowej migracji danych za pośrednictwem Spark SQL?

+0

Jesteś uzyskanie OOM nie dlatego, że iskra jest źle skonfigurowana, prawdopodobnie należy włączyć przesyłanie strumieniowe w sterowniku: http://stackoverflow.com/a/2448019/2439539 – r90t

Odpowiedz

3

W twoim rozwiązaniu Spark odczyta całą zawartość tablicy w jedną partycję, zanim zacznie pisać. Jednym ze sposobów można uniknąć, że jest podział części czytania, ale to wymaga numeryczną sekwencyjną kolumny w danych źródłowych:

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", "bigdatatable") 
    .option("user", "root") 
    .option("password", "foobar") 
    .option("partitionColumn", "NUMERIC_COL") 
    .option("lowerBound", "1") 
    .option("upperBound", "10000") 
    .option("numPartitions", "64") 
    .load(); 

W powyższym przykładzie, kolumna „NUMERIC_COL” musi istnieć w danych i to najlepiej, aby były one równomiernie od 1 do 10000. Oczywiście, jest to wiele wymagań, a taka kolumna prawdopodobnie nie będzie istnieć, więc powinieneś prawdopodobnie utworzyć widok w bazie danych z taką kolumną lub dodać ją w zapytaniu (zauważ, że użyłem ogólnej składni SQL, musisz dostosować się do DBMS):

String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo" 

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", query) 
    .option("user", "root") 
    .option("password", "foobar") 
    .option("partitionColumn", "NUMERIC_COL") 
    .option("lowerBound", "0") 
    .option("upperBound", "63") 
    .option("numPartitions", "64") 
    .load(); 
+0

To wygląda dobrze. Dzięki @Daniel –