
Improve ODPS DataSource. #622

Open · wuxueyang96 wants to merge 12 commits into main
Conversation

@wuxueyang96 wuxueyang96 commented Jul 5, 2022

What changes were proposed in this pull request?

Create the table when writing to MaxCompute if it does not exist.

Writing to MaxCompute currently fails with ODPS-0130131 (Table not found) if the target table does not exist. It can be reproduced with the code below:

$ spark-shell --jars emr-maxcompute_2.11-2.3.0-SNAPSHOT-shaded.jar
scala> :paste
// Entering paste mode (ctrl-D to finish)

Seq((1, 2)).toDF("i", "j")
  .write.mode("overwrite")
  .format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
  .option("tunnelUrl", "http://dt.cn-beijing.maxcompute.aliyun-inc.com")
  .option("table", "simple_reproduce_table")
  .option("project", "project")
  .option("accessKeyId", "<accessKeyId>")
  .option("accessKeySecret", "<accessKeySecret>")
  .saveAsTable("simple_reproduce_table")

// Exiting paste mode, now interpreting.
com.aliyun.odps.ReloadException: ODPS-0130131:Table not found - 'project.simple_reproduce_table' table not found
  at com.aliyun.odps.LazyLoad.lazyLoad(LazyLoad.java:65)
  at com.aliyun.odps.Table.isVirtualView(Table.java:383)
  at com.aliyun.odps.Table.isPartitioned(Table.java:1100)
  at org.apache.spark.aliyun.utils.OdpsUtils.isPartitionTable(OdpsUtils.scala:305)
  at org.apache.spark.aliyun.odps.datasource.ODPSWriter.saveToTable(ODPSWriter.scala:68)
  at org.apache.spark.aliyun.odps.datasource.DefaultSource.createRelation(DefaultSource.scala:58)
  at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:543)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:233)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:180)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:142)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:166)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:163)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:92)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:494)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:473)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:429)
  ... 49 elided
Caused by: com.aliyun.odps.NoSuchObjectException: ODPS-0130131:Table not found - 'project.simple_reproduce_table' table not found
  at com.aliyun.odps.rest.RestClient.handleErrorResponse(RestClient.java:376)
  at com.aliyun.odps.rest.RestClient.request(RestClient.java:318)
  at com.aliyun.odps.rest.RestClient.request(RestClient.java:272)
  at com.aliyun.odps.rest.RestClient.request(RestClient.java:226)
  at com.aliyun.odps.rest.RestClient.request(RestClient.java:167)
  at com.aliyun.odps.Table.reload(Table.java:195)
  at com.aliyun.odps.LazyLoad.lazyLoad(LazyLoad.java:63)
  ... 77 more
Caused by: com.aliyun.odps.rest.RestException: RequestId=id,Code=NoSuchObject,Message=ODPS-0130131:Table not found - 'project.simple_reproduce_table' table not found
  ... 84 more

scala>

The root cause is that the datasource never creates the target table automatically.
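
A minimal sketch of the approach, calling the ODPS Java SDK from Scala (the helper name ensureTableExists and the hard-coded schema are illustrative only, not the PR's actual code):

import com.aliyun.odps.{Column, Odps, OdpsType, TableSchema}
import com.aliyun.odps.account.AliyunAccount

// Illustrative helper: create the target table before writing if it is missing.
def ensureTableExists(odps: Odps, project: String, table: String,
                      schema: TableSchema): Unit = {
  if (!odps.tables().exists(project, table)) {
    // The trailing `true` is ifNotExists, guarding against a concurrent creator.
    odps.tables().create(project, table, schema, true)
  }
}

val odps = new Odps(new AliyunAccount("<accessKeyId>", "<accessKeySecret>"))
odps.setEndpoint("http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
odps.setDefaultProject("project")

// Schema matching the DataFrame from the reproduction above; the integer
// columns are declared as BIGINT here for illustration. For a partitioned
// table, schema.addPartitionColumn(...) would declare the partition key.
val schema = new TableSchema()
schema.addColumn(new Column("i", OdpsType.BIGINT))
schema.addColumn(new Column("j", OdpsType.BIGINT))

ensureTableExists(odps, "project", "simple_reproduce_table", schema)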

Support returning partition columns when reading a partitioned table.

Currently, reading a partitioned table returns the data without its partition columns, as shown below:

$ /path/to/odpscmd
odps@ partition>create table partition_test(i int,j int) partitioned by (k string);
OK
odps@ partition>insert into partition_test partition (k) values (1,2,'3'),(4,5,'6');
OK
odps@ partition>set odps.sql.allow.fullscan=true;
OK
odps@ partition>select * from partition_test;
+------------+------------+------------+
| i          | j          | k          |
+------------+------------+------------+
| 1          | 2          | 3          |
| 4          | 5          | 6          |
+------------+------------+------------+
odps@ partition>quit;
$ spark-shell --jars emr-maxcompute_2.11-2.3.0-SNAPSHOT-shaded.jar
scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.read.format("org.apache.spark.aliyun.odps.datasource")
  .option("odpsUrl", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
  .option("tunnelUrl", "http://dt.cn-beijing.maxcompute.aliyun-inc.com")
  .option("table", "partition_test")
  .option("project", "partition")
  .option("accessKeyId", "<accessKeyId>")
  .option("accessKeySecret", "<accessKeySecret>")
  .load()
  .show()

// Exiting paste mode, now interpreting.
+---+---+
|  i|  j|
+---+---+
|  1|  2|
|  4|  5|
+---+---+

scala>
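
For comparison, once partition columns are returned (as this PR proposes), the same read would be expected to include the partition column k, matching the odpscmd output above:

+---+---+---+
|  i|  j|  k|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
+---+---+---+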

In fact, we need the partition columns to determine which partition each row belongs to.
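
A sketch of how the read path can expose them, assuming the SDK's TableSchema.getPartitionColumns and PartitionSpec APIs (toSparkField is a simplified placeholder, not the PR's real type mapping):

import scala.collection.JavaConverters._
import com.aliyun.odps.{Column, Odps, PartitionSpec}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Simplified placeholder: map an ODPS column to a Spark field. A real
// implementation would switch on the column's OdpsType instead of
// assuming StringType.
def toSparkField(c: Column): StructField = StructField(c.getName, StringType)

// Build the Spark schema from both the data columns and the partition
// columns, so the partition key appears in the result.
def fullSchema(odps: Odps, project: String, table: String): StructType = {
  val odpsSchema = odps.tables().get(project, table).getSchema
  val dataFields = odpsSchema.getColumns.asScala.map(toSparkField)
  val partFields = odpsSchema.getPartitionColumns.asScala.map(toSparkField)
  StructType((dataFields ++ partFields).toArray)
}

// Within one partition every row shares the same key values, so they can
// be read once from the PartitionSpec and appended to each row.
def partitionValues(spec: PartitionSpec): Seq[String] =
  spec.keys.asScala.map(k => spec.get(k)).toSeq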

How was this patch tested?

Manual tests, as shown above.
Unit tests.


CLAassistant commented Jul 5, 2022

CLA assistant check
All committers have signed the CLA.

1. Support creating the table on write.
2. Support PARTITIONED BY when creating the table.
3. Support returning partition columns when reading a table.
wuxueyang96 force-pushed the improve_maxcompute_datasource branch from a15513c to b00cc19 on July 11, 2022
wuxueyang96 (Author) commented

@powerwu PTAL

wuxueyang96 changed the title from "create table when writing into maxcompute if the table didn't exist." to "Improve ODPS DataSource." on Jul 13, 2022

caneGuy commented Jul 14, 2022

Could you change the title and add more detail, e.g. "support creating the ODPS table"?
