Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): add client side incremental sync #38099

Open
wants to merge 40 commits into
base: master
Choose a base branch
from

Conversation

artem1205
Copy link
Collaborator

@artem1205 artem1205 commented May 9, 2024

What

Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/7597
update datamodel-codegen and set "use-double-quotes" flag

How

  • set flag is_client_side_incremental to true in DatetimeBasedCursor component.
    This option will add ClientSideIncrementalRecordFilterDecorator to filter out old records.

Review guide

  1. docs/connector-development/connector-builder-ui/incremental-sync.md
  2. airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  3. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
  4. airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

User Impact

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@artem1205 artem1205 self-assigned this May 9, 2024
Copy link

vercel bot commented May 9, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 22, 2024 2:20pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label May 9, 2024
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
artem1205 and others added 16 commits May 10, 2024 13:31
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
…205/airbyte-cdk-add-client-side-incremental

# Conflicts:
#	airbyte-cdk/python/poetry.lock
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
…205/airbyte-cdk-add-client-side-incremental

# Conflicts:
#	airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
#	airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
#	airbyte-cdk/python/poetry.lock
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
…-side-incremental' into artem1205/airbyte-cdk-add-client-side-incremental
@artem1205
Copy link
Collaborator Author

Tested on Mailchimp

  • custom component was removed + set is_client_side_incremental: true
Index: airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml
--- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml	(revision 026ae3d2cf3cb5c0ef8ec0df5c48c8685f1d6606)
+++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml	(date 1716212745673)
@@ -247,9 +247,6 @@
             partition_field: id
       record_selector:
         $ref: "#/definitions/retriever/record_selector"
-        record_filter:
-          type: CustomRecordFilter
-          class_name: source_mailchimp.components.MailChimpRecordFilter
     incremental_sync:
       type: DatetimeBasedCursor
       cursor_datetime_formats:
@@ -261,6 +258,7 @@
         datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
         datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
       lookback_window: PT0.1S
+      is_client_side_incremental: true
     transformations:
       - type: AddFields
         fields:
@@ -289,9 +287,6 @@
             partition_field: id
       record_selector:
         $ref: "#/definitions/retriever/record_selector"
-        record_filter:
-          type: CustomRecordFilter
-          class_name: source_mailchimp.components.MailChimpRecordFilter
     incremental_sync:
       type: DatetimeBasedCursor
       cursor_datetime_formats:
@@ -303,6 +298,7 @@
         datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
         datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
       lookback_window: PT0.1S
+      is_client_side_incremental: true
     state_migrations:
       - type: LegacyToPerPartitionStateMigration
     $parameters:

airbyte-ci connectors --use-local-cdk --name=source-mailchimp build

image

airbyte-ci connectors --use-local-cdk --name=source-mailchimp test

image

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@artem1205 artem1205 marked this pull request as ready for review May 20, 2024 19:00
@artem1205 artem1205 requested a review from a team as a code owner May 20, 2024 19:00
@artem1205 artem1205 changed the title Airbyte CDK: add client side incremental sync feat(airbyte-cdk): add client side incremental sync May 20, 2024
@@ -800,6 +800,10 @@ definitions:
title: Whether the target API is formatted as a data feed
description: A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.
type: boolean
is_client_side_incremental:
title: Whether the target API does not support filtering and returns all data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a good suggestion, but I would love this title to describe why we call this client_side_incremental. Something like Whether the cursor filters records in the client instead of the API side with a description that explains why this happens and what this implies.

A second description sentence could be like:

If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

description updated

@@ -64,7 +64,7 @@ cryptography = "^42.0.5"
pytz = "2024.1"

[tool.poetry.group.dev.dependencies]
datamodel_code_generator = "0.11.19"
datamodel_code_generator = "0.25.4"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional? I'm all for updates, but it's a big jump. Is this safe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional?

Yes. Old version used single quote. Newer version has use-double-quotes option.

Is this safe?

Absolutely, no changes except formatting were introduced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you submit a separate PR for this change since it's not directly related to semi-incremental?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted; will submit another PR with datamodel_code_generator update

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@@ -36,3 +37,72 @@ def filter_records(
for record in records:
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
yield record


class ClientSideIncrementalRecordFilterDecorator:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this class to extend RecordFilter

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

state_value = self._get_state_value(stream_state, stream_slice)
filter_date = self._get_filter_date(state_value)
if filter_date:
records = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we yield the records instead of keeping them all in a tuple?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is a generator, not a tuple

def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
if self._per_partition_cursor:
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
return partition_state.get(self._cursor_field) if partition_state else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the else case is unexpected, right? Let's raise an exception if that's the case because it's indicative of a bug

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Else case is expected, e.g. we have parent-child stream and we already make initial sync, so stream_state is present. During the second read, we receive one extra record from parent and therefore no stream_state for this record will be found.


def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
if self._per_partition_cursor:
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it a little surprising that we don't use the stream_state parameter if if the cursor is per_partition.

Can you add a comment explaining why?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment added

return stream_state.get(self._cursor_field)

def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
start_date_parsed = self._start_date_from_config or None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't self._start_date_from_config always be set? when would it evaluate to false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
start_date_parsed = self._start_date_from_config or None
state_date_parsed = self._date_time_based_cursor.parse_date(state_value) if state_value else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would state_value evaluate to false/

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state_value would be None if it's the very first sync OR state for this parent record could not be found in the list of saved states

def _get_filter_date(self, state_value: Optional[str]) -> Optional[datetime.datetime]:
start_date_parsed = self._start_date_from_config or None
state_date_parsed = self._date_time_based_cursor.parse_date(state_value) if state_value else None
return max((x for x in (start_date_parsed, state_date_parsed) if x), default=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd always expect a value to be returned here. In what scenarios would we expect the filter date to be None?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, start_date will be always presented


@property
def _cursor_field(self) -> Union[str, Any]:
return self._date_time_based_cursor._cursor_field.eval(self._date_time_based_cursor.config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's makecursor_field public instead of accessing the private field. I think it's a reasonable expectation for DatetimeBasedCursor to have a cursor_field

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, updated

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girarda , after change of cursor_field -> cursor_field in c9b5e41

mypy raises an exception:

datetime_based_cursor.py:185: error: Item "str" of "InterpolatedString | str" has no attribute "eval"  [union-attr]

It seems mypy does not take into account post_init changes,
Should we just ignore this ?

return self._date_time_based_cursor._cursor_field.eval(self._date_time_based_cursor.config)

@property
def _start_date_from_config(self) -> Union[datetime.datetime, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Any?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
[skip ci]

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
@artem1205 artem1205 requested a review from girarda May 22, 2024 14:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/documentation Improvements or additions to documentation CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants