2016-03-17 14 views
10

Sto usando CDH5.5tabella Query alveare in pyspark

Ho una tabella creata nel database HIVE predefinito e in grado di eseguire una query dal comando HIVE.

uscita

hive> use default; 

OK 

Time taken: 0.582 seconds 


hive> show tables; 

OK 

bank 
Time taken: 0.341 seconds, Fetched: 1 row(s) 

hive> select count(*) from bank; 

OK 

542 

Time taken: 64.961 seconds, Fetched: 1 row(s) 

tuttavia, sono in grado di interrogare la tabella da pyspark in quanto non in grado di riconoscere il tavolo.

from pyspark.context import SparkContext 

from pyspark.sql import HiveContext 

sqlContext = HiveContext(sc) 


sqlContext.sql("use default") 

DataFrame[result: string] 

sqlContext.sql("show tables").show() 

+---------+-----------+ 

|tableName|isTemporary| 

+---------+-----------+ 

+---------+-----------+ 


sqlContext.sql("FROM bank SELECT count(*)") 

16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*) 
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql 
     return DataFrame(self._ssql_ctx.sql(sqlQuery), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco 
     raise AnalysisException(s.split(': ', 1)[1]) 
    **pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5** 

nuovo errore

>>> from pyspark.sql import HiveContext 
>>> hive_context = HiveContext(sc) 
>>> bank = hive_context.table("default.bank") 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table. 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table 
    return DataFrame(self._ssql_ctx.table(tableName), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco 
    return f(*a, **kw) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table. 
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60) 
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

grazie

risposta

20

Non possiamo passare il nome della tabella Hive direttamente ad alveare metodo contesto SQL poiché non comprende il nome della tabella Hive. Un modo per leggere la tabella Hive in guscio pyspark è:

from pyspark.sql import HiveContext 
hive_context = HiveContext(sc) 
bank = hive_context.table("default.bank") 
bank.show() 

Per eseguire il codice SQL sul tavolo alveare: In primo luogo, abbiamo bisogno di registrare il frame di dati si ottiene dalla lettura della tabella di alveare. Quindi possiamo eseguire la query SQL.

bank.registerTempTable("bank_temp") 
hive_context.sql("select * from bank_temp").show() 
+0

grazie.Tuttavia, sto ottenendo questo errore. – Chn

+0

banca = hive_context.table ("banca") Traceback (chiamata più recente scorso): file "", linea 1, in File "/usr/lib/spark/python/pyspark/sql/context.py ", riga 565, nella tabella return DataFrame (self._ssql_ctx.table (tableName), self) File" /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py ", riga 538, in __call__ File" /usr/lib/spark/python/pyspark/sql/utils.py ", riga 36, ​​in deco return f (* a, ** kw) File"/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py ", riga 300, in get_return_value py4j.protocol.Py4JJavaError: si è verificato un errore durante il richiamo di o30.table. – Chn

+0

Ho modificato la risposta per includere il nome del database. Dovrebbe funzionare ora. – bijay697

0

Sto anche provando a farlo. Quando eseguo la prima serie di comandi, ottengo l'errore sotto:

line 300, in get_return_value 

py4j.protocol.Py4JJavaError: Si è verificato un errore durante la chiamata o28.table. : org.apache.spark.sql.types.DataTypeException: dataType non supportato: char (1). Se hai una struttura e un nome di campo con caratteri speciali, usa i backtick () to quote that field name, e.g. x + y.) Si noti che il backtick stesso non è supportato in un nome di campo.

-2

puoi usare sqlCtx.sql. l'alveare-site.xml dovrebbe essere copiato a suscitare percorso conf.

my_dataframe = sqlCtx.sql ("Select * from categorie") my_dataframe.show()

7

SparkSQL viene spedito con il proprio metastore (derby), in modo che possa funzionare anche se l'hive non è installato sul sistema. Questa è la modalità predefinita.

Nella domanda precedente, hai creato una tabella nell'alveare. Si ottiene l'errore table not found perché SparkSQL sta utilizzando il metastore predefinito che non ha i metadati della tabella hive.

Se si desidera che SparkSQL utilizzi invece il metastore dell'alveare e acceda alle tabelle dell'alveare, è necessario aggiungere hive-site.xml nella cartella spark conf.

0

Non è sicuro, se questo non è ancora risolto, stavo controllando il kernel pyspark con l'integrazione di Livio e questo è quanto ho provato la configurazione alveare

from pyspark.sql import Row 
from pyspark.sql import HiveContext 
sqlContext = HiveContext(sc) 
test_list = [('A', 25),('B', 20),('C', 25),('D', 18)] 
rdd = sc.parallelize(test_list) 
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) 
schemaPeople = sqlContext.createDataFrame(people) 
# Register it as a temp table 
sqlContext.registerDataFrameAsTable(schemaPeople, "test_table") 
sqlContext.sql("show tables").show() 


Output: 
-------- 
+--------+----------+-----------+ 
|database| tableName|isTemporary| 
+--------+----------+-----------+ 
|  |test_table|  true| 
+--------+----------+-----------+ 

Now one can query it in many different ways, 
1. jupyter kernel(sparkmagic syntax): 
    %%sql 
    SELECT * FROM test_table limit 4 
2. Using default HiveContext: 
    sqlContext.sql("Select * from test_table").show() 
0

A mio problema, cp l'alveare in loco. xml al tuo $ SPARK_HOME/conf, e cp il mysql-connect-java - *. jar ai tuoi $ SPARK_HOME/jars, questa soluzione ha risolto il mio problema.