2016-07-19 12 views
11

Mam żenująco równoległe zadanie, za pomocą którego używam Sparka do dystrybucji obliczeń. Te obliczenia są w Pythonie i używam PySpark do odczytu i wstępnego przetwarzania danych. Dane wejściowe do mojego zadania są przechowywane w HBase. Niestety, muszę jeszcze znaleźć zadowalający (tj. Łatwy w użyciu i skalowalny) sposób na odczyt/zapis danych HBase z/do Sparka przy użyciu Pythona.Jak podłączyć HBase i Spark za pomocą Pythona?

Co mam zbadane poprzednio:

  • Podłączenie od wewnątrz moich procesach wykorzystujących happybase Pythona. Ten pakiet umożliwia połączenie z HBase w Pythonie za pomocą funkcji Thrift API HBase. W ten sposób, w zasadzie pomijam Spark do odczytu/zapisu danych i brakuje mi optymalizacji potencjalnych HBase-Spark. Szybkości odczytu wydają się rozsądnie szybkie, ale szybkość zapisu jest wolna. To jest obecnie moje najlepsze rozwiązanie.

  • Użycie SparkContext's newAPIHadoopRDD i saveAsNewAPIHadoopDataset, które wykorzystują interfejs MapReduce HBase. Przykłady tego były kiedyś zawarte w bazie kodów Spark (see here). Jednak są one obecnie uważane za przestarzałe na korzyść wiązań Spark HBase (see here). Odkryłem również, że ta metoda jest powolna i kłopotliwa (do czytania, pisanie działało dobrze), na przykład, ponieważ ciągi znaków zwrócone od newAPIHadoopRDD musiały być analizowane i przekształcane na różne sposoby w celu znalezienia żądanych obiektów Pythona.

Alternatywy, że jestem świadomy:

  • obecnie używam Cloudera za CDH i wersja 5.7.0 oferuje hbase-spark (CDH release notes i a detailed blog post). Ten moduł (wcześniej znany jako SparkOnHBase) będzie oficjalnie częścią HBase 2.0. Niestety, to wspaniałe rozwiązanie wydaje się działać tylko w Scala/Java.

  • Huawei's Spark-SQL-on-HBase/Astro (Nie widzę różnicy między tymi dwoma ...). Nie wygląda na tak solidny i dobrze obsługiwany, jak chciałbym, aby moje rozwiązanie było.

+1

Nie wiem, w jaki sposób CDH rozprowadził złącze świecy hbase. Śledzenie ścieżki zapisu wygląda na to, że używa starej bazy kodu, która nie obsługuje ścieżki zapisu. –

+2

Czy kiedykolwiek znalazłeś rozwiązanie? Ktokolwiek połączył ten PR, który usunął wszystkie istniejące przykłady robocze z folderu iskry/przykładów, zrobił wielką krzywdę dla użytkowników pyspark. –

Odpowiedz

13

znalazłem this comment przez jednego z twórców hbase-spark, co zdaje się sugerować, że jest to sposób na wykorzystanie PySpark kwerendy HBase użyciu Spark SQL.

I rzeczywiście, the pattern described here mogą być stosowane do kwerendy SQL Spark HBase z użyciem PySpark, jak przedstawiono w poniższym przykładzie:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 

sc = SparkContext() 
sqlc = SQLContext(sc) 

data_source_format = 'org.apache.hadoop.hbase.spark' 

df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1']) 

# ''.join(string.split()) in order to write a multi-line JSON string here. 
catalog = ''.join("""{ 
    "table":{"namespace":"default", "name":"testtable"}, 
    "rowkey":"key", 
    "columns":{ 
     "col0":{"cf":"rowkey", "col":"key", "type":"string"}, 
     "col1":{"cf":"cf", "col":"col1", "type":"string"} 
    } 
}""".split()) 


# Writing 
df.write\ 
.options(catalog=catalog)\ # alternatively: .option('catalog', catalog) 
.format(data_source_format)\ 
.save() 

# Reading 
df = sqlc.read\ 
.options(catalog=catalog)\ 
.format(data_source_format)\ 
.load() 

Próbowałem hbase-spark-1.2.0-cdh5.7.0.jar (jak dystrybuowane przez Cloudera) za to, ale wpadł kłopoty (org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select podczas pisania, java.util.NoSuchElementException: None.get podczas czytania). Jak się okazuje, obecna wersja CDH nie zawiera zmian do hbase-spark, które umożliwiają integrację Spark SQL-HBase.

Co dla mnie oznacza to pakiet iskier shc, znaleziony here.Jedyną zmianą musiałem zrobić do powyższego skryptu jest zmienić:

data_source_format = 'org.apache.spark.sql.execution.datasources.hbase' 

Oto jak złożyć powyższy skrypt na moim klastra CDH, idąc za przykładem z shc README:

spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py 

Większość pracy nad shc wydaje się być już połączone w module HBase hbase-spark, do wydania w wersji 2.0. Dzięki temu możliwe jest wykonywanie zapytań w języku Spark SQL o HBase przy użyciu powyższego wzorca (patrz: https://hbase.apache.org/book.html#_sparksql_dataframes). Mój powyższy przykład pokazuje, jak to wygląda dla użytkowników PySpark.

Na koniec zastrzeżenie: moje powyższe przykładowe dane zawierają tylko ciągi. Konwersja danych w Pythonie nie jest obsługiwana przez shc, więc miałem problemy z liczbami całkowitymi i zmiennymi nie wyświetlającymi się w HBase lub z dziwnymi wartościami.

+0

Czy mogę użyć tego do zarządzania dużymi danymi z więcej niż 1 milionem wierszy? –

Powiązane problemy