-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add _SQLSource and SQLInput to bytewax.connectors.sql #263
base: main
Are you sure you want to change the base?
Conversation
@@ -0,0 +1,108 @@ | |||
"""Connectors for [SQL](https://en.wikipedia.org/wiki/SQL). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this connector is ok to be in bytewax, I suggest to update docs:
https://github.com/bytewax/bytewax/blob/main/docs/articles/getting-started/ins_and_outs.md
] | ||
|
||
|
||
class _SQLSource(StatefulSource): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add documentations in each classes like:
https://github.com/bytewax/bytewax/blob/main/pysrc/bytewax/connectors/files.py
self, | ||
connection_string: str, | ||
query: str, | ||
starting_cursors: dict, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add comment the possible values that can be set in starting_cursors ?
self._backoff_start_delay, | ||
self._backoff_factor, | ||
self._backoff_max_delay, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add tests here:
https://github.com/bytewax/bytewax/tree/main/pytests/connectors
Hi everyone 👋
In the context of this Slack thread I opened this MR to share a new kind of connector I wrote for bytewax.
The jist of it is that it uses SQLAlchemy core operations to abstract access to the SQL compatible DB (you will also depend on the underlying engine, our case is Postgres).
It presents this interface:
simple example usage:
This returns a stream of events of this type:
{"timestamp": datetime.datetime(*), "signal_1": <signal_value>}
The query requirement is that it needs to select the cursors and the variables in the query need to have the same name of the selected cursors. (I still don't check this)
In the example I gave this happens because I select
"timestamp"
and in my WHERE I have a:timestamp
variable.If there is interest in this I can add tests and docs too!