2015-07-10 11 views
15

Mam przykładową aplikację do odczytu z plików CSV do ramki danych. Ramka danych może być przechowywana w tabeli Hive w formacie parkietu, przy użyciu metody: .Zapisz ramkę danych Spark jako dynamicznie podzieloną tabelę w Hive

Powyższy kod działa poprawnie, ale mam tyle danych na każdy dzień, że chcę dynamicznie partycjonować tabelę ula na podstawie daty utworzenia (kolumna w tabeli).

czy istnieje sposób na dynamiczną partycję ramki danych i zapisanie jej w magazynie magazynu. Chcesz powstrzymać się od zakodowania instrukcji wstawiania za pomocą hivesqlcontext.sql(insert into table partittioin by(date)....).

Pytanie można uznać za rozszerzenie: How to save DataFrame directly to Hive?

każda pomoc jest mile widziane.

Odpowiedz

12

wierzę, że działa coś takiego:

df jest dataframe z rok, miesiąc i innych kolumn

df.write.partitionBy('year', 'month').saveAsTable(...) 

lub

df.write.partitionBy('year', 'month').insertInto(...) 
+0

Próbowałem tej metody Partitionby. Działa tylko na poziomie RDD, po utworzeniu ramki danych większość metod jest stylizowana na DBMS, np. groupby, orderby, ale nie służą do pisania w różnych folderach partycji w Hive. – Chetandalal

+4

Ok, więc udało się go rozwiązać w wersji 1.4. Tryb df.write(). (SaveMode.Append) .partitionBy ("date"). saveAsTable ("Tablename"); . To jednak zmienia moje pole daty na wartość całkowitą i usuwa aktualną datę. na przykład w kolumnie jest 9 unikalnych dat, ale teraz są one zapisane jako 1,2,3 .... a nazwa folderu to data = 1,2,3, ... zamiast daty = 20141121. Daj mi znać, jeśli istnieje sposób, aby to zrobić. – Chetandalal

+0

@ subramaniam-ramasubramanian: proszę odpowiedzieć na pytanie OP jako odpowiedź zamiast edytować istniejącą odpowiedź –

22

udało mi się napisać do podzielono ula tabeli przy użyciu df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

Musiałem włączyć fo dzięki właściwościom, które sprawiają, że działa.

 
hiveContext.setConf("hive.exec.dynamic.partition", "true") 
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 
+0

Gdzie należy ustawić powyższe 2 parametry? Próbowałem zalogować się w powłoce ula i uruchamiać powyższe polecenia, ale nie udało się. Jestem pewien, że robię to źle. Czy możesz powiedzieć, gdzie mogę ustawić te właściwości? –

+2

@VrushankDoshi Ustawiłbyś to w programie iskrowym, zaraz po utworzeniu twojego hiveContext. Val sparkConf = nowy SparkConf() Val SC = nowy SparkContext (sparkConf) Val hiveContext = Nowa org.apache.spark.sql.hive.HiveContext (sc) hiveContext.setConf ("hive.exec.dynamic.partition" , "true") hiveContext.setConf ("hive.exec.dynamic.partition.mode "," nonstrict ") – MV23

3

Zmierzyłem się również z tą samą sztuczką, ale rozwiązałem ją.

  1. Kiedy robimy tabelę jako podzieloną na partycje, a następnie partycjonowana kolumna staje się wrażliwa na wielkość liter.

  2. Partycjonowana kolumna powinna być obecna w DataFrame o tej samej nazwie (wielkość liter ma znaczenie). Kod:

    var dbName="your database name" 
    var finaltable="your table name" 
    
    // First check if table is available or not.. 
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) { 
        //If table is not available then it will create for you.. 
        println("Table Not Present \n Creating table " + finaltable) 
        sparkSession.sql("use Database_Name") 
        sparkSession.sql("SET hive.exec.dynamic.partition = true") 
        sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") 
        sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") 
        sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID  string,EMP_Name   string,EMP_Address    string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") 
        //Table is created now insert the DataFrame in append Mode 
        df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) 
    } 
    
+0

df.write.mode (SaveMode.Append) .insertInto (empDB +". "+ finaltable) nie musisz wspominać partitionBy? example df.write.mode (SaveMode.Append). partitionBy ("EMP_DEP") .insertInto (empDB + "." + finaltable) –

+0

Nie ma potrzeby ... jego opcjonalny –

+0

nie działa dla mnie, liczba tabel wynosi zero –

Powiązane problemy