
Cannot write Spark DataFrame to Hive table on EMR from a PySpark connection in a Python application using the pyspark package (not spark-submit) #127

shaungupta123 opened this issue Aug 7, 2020 · 1 comment



shaungupta123 commented Aug 7, 2020


System Information

EMR cluster node with applications:

Hive 3.1.2, Hue 4.4.0, Spark 2.4.4, TensorFlow 1.14.0, ZooKeeper 3.4.14, HBase 2.2.3, Zeppelin 0.9.0, JupyterHub 1.0.0, HCatalog 3.1.2, Oozie 5.1.0
pip3 freeze
absl-py==0.9.0
astor==0.8.0
beautifulsoup4==4.8.2
bleach==3.1.0
boto==2.49.0
cached-property==1.5.1
config==0.1
database-connectors==0.2.1+3.g184d5af.dirty
funcsigs==1.0.2
gast==0.3.3
google-pasta==0.1.8
grpcio==1.26.0
html5lib==1.0.1
jmespath==0.9.4
Keras-Applications==1.0.6
Keras-Preprocessing==1.0.5
lxml==4.5.0
Markdown==3.1.1
mysqlclient==1.4.2
nltk==3.4.5
nose==1.3.4
numpy==1.16.5
pandas==1.0.5
protobuf==3.11.3
px-schema==0.0.post0.dev1+gea0b46c.dirty
py-dateutil==2.2
py4j==0.10.7
pyarrow==1.0.0
pydantic==1.6.1
pyodbc==4.0.30
pypandoc==1.5
pyspark==2.4.4
python-dateutil==2.8.1
python37-sagemaker-pyspark==1.2.6
pytz==2019.3
PyYAML==5.3
scipy==1.5.2
six==1.13.0
soupsieve==1.9.5
stringcase==1.2.0
tensorboard==1.14.0
tensorflow==1.14.0
tensorflow-estimator==1.14.0
termcolor==1.1.0
toml==0.10.1
utils==0.0.1
webencodings==0.5.1
Werkzeug==0.16.1
windmill==1.6
wrapt==1.11.2

Describe the problem

When trying to save a Spark DataFrame to Hive via sdf.write.saveAsTable, I get the error below. This happens when running a Spark application through a PySpark connection from within Python 3.7 (I import pyspark and use getOrCreate to create a YARN connection), and I am running this directly on the cluster node. If I instead open a pyspark shell (using the pyspark command) and save the same DataFrame to Hive from there, I do not get the error.
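
For reference, a minimal sketch of the setup described above (the app name and table name here are placeholders, not the actual application code):

from pyspark.sql import SparkSession

# Rough sketch of the failing setup; app and table names are placeholders.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("px-convert")          # hypothetical app name
    .enableHiveSupport()
    .getOrCreate()
)

sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
sdf.write.saveAsTable("app_data.example_table")  # fails with the Py4JJavaError below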

Minimal repro / logs

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/08/07 13:22:21 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
20/08/07 13:22:49 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
Traceback (most recent call last):
  File "/usr/local/bin/px-convert", line 11, in <module>
    load_entry_point('px-schema==0.0.post0.dev1+gea0b46c.dirty', 'console_scripts', 'px-convert')()
  File "/usr/local/lib/python3.7/site-packages/px_schema/runner.py", line 65, in run
    write(df_converted, f'{args.output_database}://app_data.{table_name}')
  File "/usr/local/lib/python3.7/site-packages/database_connectors/__init__.py", line 71, in write
    return connector.write(data, path)
  File "/usr/local/lib/python3.7/site-packages/database_connectors/pyspark_connector.py", line 59, in write
    sdf.write.saveAsTable(table_name)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 777, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o95.saveAsTable.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufFileFormat could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:93)
	at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:482)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:474)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:453)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufFileFormat.<init>(SageMakerProtobufFileFormat.scala:41)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 47 more

Command:

Simply calling sdf.write.saveAsTable('db.table') causes the above error. The session is definitely connected to Hive, since it complains when the table name already exists; see the sketch below.
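
For example (a self-contained sketch; 'db.table' is a placeholder for an existing table name):

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()
sdf = spark.createDataFrame([(1, "a")], ["id", "value"])

# Writing to a table name that already exists reaches the Hive metastore and
# raises "already exists", which is why I believe the Hive connection works.
try:
    sdf.write.saveAsTable("db.table")
except AnalysisException as e:
    print(e)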

@metrizable

Hello @shaungupta123

Thank you for using Amazon SageMaker. I looked at your pip3 freeze and didn't see the sagemaker-pyspark package from PyPI.

Could you provide more detail on your setup? It's not clear that your issue corresponds to this repository or to sagemaker-pyspark==1.4.0.
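
For instance, running something like this on the cluster node (a rough sketch using pkg_resources) would show which SageMaker-related distributions are actually installed on the driver:

import pkg_resources  # ships with setuptools, available on Python 3.7

# List installed distributions whose name mentions "sagemaker", to see whether
# sagemaker-pyspark (PyPI) or the EMR-bundled python37-sagemaker-pyspark is present.
for dist in pkg_resources.working_set:
    if "sagemaker" in dist.project_name.lower():
        print(dist.project_name, dist.version, dist.location)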

Thank you!
