Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7674] contacts_list_memberships incremental using client filt… #38128

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented May 10, 2024

…ering

What

Addressing https://github.com/airbytehq/airbyte-internal-issues/issues/7674

A customer of ours is using this stream but this is emitted too many records leading to some very expensive syncs. We don't have a good way to do server side filtering so we will use client side filtering for now.

Deletes are not required for this.

How

Having ContactsListMemberships inherit from ClientSideIncrementalStream

Review guide

  1. airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
  2. airbyte-integrations/connectors/source-hubspot/unit_tests/integrations/test_contacts_list_memberships.py
  3. The rest

User Impact

The stream now supports incremental. Indeed python main.py discover --config secrets/config.json will yield

{
    "type": "CATALOG",
    "catalog": {
        "streams": [
            <...>
            {
                "name": "contacts_list_memberships",
                "json_schema": <...>
                "supported_sync_modes": [
                    "full_refresh",
                    "incremental"
                ],
                "source_defined_cursor": true,
                "default_cursor_field": [
                    "timestamp"
                ],
                "source_defined_primary_key": [
                    [
                        "canonical-vid"
                    ]
                ]
            }
            <...>
        ]
    }
}

Using main.py read --config secrets/config.json --catalog sample_files/test_catalog.json --debug

test_catalog.json

{
  "streams": [
    {
      "stream": {
        "name": "contacts_list_memberships",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

I get {"type": "LOG", "log": {"level": "INFO", "message": "Read 1025 records from contacts_list_memberships stream"}}

Running the same thing with state { "timestamp": "1714412405830" }, I get {"type": "LOG", "log": {"level": "INFO", "message": "Read 3 records from contacts_list_memberships stream"}}.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

@maxi297 maxi297 requested a review from a team as a code owner May 10, 2024 20:05
@maxi297 maxi297 requested a review from brianjlai May 10, 2024 20:05
Copy link

vercel bot commented May 10, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 13, 2024 0:22am

@@ -891,7 +891,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin
# save the state
self.state = {self.cursor_field: int(max_state) if int_field_type else max_state}
# emmit record if it has bigger cursor value compare to the state (`True` only)
return record_value > state_value
return record_value >= state_value
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very edge case but we had a similar discussion for Salesforce here

@@ -1369,6 +1369,7 @@ class ContactsAllBase(Stream):
page_filter = "vidOffset"
page_field = "vid-offset"
primary_key = "canonical-vid"
limit_field = "count"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation was using limit instead of count. This should have no impact as we pass the default value which is 100

@maxi297 maxi297 requested a review from bazarnov May 10, 2024 20:09
Copy link
Collaborator

@bazarnov bazarnov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: did we gain any speed boost out of this change?

@maxi297
Copy link
Contributor Author

maxi297 commented May 10, 2024

General question: did we gain any speed boost out of this change?

@bazarnov The goal what not speed boost. I expect there would be none in that case. The goal is to only emits the record the customer needs as some customers will have too much data on full_refresh and hence it will cost a lot

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes to contacts_list_memberships seems reasonable, but i also think that we might as well port contacts_form_submissions and contacts_merged_audit to the same functionality because all three can be expressed as semi-incremental.

Looking at the graph you mentioned in the RFR PR here , it sounds like the biggest issue w/ these streams is the cost associated with a full refresh load on every attempt to the destination warehouse.

And if so, I'm okay just saying we make all of them semi-incremental instead of RFR since that solves the issue of cost and reliability is already high enough

@maxi297
Copy link
Contributor Author

maxi297 commented May 13, 2024

@brianjlai you are right that it would have other benefits (i.e. stream would be smaller and therefore there would be smaller load on the destination) and that there are other streams that could provide this benefit.

I terms of sequencing, I would release this change for our client to be able to leverage this and we can make contacts_form_submissions and contacts_merged_audit changes as part of the RFR project. If it works in terms of timing for you, the Critical Connectors team can take it once we get to the RFR project

@maxi297 maxi297 merged commit 1855408 into master May 13, 2024
33 checks passed
@maxi297 maxi297 deleted the issue-7674/hubspot-contacts-list-membership-client-side-filtering-incremental branch May 13, 2024 12:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/hubspot
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants