Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add _SQLSource and SQLInput to bytewax.connectors.sql #263

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Carlos-Marques
Copy link

@Carlos-Marques Carlos-Marques commented Jun 27, 2023

Hi everyone 👋

In the context of this Slack thread I opened this MR to share a new kind of connector I wrote for bytewax.

The jist of it is that it uses SQLAlchemy core operations to abstract access to the SQL compatible DB (you will also depend on the underlying engine, our case is Postgres).

It presents this interface:

class SQLInput(PartitionedInput):
    def __init__(
        self,
        connection_string: str,
        query: str,
        starting_cursors: dict,
        backoff_start_delay: float = 1.0,
        backoff_factor: float = 2.0,
        backoff_max_delay: float = 60.0,
    ):
  1. connection_string: connection URI of DB
  2. query: query to run that returns the results you want
  3. starting_cursors: let's you define a multi column cursor that is used to keep the state/init the Input where you wanted it to start
  4. backoff_*: let's you define the polling behaviour for when you get to the end of the results in your query (it uses SQLAlchemy batching and streaming of results while the query still has results but when you get to the real time data you need to poll the DB for extra results that are coming in)

simple example usage:

sql_input = SQLInput(
        connection_string=connection_uri,
        query="SELECT \"timestamp\", \"signal_1\" FROM \"table_1\" WHERE \"timestamp\" > :timestamp"
        starting_cursors={"timestamp": start_time},
    )

This returns a stream of events of this type:
{"timestamp": datetime.datetime(*), "signal_1": <signal_value>}

The query requirement is that it needs to select the cursors and the variables in the query need to have the same name of the selected cursors. (I still don't check this)
In the example I gave this happens because I select "timestamp" and in my WHERE I have a :timestamp variable.

If there is interest in this I can add tests and docs too!

@@ -0,0 +1,108 @@
"""Connectors for [SQL](https://en.wikipedia.org/wiki/SQL).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this connector is ok to be in bytewax, I suggest to update docs:
https://github.com/bytewax/bytewax/blob/main/docs/articles/getting-started/ins_and_outs.md

]


class _SQLSource(StatefulSource):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self,
connection_string: str,
query: str,
starting_cursors: dict,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add comment the possible values that can be set in starting_cursors ?

self._backoff_start_delay,
self._backoff_factor,
self._backoff_max_delay,
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants