Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Mixpanel - Add new datatime formats for state for cohort_members stream, added obsolete state reset for cohort_members stream #38066

Merged
merged 42 commits into from May 23, 2024

Conversation

midavadim
Copy link
Collaborator

@midavadim midavadim commented May 8, 2024

What

ERROR # 1 cohorts stream

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 126, in read
    slices = self.stream_slices(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 142, in stream_slices
    return self.retriever.stream_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 375, in stream_slices
    return self.stream_slicer.stream_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 168, in stream_slices
    start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 174, in _calculate_earliest_possible_value
    cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 185, in _calculate_cursor_datetime_from_state
    return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 234, in parse_date
    raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
ValueError: No format in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z'] matching 2024-04-16T14:32:42

Error 2 - cohort_members stream

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 145, in read
    for record_data_or_message in records:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 120, in read_records
    yield from self.retriever.read_records(self.get_json_schema(), stream_slice)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 324, in read_records
    for stream_data in self._read_pages(record_generator, self.state, _slice):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 294, in _read_pages
    next_page_token = self._next_page_token(response)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 258, in _next_page_token
    return self._paginator.next_page_token(response, self._records_from_last_response)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py", line 105, in next_page_token
    self._token = self.pagination_strategy.next_page_token(response, last_records)
  File "/airbyte/integration_code/source_mixpanel/components.py", line 278, in next_page_token
    if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
AttributeError: 'EngagePaginationStrategy' object has no attribute '_total'

Error 3 cohort_members::


		
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 222, in _read_stream
    stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 85, in state
    self.retriever.state = state
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 385, in state
    self.cursor.set_initial_state(value)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py", line 86, in set_initial_state
    for state in stream_state["states"]:
KeyError: 'states'
  },

Problem 4:
Fixing non-unique primary key for cohort_members stream
#37833
Old key: distinct_id
New key: distinct_id, cohort_id

How

  1. Added new datetime formats for state for cohort_members stream,
  2. state for cohort_members stream is changed to new per-partition format
  3. initialize total
  4. changed key to distinct_id, cohort_id

Review guide

User Impact

Breaking change for CohortMembers stream:

  • State is changed to new empty per-partition format.
  • Key is changed to new unique key (based on 'distinct_id' and 'cohort_id' fields) since previous key was not unique and didn't support possibility for user be in a few different cohorts.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

… obsolete state reset for cohort_members stream
Copy link

vercel bot commented May 8, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 23, 2024 11:58am

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/mixpanel labels May 8, 2024
@midavadim midavadim changed the title Source Mixpanel Source Mixpanel - Add new datatime formats for state for cohort_members stream, added obsolete state reset for cohort_members stream May 8, 2024
@midavadim midavadim requested a review from lazebnyi May 8, 2024 18:34
@midavadim midavadim changed the title Source Mixpanel - Add new datatime formats for state for cohort_members stream, added obsolete state reset for cohort_members stream 🐛 Source Mixpanel - Add new datatime formats for state for cohort_members stream, added obsolete state reset for cohort_members stream May 8, 2024
@midavadim midavadim requested a review from girarda May 8, 2024 21:34
@octavia-squidington-iv octavia-squidington-iv requested a review from a team May 9, 2024 23:11
@midavadim midavadim requested a review from lazebnyi May 17, 2024 13:20
Copy link
Collaborator

@lazebnyi lazebnyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@girarda girarda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left a few comments. main questions

  • can semi-incremental be released separately from the bug fixes?
  • can the filters only rely on stream_interval instead of stream_state?

@midavadim
Copy link
Collaborator Author

left a few comments. main questions

  • can semi-incremental be released separately from the bug fixes?
  • can the filters only rely on stream_interval instead of stream_state?
  1. "semi-incremental" - WIthout filtering each incremental syncs returns all records. it is implemented in order to support proper 'incremental' sync which existed in python version of this connector (2.2.0). When we were discussing this change, it was agreed that we must implement the same behavior as it was before low code otherwise it is treated as another breaking change. Please correct me if this is a wrong statement.

  2. "stream_interval " - I have rewritten filters to use stream_interval . It is indeed much more comfortable to use instead of stream_state and stream_slice, especially for streams with partitioning.

@midavadim midavadim requested a review from girarda May 20, 2024 21:24
# Conflicts:
#	airbyte-integrations/connectors/source-mixpanel/metadata.yaml
#	airbyte-integrations/connectors/source-mixpanel/pyproject.toml
#	docs/integrations/sources/mixpanel.md
Copy link
Contributor

@girarda girarda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good thanks @midavadim !
:shipit:

docs/integrations/sources/mixpanel.md Outdated Show resolved Hide resolved
releaseStage: generally_available
releases:
breakingChanges:
3.0.0:
message:
In this release, state for CohortMembers is changed to per partition format.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lazebnyi @midavadim don't we need to have impactscope?
-let's change this message to be more customer friendly. This information is good, but I think it belongs in the migration guide, not the breaking change warning.
-please pic a format for the stream either CohortMembers or cohort_members
-user needs to understand what action needs to be taken. General format is "x stream has changed for y reason. Please reset x stream. See migration guide for further details."

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

In this release, we introduce breaking change for `CohortMembers` stream:
- State is changed to per-partition format.
- Key is changed to correct unique key (based on 'distinct_id' and 'cohort_id' fields) since previous key was not unique and didn't support possibility for user be in a few different cohorts.
Semi-incremental `Cohorts`, `CohortMembers` and `Engage` streams with client-side filtering extract records since user provided or default (1 year old) start_date.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you are saying here. Can you explain it a different way?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

docs/integrations/sources/mixpanel-migrations.md Outdated Show resolved Hide resolved
@octavia-squidington-iv octavia-squidington-iv requested a review from a team May 22, 2024 20:43
Co-authored-by: katmarkham <40400595+katmarkham@users.noreply.github.com>
midavadim and others added 2 commits May 23, 2024 14:51
Co-authored-by: katmarkham <40400595+katmarkham@users.noreply.github.com>
@lazebnyi lazebnyi merged commit 751b7af into master May 23, 2024
31 checks passed
@lazebnyi lazebnyi deleted the midavadim/mixpanel-datatime-fix branch May 23, 2024 13:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/mixpanel
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants