feat(airbyte-cdk): add client side incremental sync #38099
[skip ci] Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Tested on Mailchimp
Index: airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml
--- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml (revision 026ae3d2cf3cb5c0ef8ec0df5c48c8685f1d6606)
+++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml (date 1716212745673)
@@ -247,9 +247,6 @@
partition_field: id
record_selector:
$ref: "#/definitions/retriever/record_selector"
- record_filter:
- type: CustomRecordFilter
- class_name: source_mailchimp.components.MailChimpRecordFilter
incremental_sync:
type: DatetimeBasedCursor
cursor_datetime_formats:
@@ -261,6 +258,7 @@
datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
lookback_window: PT0.1S
+ is_client_side_incremental: true
transformations:
- type: AddFields
fields:
@@ -289,9 +287,6 @@
partition_field: id
record_selector:
$ref: "#/definitions/retriever/record_selector"
- record_filter:
- type: CustomRecordFilter
- class_name: source_mailchimp.components.MailChimpRecordFilter
incremental_sync:
type: DatetimeBasedCursor
cursor_datetime_formats:
@@ -303,6 +298,7 @@
datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
lookback_window: PT0.1S
+ is_client_side_incremental: true
state_migrations:
- type: LegacyToPerPartitionStateMigration
$parameters:
@@ -800,6 +800,10 @@ definitions:
title: Whether the target API is formatted as a data feed
description: A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.
type: boolean
is_client_side_incremental:
title: Whether the target API does not support filtering and returns all data
I don't have a good suggestion, but I would love this title to describe why we call this client_side_incremental. Something like "Whether the cursor filters records in the client instead of the API side", with a description that explains why this happens and what it implies.
A second description sentence could be something like:
If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally and only emit records that are new since the last sync, hence incremental. This means that all records are read from the API, but only new records are emitted to the destination.
description updated
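As an illustration of the behaviour discussed in this thread, here is a minimal sketch of client-side incremental filtering. It is not the CDK implementation: the function name filter_new_records, the updated_at cursor field, the sample records, and the strict "newer than" comparison are all assumptions made for the example.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Iterator, Mapping


def filter_new_records(
    records: Iterable[Mapping[str, Any]],
    cursor_field: str,
    last_synced: datetime,
) -> Iterator[Mapping[str, Any]]:
    """Yield only records whose cursor value is newer than the saved state.

    Hypothetical helper: the API returned every record, so the filtering
    happens locally, after the read.
    """
    for record in records:
        if datetime.fromisoformat(record[cursor_field]) > last_synced:
            yield record


# The API ignores the cursor and returns everything (made-up sample data).
api_response = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2024-03-01T00:00:00+00:00"},
]
state = datetime(2024, 2, 1, tzinfo=timezone.utc)

# Both records were read, but only the one newer than the state is emitted.
emitted = list(filter_new_records(api_response, "updated_at", state))
```

The trade-off is the one the suggested description names: every sync still pays for a full read from the API, and only the volume written to the destination shrinks.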
airbyte-cdk/python/pyproject.toml
Outdated
@@ -64,7 +64,7 @@ cryptography = "^42.0.5"
pytz = "2024.1"

[tool.poetry.group.dev.dependencies]
- datamodel_code_generator = "0.11.19"
+ datamodel_code_generator = "0.25.4"
Intentional? I'm all for updates, but it's a big jump. Is this safe?
Intentional?
Yes. The old version used single quotes; the newer version has a use-double-quotes option.
Is this safe?
Absolutely. No changes except formatting were introduced.
nit: can you submit a separate PR for this change since it's not directly related to semi-incremental?
Reverted; I will submit another PR with the datamodel_code_generator update.
@@ -36,3 +37,72 @@ def filter_records(
        for record in records:
            if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                yield record


class ClientSideIncrementalRecordFilterDecorator:
I would expect this class to extend RecordFilter
updated
        state_value = self._get_state_value(stream_state, stream_slice)
        filter_date = self._get_filter_date(state_value)
        if filter_date:
            records = (
can we yield the records instead of keeping them all in a tuple?
it is a generator, not a tuple
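For readers unsure about the distinction made in this reply: in Python, a comprehension wrapped in bare parentheses is a generator expression and is evaluated lazily, whereas an explicit tuple(...) call materializes every element. The sample records below are made up for the illustration.

```python
import types

records = [{"id": 1}, {"id": 2}, {"id": 3}]

# Parentheses around a comprehension create a lazy generator, not a tuple.
lazy = (r for r in records if r["id"] > 1)

# Building a tuple requires an explicit call and holds all items in memory.
eager = tuple(r for r in records if r["id"] > 1)

is_generator = isinstance(lazy, types.GeneratorType)
is_tuple = isinstance(eager, tuple)

# Items are produced one at a time, only when requested.
first = next(lazy)
```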
    def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
        if self._per_partition_cursor:
            partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
            return partition_state.get(self._cursor_field) if partition_state else None
the else case is unexpected, right? Let's raise an exception if that's the case because it's indicative of a bug
The else case is expected. For example, take a parent-child stream for which the initial sync has already run, so stream_state is present. If the second read receives one extra record from the parent, no state will be found for that new partition.
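The scenario in this reply can be sketched with a plain dictionary standing in for the per-partition cursor. Everything here is hypothetical (the saved_states mapping, the partition ids, the get_state_value helper); the real lookup goes through select_state on the per-partition cursor, as shown in the diff above.

```python
from typing import Dict, Optional

# State saved by the first sync: one entry per parent record seen so far.
saved_states: Dict[str, Dict[str, str]] = {
    "list_1": {"updated_at": "2024-02-01T00:00:00Z"},
}


def get_state_value(partition_id: str, cursor_field: str) -> Optional[str]:
    """Return the saved cursor value for a partition, or None if it is new."""
    partition_state = saved_states.get(partition_id)
    return partition_state.get(cursor_field) if partition_state else None


# Known partition: the saved cursor value is returned.
known = get_state_value("list_1", "updated_at")

# Parent record that first appears in the second sync: no state yet, so the
# lookup legitimately returns None rather than signalling a bug.
new = get_state_value("list_2", "updated_at")
```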

    def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
        if self._per_partition_cursor:
            partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
I find it a little surprising that we don't use the stream_state parameter if the cursor is per_partition.
Can you add a comment explaining why?
comment added
        return stream_state.get(self._cursor_field)

    def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
        start_date_parsed = self._start_date_from_config or None
Won't self._start_date_from_config always be set? When would it evaluate to false?
fixed

    def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
        start_date_parsed = self._start_date_from_config or None
        state_date_parsed = self._date_time_based_cursor.parse_date(state_value) if state_value else None
When would state_value evaluate to false?
state_value would be None if it's the very first sync OR state for this parent record could not be found in the list of saved states
    def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
        start_date_parsed = self._start_date_from_config or None
        state_date_parsed = self._date_time_based_cursor.parse_date(state_value) if state_value else None
        return max((x for x in (start_date_parsed, state_date_parsed) if x), default=None)
I'd always expect a value to be returned here. In what scenarios would we expect the filter date to be None?
Agreed, start_date will always be present.
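One way to read the outcome of this thread: if the start date is always available, the filter date never needs to be None. The sketch below is an assumption about what such a simplification could look like, not the merged code. The get_filter_date function is hypothetical, and the datetime format is borrowed from the Mailchimp manifest quoted earlier.

```python
from datetime import datetime, timezone
from typing import Optional

# Format taken from the manifest diff; using it here is an assumption.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def get_filter_date(start_date: datetime, state_value: Optional[str]) -> datetime:
    """Return the later of the configured start date and the saved state.

    Hypothetical helper: with no state (first sync, or a partition that was
    never seen before) the configured start date is used as the lower bound.
    """
    if state_value is None:
        return start_date
    state_date = datetime.strptime(state_value, DATETIME_FORMAT).replace(tzinfo=timezone.utc)
    return max(start_date, state_date)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)

first_sync = get_filter_date(start, None)
later_sync = get_filter_date(start, "2024-03-01T00:00:00.0Z")
stale_state = get_filter_date(start, "2023-06-01T00:00:00.0Z")
```

Taking the maximum means that a start date moved forward in the config wins over an older saved state.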

    @property
    def _cursor_field(self) -> Union[str, Any]:
        return self._date_time_based_cursor._cursor_field.eval(self._date_time_based_cursor.config)
Let's make cursor_field public instead of accessing the private field. I think it's a reasonable expectation for DatetimeBasedCursor to have a cursor_field.
agreed, updated
        return self._date_time_based_cursor._cursor_field.eval(self._date_time_based_cursor.config)

    @property
    def _start_date_from_config(self) -> Union[datetime.datetime, Any]:
Why Any?
fixed
What
Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/7597
Update datamodel-codegen and set the "use-double-quotes" flag
How
Set is_client_side_incremental to true in the DatetimeBasedCursor component.
This option will add ClientSideIncrementalRecordFilterDecorator to filter out old records.
Review guide
docs/connector-development/connector-builder-ui/incremental-sync.md
airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
User Impact
Can this PR be safely reverted and rolled back?