2016-03-17 22 views
10

Używam CDH5.5Zapytanie HIVE stół w pyspark

Mam tabeli utworzonej w domyślnej HIVE bazy i mogli zapytać go z polecenia ula.

Wyjście

hive> use default; 

OK 

Time taken: 0.582 seconds 


hive> show tables; 

OK 

bank 
Time taken: 0.341 seconds, Fetched: 1 row(s) 

hive> select count(*) from bank; 

OK 

542 

Time taken: 64.961 seconds, Fetched: 1 row(s) 

Jednak jestem w stanie do kwerendy tabeli z pyspark ponieważ nie można rozpoznać tabeli.

from pyspark.context import SparkContext 

from pyspark.sql import HiveContext 

sqlContext = HiveContext(sc) 


sqlContext.sql("use default") 

DataFrame[result: string] 

sqlContext.sql("show tables").show() 

+---------+-----------+ 

|tableName|isTemporary| 

+---------+-----------+ 

+---------+-----------+ 


sqlContext.sql("FROM bank SELECT count(*)") 

16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*) 
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql 
     return DataFrame(self._ssql_ctx.sql(sqlQuery), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco 
     raise AnalysisException(s.split(': ', 1)[1]) 
    **pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5** 

New Error

>>> from pyspark.sql import HiveContext 
>>> hive_context = HiveContext(sc) 
>>> bank = hive_context.table("default.bank") 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table. 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table 
    return DataFrame(self._ssql_ctx.table(tableName), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco 
    return f(*a, **kw) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table. 
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60) 
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

dzięki

Odpowiedz

20

Nie możemy podać nazwę tabeli Hive bezpośrednio do ula sposób kontekst sql, ponieważ nie rozumie nazwę tabeli ula. Jednym ze sposobów, aby przeczytać Hive stolik w pyspark powłoce wynosi:

from pyspark.sql import HiveContext 
hive_context = HiveContext(sc) 
bank = hive_context.table("default.bank") 
bank.show() 

Aby uruchomić SQL na chrząszcza tabeli: Po pierwsze, musimy zarejestrować ramkę danych otrzymujemy od czytania ula tabeli. Następnie możemy uruchomić zapytanie SQL.

bank.registerTempTable("bank_temp") 
hive_context.sql("select * from bank_temp").show() 
+0

dzięki.Jednak otrzymuję ten błąd. – Chn

+0

banku = hive_context.table ("Bank") Traceback (najnowsza wezwanie ostatni): Plik "", wiersz 1, w Plik „/usr/lib/spark/python/pyspark/sql/context.py ", wiersz 565, w tabeli return DataFrame (self._ssql_ctx.table (tableName), self) Plik" /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py ", wiersz 538, w __call__ Plik" /usr/lib/spark/python/pyspark/sql/utils.py ", wiersz 36, w deco return f (* a, ** kw) Plik"/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py ", wiersz 300, w get_return_value py4j.protocol.Py4JJavaError: Wystąpił błąd podczas wywoływania o30.table. – Chn

+0

Edytowałem odpowiedź, aby dołączyć nazwę bazy danych. Teraz powinno działać. – bijay697

0

Próbuję również to zrobić. Kiedy uruchomić pierwszy zestaw poleceń, pojawia się poniższy błąd:

line 300, in get_return_value 

py4j.protocol.Py4JJavaError: Wystąpił błąd podczas wywoływania o28.table. : org.apache.spark.sql.types.DataTypeException: Unsupported dataType: char (1). Jeśli masz strukturę, a jej nazwa zawiera znaki specjalne, użyj backticków () to quote that field name, e.g. x + y. Zwróć uwagę, że funkcja backtick nie jest obsługiwana w nazwie pola ..

-2

możesz użyć sqlCtx.sql. ula-site.xml powinny być kopiowane iskra ścieżkę conf.

my_dataframe = sqlCtx.sql ("select * from kategoriach") my_dataframe.show()

7

SparkSQL zostanie wysłane z własnym metastore (Derby), dzięki czemu może działać nawet, jeśli w systemie nie zainstalowano ula. Jest to domyślny tryb:

W powyższym pytaniu utworzyłeś tabelę w ulu. Otrzymujesz błąd table not found, ponieważ SparkSQL używa domyślnego metastore, który nie zawiera metadanych twojej tabeli ula.

Jeśli chcesz, aby SparkSQL używał metastoreku ula zamiast niego i uzyskiwał dostęp do tabel ula, musisz dodać hive-site.xml w folderze iskry.

0

Nie jestem pewien, czy to nie jest jeszcze rozwiązany, było wymeldowanie jądra pyspark z integracją Liwiusza i to jak ja testowałem ula konfigurację

from pyspark.sql import Row 
from pyspark.sql import HiveContext 
sqlContext = HiveContext(sc) 
test_list = [('A', 25),('B', 20),('C', 25),('D', 18)] 
rdd = sc.parallelize(test_list) 
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) 
schemaPeople = sqlContext.createDataFrame(people) 
# Register it as a temp table 
sqlContext.registerDataFrameAsTable(schemaPeople, "test_table") 
sqlContext.sql("show tables").show() 


Output: 
-------- 
+--------+----------+-----------+ 
|database| tableName|isTemporary| 
+--------+----------+-----------+ 
|  |test_table|  true| 
+--------+----------+-----------+ 

Now one can query it in many different ways, 
1. jupyter kernel(sparkmagic syntax): 
    %%sql 
    SELECT * FROM test_table limit 4 
2. Using default HiveContext: 
    sqlContext.sql("Select * from test_table").show() 
0

Na mój problem, cp ula miejscu. xml do twojego $ SPARK_HOME/conf oraz cp do mysql-connect-java - *. jar do twojego $ SPARK_HOME/jars, to rozwiązanie rozwiązało mój problem.