
Is there a way to get sparklyr to push down queries into a JDBC database connection? #3332

Open
scottporter opened this issue Apr 22, 2023 · 3 comments

scottporter commented Apr 22, 2023

I was wondering if there is a better way to ask Spark to push down queries to the underlying database.

The following pushes the query down to the underlying database and takes about 1 second:

monthly_counts <- sparklyr::spark_read_jdbc(
  sc,
  memory = FALSE,
  name = "mytablename",
  options = list(
    url = "jdbc:sybase:Tds:myhostname.something.com:5000",
    driver = "com.sybase.jdbc4.jdbc.SybDriver",
    user = "username",
    password = "password",
    fetchsize = "10000",
    query = "SELECT `month_numeric`, COUNT(*) AS `count_rows`
      FROM `mytablename`
      GROUP BY `month_numeric`",
    pushDownPredicate = TRUE
  )
) %>%
  dplyr::collect()

But the following code doesn't seem to push the query down to the underlying database: it takes about a minute for a table with roughly 10 million rows, and I gave up after several hours on a table with about a billion rows.

my_table <- sparklyr::spark_read_jdbc(
  sc,
  memory = FALSE,
  name = "mytablename",
  options = list(
    url = "jdbc:sybase:Tds:myhostname.something.com:5000",
    driver = "com.sybase.jdbc4.jdbc.SybDriver",
    user = "username",
    password = "password",
    fetchsize = "10000",
    dbtable = "mytableschema.mytablename",  # NOTE: same result if I use something like `query = "SELECT * FROM mytablename"`
    pushDownPredicate = TRUE
  )
)

# Using this sparklyr option to avoid complicating the query by trying to retain order;
# this helps ensure we produce exactly the same SQL as above for an "apples to apples" comparison.
withr::with_options(
  new = list(sparklyr.dplyr_distinct.impl = "tbl_lazy"),
  code = {
    monthly_count_lazy <- my_table %>%
      dplyr::group_by(month_numeric) %>%
      dplyr::summarize(
        count_rows = dplyr::n()
      )
  }
)

monthly_count_sql <- monthly_count_lazy %>%
  dbplyr::sql_render()
print(monthly_count_sql)
# Prints the following SQL, identical to the query we manually passed in the other approach:
# <SQL> SELECT `month_numeric`, COUNT(*) AS `count_rows`
# FROM `mytablename`
# GROUP BY `month_numeric`

monthly_count <- monthly_count_lazy %>%
  dplyr::collect()
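
One way I sanity-check whether the aggregate actually gets pushed down (a sketch on my part; the invoke chain pokes at Spark 3.x internals) is to look at the physical plan for a "PushedAggregates" entry:

# Sketch: inspect the physical plan of the lazy query. When the DS V2 aggregate
# pushdown kicks in, the JDBC scan node lists "PushedAggregates"; otherwise
# Spark does the GROUP BY itself after scanning the whole table.
plan_text <- monthly_count_lazy %>%
  sparklyr::spark_dataframe() %>%
  sparklyr::invoke("queryExecution") %>%
  sparklyr::invoke("executedPlan") %>%
  sparklyr::invoke("toString")
cat(plan_text)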

Is there another way to ask Spark to push the query down to the underlying database?

I've looked and looked for options but couldn't find anything. I'd much rather just specify the table name up front, use dplyr pipelines to define the rest of my query, and then collect, like I do in the second example. But if it's going to pull all the data into Spark and redo the aggregation there, it's not going to be efficient.

This reference made it seem like it might be possible, but didn't give any real instructions on how to go about it:

https://microsites.databricks.com/sites/default/files/2022-07/Spark-Data-Source-V2-Performance-Improvement_Aggregate-Push-Down.pdf

And this reference helped me realize the first approach would work:

https://docs.databricks.com/external-data/jdbc.html#language-scala

But I haven't found any Spark or sparklyr setting that would let my regular dplyr pipeline push the work down to the underlying database.

scottporter (Author) commented:

I've been working with Databricks support on this issue, and they gave me somewhat helpful, if cryptic, notes that I thought I'd preserve here. According to them, spark_read_jdbc uses V1 of the API (the only version exposed), which does not push down aggregations.

Some other Spark tools, such as a JDBC catalog, use the DS V2 API. They gave an example (not in R) of querying such a catalog, spark.sql("select ... from jdbc_catalog.some_table ..."), and they believe that in these cases Spark invokes the DS V2 API under the hood, which would support pushing the aggregations down.
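
For my own notes, here is roughly what I think that would look like from sparklyr. This is a sketch based on their hint, not something they gave me: the catalog name jdbc_catalog, the spark.sql.catalog.* settings, and the pushDownAggregate option assume Spark 3.3+ with the built-in DS V2 JDBCTableCatalog on the classpath, and I haven't confirmed that the Sybase dialect supports aggregate pushdown.

conf <- sparklyr::spark_config()
# Register a DS V2 JDBC catalog named "jdbc_catalog" (assumed name); the class
# is Spark 3.x's built-in JDBCTableCatalog
conf$spark.sql.catalog.jdbc_catalog <- "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog"
conf$spark.sql.catalog.jdbc_catalog.url <- "jdbc:sybase:Tds:myhostname.something.com:5000"
conf$spark.sql.catalog.jdbc_catalog.driver <- "com.sybase.jdbc4.jdbc.SybDriver"
conf$spark.sql.catalog.jdbc_catalog.user <- "username"
conf$spark.sql.catalog.jdbc_catalog.password <- "password"
conf$spark.sql.catalog.jdbc_catalog.pushDownAggregate <- "true"

# On Databricks these settings would go in the cluster's Spark config; locally:
sc <- sparklyr::spark_connect(master = "local", config = conf)

# Queries that reference the catalog go through the DS V2 path, so the
# aggregation can be pushed down to the database (driver/dialect permitting)
monthly_counts <- sparklyr::sdf_sql(sc, "
  SELECT month_numeric, COUNT(*) AS count_rows
  FROM jdbc_catalog.mytableschema.mytablename
  GROUP BY month_numeric
") %>%
  dplyr::collect()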

scottporter (Author) commented:

I have created an ugly workaround for now. We have helper functions that create the connections to the underlying database. These helpers have a lot of setup-specific stuff that isn't relevant, but I'll share the important bits here.

Since the desired behavior of pushing the aggregation down to the underlying database happens when I call sparklyr::spark_read_jdbc with the query parameter, I built an ugly workaround that does the following (a rough sketch follows the list):

  1. In my helper function that creates the connection to the database (using sparklyr::spark_read_jdbc), I add the info that specifies which underlying database and table we're connecting to as attributes on the object I return (a lazy Spark table).
  2. I pipe that object through various dplyr steps (dplyr::filter, dplyr::select, etc.).
  3. Before sending it to dplyr::collect, I send it to a new helper function I call push_query_to_db. This function:
     - grabs the info from the attributes of the object that specifies which SQL database and table this relates to
     - creates a direct connection to that database (not via Spark)
     - uses that direct connection to render the SQL with dbplyr::sql_render (I found that more reliable than rendering the ANSI SQL that sparklyr would normally produce)
     - does a little find/replace magic on that query to make it valid Sybase SQL, since Sybase really wants the schema to always be specified
       - my two stringr calls are not very robust, but dbplyr is really consistent with the SQL it produces, so maybe this is fine:
         - stringr::str_replace_all(., "FROM \"", glue::glue("FROM \"{schema}\".\""))
         - stringr::str_replace_all(., "JOIN \"", glue::glue("JOIN \"{schema}\".\""))
     - creates a new connection to the database using sparklyr::spark_read_jdbc, but uses the query parameter and passes in my rendered SQL
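
Here's a rough sketch of the push_query_to_db helper. The attribute names (db_schema, db_table), the odbc DSN, and the hard-coded credentials are simplified placeholders for our real setup:

push_query_to_db <- function(lazy_tbl, sc) {
  # Info stashed as attributes by our connection helper (placeholder names)
  schema <- attr(lazy_tbl, "db_schema")
  table  <- attr(lazy_tbl, "db_table")

  # Render the SQL against a direct (non-Spark) connection so we get the target
  # database's dialect rather than the ANSI SQL sparklyr would normally render
  direct_con <- DBI::dbConnect(odbc::odbc(), dsn = "my_sybase_dsn")
  on.exit(DBI::dbDisconnect(direct_con))
  qry <- dbplyr::sql_render(lazy_tbl, con = direct_con) %>%
    as.character() %>%
    # Sybase wants the schema on every table reference
    stringr::str_replace_all('FROM "', glue::glue('FROM "{schema}"."')) %>%
    stringr::str_replace_all('JOIN "', glue::glue('JOIN "{schema}"."'))

  # Go back through spark_read_jdbc with `query`, which is the one path I've
  # found where the whole statement runs on the database side
  sparklyr::spark_read_jdbc(
    sc,
    name = paste0(table, "_pushed"),
    memory = FALSE,
    options = list(
      url = "jdbc:sybase:Tds:myhostname.something.com:5000",
      driver = "com.sybase.jdbc4.jdbc.SybDriver",
      user = "username",
      password = "password",
      fetchsize = "10000",
      query = qry
    )
  )
}

# Usage: my_table %>% dplyr::filter(...) %>% push_query_to_db(sc) %>% dplyr::collect()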

Really complicated queries will break this, and it obviously can't help with joins where the tables aren't on the same database, but it handles many of the cases we were having trouble with.

It is extremely ugly, and it's hard to predict which things will work. But when it doesn't work, I can connect directly to the database, pull down the data, and then push it into Spark for subsequent processing.

edgararuiz added the databricks label (Issues related to Databricks connection mode) on Jun 20, 2023
edgararuiz (Collaborator) commented:

We may be able to address this as part of #3334. Using this comment to tie the two together.

edgararuiz self-assigned this on Jun 20, 2023