Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

PrefectHQ/prefect-databricks

Repository files navigation

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-databricks


PyPI

Visit the full docs here to see additional examples and the API reference.

The prefect-databricks collection makes it easy to coordiante Databricks jobs with other tools in your data stack using Prefect. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Using Prefect with Databricks allows you to define and orchestrate complex data workflows that take advantage of the scalability and performance of Databricks.

This can be especially useful for data-intensive tasks such as ETL (extract, transform, load) pipelines, machine learning training and inference, and real-time data processing.

Below is an example of how you can incorporate Databricks notebooks within your Prefect flows.

Be sure to install prefect-databricks and save a credentials block to run the examples below!

If you don't have an existing notebook ready on Databricks, you can copy the following, and name it example.ipynb. This notebook, accepts a name parameter from the flow and simply prints a message.

name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)

Here, the flow launches a new cluster to run example.ipynb and waits for the completion of the notebook run. Replace the placeholders and run.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(block_name: str, notebook_path: str, **base_parameters):
    databricks_credentials = DatabricksCredentials.load(block_name)

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return run


jobs_runs_submit_flow(
    block_name="BLOCK-NAME-PLACEHOLDER"
    notebook_path="/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb",
    name="Marvin"
)

Upon execution, the notebook run should output:

Don't worry Marvin, I got your request! Welcome to prefect-databricks!

!!! info "Input dictionaries in the place of models"

Instead of using the built-in models, you may also input a valid dictionary.

For example, the following are equivalent:

```python
auto_scale=AutoScale(min_workers=1, max_workers=2)
```

```python
auto_scale={"min_workers": 1, "max_workers": 2}
```

If you have an existing Databricks job, you can run it using jobs_runs_submit_by_id_and_wait_for_completion:

from prefect import flow

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
def existing_job_submit(databricks_credentials_block_name: str, job_id):
    databricks_credentials = DatabricksCredentials.load(name=block_name)

    run = jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )

    return run

existing_job_submit(databricks_credentials_block_name="db-creds", job_id="YOUR-JOB-NAME")

Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Note, the tasks within this collection were created by a code generator using the service's OpenAPI spec.

The service's REST API documentation can be found here.

Installation

Install prefect-databricks with pip:

pip install prefect-databricks

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving Credentials to Block

To use the load method on Blocks, you must already have a block document saved through code or saved through the UI.

Below is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.

  1. Head over to Databricks.
  2. Login to your Databricks account and select a workspace.
  3. On the top right side of the nav bar, click on your account name -> User Settings.
  4. Click Access tokens -> Generate new token -> Generate and copy the token.
  5. Note down your Databricks instance from the browser URL, formatted like https://<DATABRICKS-INSTANCE>.cloud.databricks.com/
  6. Create a short script, replacing the placeholders.
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials(
    databricks_instance="DATABRICKS-INSTANCE-PLACEHOLDER"
    token="TOKEN-PLACEHOLDER"
)

connector.save("BLOCK_NAME-PLACEHOLDER")

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_databricks import DatabricksCredentials

DatabricksCredentials.load("BLOCK_NAME-PLACEHOLDER")

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://orion-docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_databricks
```

Feedback

If you encounter any bugs while using prefect-databricks, feel free to open an issue in the prefect-databricks repository.

If you have any questions or issues while using prefect-databricks, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-databricks for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-databricks, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
pip install -e ".[dev]"
  1. Make desired changes
  2. Add tests
  3. Insert an entry to CHANGELOG.md
  4. Install pre-commit to perform quality checks prior to commit:
pre-commit install
  1. git commit, git push, and create a pull request