
🚨 🚨 ✨ Source Tik Tok Marketing: Migration to Low-Code #38316

Open · wants to merge 37 commits into master

Conversation

@darynaishchenko (Collaborator) commented May 17, 2024

What

resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/7824

How

Migrated the source from the Python CDK to the low-code CDK.
Regression tests are described here: #38316 (comment)
Main changes:

  • State: Previously, all incremental streams used a single flat state without partitioning, which was incorrect. On the low-code CDK, all incremental streams use per-partition state.
  • Lifetime reports: The previous implementation sent lifetime=true as a request param, which is deprecated in API v1.3. Lifetime reports now use query_lifetime=true; with this param, start_date and end_date must not be provided. Exception: advertiser_lifetime_report: API v1.3 doesn't allow query_lifetime=true with advertiser reports, so this stream was implemented exactly as in the py version, with start_date and end_date query params (range >= 365d).
  • Advertiser Ids stream: the schema was updated so advertiser_id uses the type the API docs declare.
  • Discover for configs with granularity: the py implementation was missing streams (campaigns_audience_reports, ad_group_audience_reports_by_platform, ad_group_audience_reports_by_country, ads_audience_reports_by_country, advertisers_audience_reports_by_country, campaigns_audience_reports_by_platform, advertisers_audience_reports_by_platform, ads_audience_reports_by_platform, ads_audience_reports_by_province) that users with a provided granularity could actually use, but the streams method didn't return them. For configs with granularity, the source removes granularity from the stream name, keeping the previous naming.
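The per-partition state change is the most impactful item above. A minimal sketch of the two state shapes, assuming the low-code CDK's "states"/"partition"/"cursor" convention; the cursor field name and advertiser ids are illustrative:

```python
# Hedged sketch: flat legacy state vs. per-partition state. The keys
# ("states", "partition", "cursor") follow the low-code CDK convention;
# the cursor field "modify_time" and the ids are illustrative only.

legacy_state = {"modify_time": "2024-05-01T00:00:00Z"}  # one cursor shared by all advertisers

per_partition_state = {
    "states": [
        {
            "partition": {"advertiser_id": "11111111", "parent_slice": {}},
            "cursor": {"modify_time": "2024-05-01T00:00:00Z"},
        },
        {
            "partition": {"advertiser_id": "22222222", "parent_slice": {}},
            "cursor": {"modify_time": "2024-04-15T00:00:00Z"},
        },
    ]
}

# Each partition now advances independently instead of sharing one cursor:
cursors = {
    s["partition"]["advertiser_id"]: s["cursor"]["modify_time"]
    for s in per_partition_state["states"]
}
```

Because the saved state shape changes, existing connections cannot resume from the old flat cursor, which is why this is listed as a breaking change.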

Review guide

User Impact

Breaking change: users will need to follow the migration guide for affected streams.

Can this PR be safely reverted and rolled back?

Breaking change due to changes in schema and state format.

  • YES 💚
  • NO ❌

@darynaishchenko darynaishchenko self-assigned this May 17, 2024

@darynaishchenko darynaishchenko marked this pull request as ready for review May 23, 2024 17:34
@octavia-squidington-iv octavia-squidington-iv requested a review from a team May 23, 2024 17:36
@darynaishchenko darynaishchenko requested a review from a team May 24, 2024 15:46
else:
stream_state = stream_states[0]
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)]
Collaborator:
Please update to align with original signature (Iterable)

Collaborator (Author):
updated
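The change the reviewer asked for might look roughly like this: a hedged sketch of returning a lazy Iterable instead of building a list. The predicate here is a stand-in for the real InterpolatedBoolean evaluation, and the function/field names are illustrative, not the connector's actual code:

```python
from typing import Any, Iterable, Mapping

# Hedged sketch: yield records lazily so the method matches the original
# Iterable signature. The date comparison stands in for the real call
# self._filter_interpolator.eval(self.config, record=record, **kwargs).
def filter_records(
    records: Iterable[Mapping[str, Any]],
    stream_state: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    for record in records:
        if record.get("modify_time", "") >= stream_state.get("modify_time", ""):
            yield record

records = [
    {"id": 1, "modify_time": "2024-05-02"},
    {"id": 2, "modify_time": "2024-04-01"},
]
kept = list(filter_records(records, {"modify_time": "2024-05-01"}))
```

A generator keeps memory flat for large record batches and matches how the rest of the CDK consumes record streams.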

@darynaishchenko (Collaborator, Author) commented:

Regression test results:
test_catalog_are_the_same [failed] – advertiser_id updated: integer → string (breaking change described in the docs).

TestDataIntegrity.test_record_schema_match_without_state [failed] - Value of root['properties']['budget']['type'] changed from "integer" to "number". Value of root['properties']['roas_bid']['type'] changed from "integer" to "number". (Same error for all fields whose schema type is number but whose actual value is an integer.)
Both versions have type number in the schema, but a default type transformer was added in the low-code version, so the value 0 is changed to 0.0. For destinations with transformations (e.g. BigQuery) it's not a breaking change, as the destination already converts these values to a number.
These streams are in the list of breaking changes affected by state changes, so users will do a refresh & clear anyway.
The change from 0 to 0.0 occurred because default schema normalization was added in the low-code version to stay compatible with stream schemas added for API v1.2.0, and in v1.3.0 some fields have a new type. For example, *_id was changed from integer to string, while the v1.2.0 stream schemas use integer as the type.

TestDataIntegrity.test_all_records_are_the_same_without_state [failed] - Same differences with integer/number as above.

Read URLs: some extra requests in the py version due to HttpAvailabilityStrategy.

PS: Reviewers can ask me to send the full HTML report in a Slack DM. Regression tests were run locally, as I needed to change the start date in the config, and I chose testing without state due to the breaking changes.
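The 0 → 0.0 difference described above can be illustrated with a minimal sketch. The normalization logic here is illustrative, not the CDK's actual schema transformer:

```python
# Hedged sketch: a "number"-typed field run through default schema
# normalization emits 0.0 where the raw API value was the integer 0.
# Schema and field names are illustrative.
schema = {"properties": {"budget": {"type": "number"}}}
record = {"budget": 0}

def normalize(record, schema):
    # Coerce each field declared as "number" to float, mimicking what a
    # default type transformer would do.
    out = dict(record)
    for field, spec in schema["properties"].items():
        if spec["type"] == "number" and field in out:
            out[field] = float(out[field])  # 0 -> 0.0
    return out

normalized = normalize(record, schema)
```

This is why the regression diff flags integer/number mismatches even though both schemas declare the fields as number.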

@brianjlai (Contributor) left a comment:

I think the manifest and the schemas overall look good and given the size of the manifest and number of streams, I am going to trust that we've carefully run live tests to verify that the changes are working and the breaking changes are expected. I didn't see anything glaring.

I did however have some questions to clarify my understanding of the custom components, and some suggestions on the code itself, especially around why exactly we need two types of advertiser id (+ids) partition routers.

field as API docs declares it.
Users will need to reset source configuration, refresh the source schema and reset the impacted streams after upgrading.
For more information, see our migration documentation for source TikTok Marketing.
upgradeDeadline: "2024-08-07" # TODO: update this date before merge
Contributor:
@katmarkham what is the actual intended deadline date?

path: "{{ parameters['path'] }}"
http_method: GET
error_handler:
type: CompositeErrorHandler
Contributor:
Nit: can we get rid of the CompositeErrorHandler and just use DefaultErrorHandler at the top level? This is not on you, but we are planning to deprecate this component, and we can save the trouble of fixing this connector later if we get rid of it now.

Collaborator (Author):
updated with DefaultErrorHandler

Custom AdvertiserIdsPartitionRouter and AdvertiserIdPartitionRouter partition routers are used to get advertiser_ids
as slices for streams where it uses as request param.

When user uses sandbox account it's impossible to get advertiser_ids via API.
Contributor:
"When using a sandbox account, it's impossible to get advertiser_ids via API."

Collaborator (Author):
updated comment

Main difference between AdvertiserIdsPartitionRouter and AdvertiserIdPartitionRouter is
that AdvertiserIdPartitionRouter returns multiple advertiser_ids in a one slice when id is not provided,
e.g. {"advertiser_ids": '["11111111", "22222222"]', "parent_slice": {}}.
And AdvertiserIdPartitionRouter returns single slice for every advertiser_id as usual.
Contributor:
I think this sentence is unclear because you might have a typo. Starting with "is that ...":

AdvertiserIdPartitionRouter returns multiple advertiser_ids in a one slice when id is not provided
AdvertiserIdPartitionRouter returns single slice for every advertiser_id as usual.

I think one of these should reference AdvertiserIdsPartitionRouter.

But in the context of these components, can you add to your comment why we even need to return a single slice with multiple advertiser_ids in it? This seems unnecessarily complex, given that the other flow returns one slice per ID, which is the normal convention.

I think we should try to avoid multiple advertiser_ids in a single slice if we can avoid it. Couldn't we just read in the config's advertiser ids and return multiple slices?

Collaborator (Author):
I think one of these should reference AdvertiserIdsPartitionRouter.

updated with correct names.

I think we should try to avoid multiple advertiser_ids in a single slice if we can avoid it. Couldn't we just read in the config's advertiser ids and return multiple slices?

MultipleAdvertiserIdsPartitionRouter is used in only one stream, Advertisers, to fetch more than one advertiser at once and reduce the number of requests. That's how it was implemented in the py version of the connector. All other streams use SingleAdvertiserIdPartitionRouter. advertiser_ids is not a required property in the config, so we can't avoid reading the parent stream.
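The two slicing strategies under discussion can be sketched as follows. The chunk size and field names are illustrative; the real routers are the connector's custom components:

```python
import json
from typing import Any, Iterator, List, Mapping

# Hedged sketch of the two partition-routing strategies discussed above.
def multiple_ids_slices(ids: List[str], step: int = 100) -> Iterator[Mapping[str, Any]]:
    # Bundle several advertiser_ids into one slice to cut the request count
    # (the Advertisers-stream behavior).
    for i in range(0, len(ids), step):
        yield {"advertiser_ids": json.dumps(ids[i : i + step]), "parent_slice": {}}

def single_id_slices(ids: List[str]) -> Iterator[Mapping[str, Any]]:
    # One slice per advertiser_id: the usual convention for all other streams.
    for advertiser_id in ids:
        yield {"advertiser_id": advertiser_id, "parent_slice": {}}

ids = ["11111111", "22222222", "33333333"]
bundled = list(multiple_ids_slices(ids, step=2))
single = list(single_id_slices(ids))
```

With a bundle size of n, the Advertisers stream makes roughly 1/n as many requests as the one-slice-per-id convention would.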

yield StreamSlice(partition={"advertiser_ids": json.dumps(slices[i : min(end, i + step)]), "parent_slice": {}}, cursor_slice={})


class SingleAdvertiserIdPartitionRouter(MultipleAdvertiserIdsPartitionRouter):
Contributor:
Can you incorporate some of the above description into this partition router as well. I think my above question still is relevant. Why can't we return one slice per partition_value_in_config for better consistency?

Collaborator (Author):
added description.

Why can't we return one slice per partition_value_in_config for better consistency?

We return one slice per partition_value_in_config if it's in the provided config or read stream slices if it's not provided.

Contributor:
We return one slice per partition_value_in_config if it's in the provided config or read stream slices if it's not provided.

Makes sense thanks.

MultipleAdvertiserIdsPartitionRouter is used in only one stream, Advertisers, to fetch more than one advertiser at once and reduce the number of requests. That's how it was implemented in the py version of the connector. All other streams use SingleAdvertiserIdPartitionRouter. advertiser_ids is not a required property in the config, so we can't avoid reading the parent stream.

Got it. The missing piece I wanted to understand was the why, and it sounds like reducing the number of requests is the intent. Can you include that in the description so we know why we have a different component?

I'm also curious: in your migration, how much are we reducing requests? I imagine customers don't have that many advertiser_ids, so I'm surprised that putting these into a single slice saves that many requests when it's only used by the advertisers stream.
To be honest, I'm questioning whether this is even worth keeping. If we are only saving a few requests, I think it would be better to just have one cursor, even if that is a minor regression for one stream. It just depends on how much we save.

) -> Iterable[Mapping[str, Any]]:
stream_states = None
if stream_state:
stream_states = [
Contributor:

It's not always clear from the shape of the state object, but I'm pretty certain that we only have one cursor per partition, and that fits with your below logic that we get only the first stream_states[0].

I think we can simplify this logic a bit and it also avoids us having to iterate over the entire state object every time:

stream_state = next((p["cursor"] for p in stream_state["states"] if p["partition"][self._partition_field] == stream_slice[self._partition_field]), {})

kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
...

This should hopefully get the first (and only match of partition) and then defaults to {} and the rest can operate on the same flow. Let me know if that makes sense and works with your original intent

Collaborator (Author):
It works as expected. Thanks for the suggestion. Updated the code.
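The agreed lookup can be checked in isolation. A self-contained sketch, with illustrative state contents and partition field:

```python
# Hedged sketch of the reviewer's suggested lookup: pick the single cursor
# whose partition matches the current slice, defaulting to {} when there is
# no match. State contents and field names are illustrative.
stream_state = {
    "states": [
        {"partition": {"advertiser_id": "111"}, "cursor": {"modify_time": "2024-05-01"}},
        {"partition": {"advertiser_id": "222"}, "cursor": {"modify_time": "2024-04-01"}},
    ]
}
partition_field = "advertiser_id"
stream_slice = {"advertiser_id": "222"}

# next() stops at the first match, so we avoid scanning the whole state list.
cursor = next(
    (
        p["cursor"]
        for p in stream_state["states"]
        if p["partition"][partition_field] == stream_slice[partition_field]
    ),
    {},
)

# Unknown partitions fall back to the empty-state default:
missing = next(
    (p["cursor"] for p in stream_state["states"] if p["partition"][partition_field] == "999"),
    {},
)
```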

app_id: "{{ config.get('credentials', config.get('environment', {})).get('app_id', 0) }}"
request_headers: {}
authenticator:
$ref: "#/definitions/authenticator"
Contributor:
I notice that we have a lot of places in the stream definitions themselves that assign authenticator to $ref: "#/definitions/authenticator". Since we only have one type of authenticator, why do we also need to assign it here? It looks like in some of the reusable components, we're already assigning the authenticator

Collaborator (Author):
Thanks, I refactored this part of the code: moved #/definitions/authenticator to the requester and removed the re-assignments.

self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]
) -> List[StreamSlice]:
start = start.replace(hour=0, minute=0, second=0)
return super()._partition_daterange(start, end, step)
Contributor:
This might result in some duplicative code, but I think we should be cautious not to override the private method _partition_daterange(). Because it's intended to be private, we're more likely to make underlying changes to this w/o considering how dependent connectors use this method.

Instead, can we override the public stream_slices() implementation which is the main method that in turn invokes the logic for _partition_daterange().

So it would look roughly like:

def stream_slices():
  # get best end time
  # reset start to the 0 hour of the provided start
  # copy the logic from the `_partition_daterange`

I don't love the duplicated logic, so it's not a perfect solution, but it does make it more resilient to any upstream changes that could break the connector on a republish.

Collaborator (Author):
Moved the h/m/s replacement to stream_slices.
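The agreed approach (override the public stream_slices() and zero out hour/minute/second there, instead of overriding the private _partition_daterange()) might look roughly like this. The daily step and slice field names are illustrative:

```python
import datetime
from typing import Any, Iterator, Mapping

# Hedged sketch: a standalone stream_slices() that resets the start to the
# 0 hour and partitions the date range itself, rather than overriding the
# CDK's private _partition_daterange(). Field names are illustrative.
def stream_slices(
    start: datetime.datetime,
    end: datetime.datetime,
    step: datetime.timedelta = datetime.timedelta(days=1),
) -> Iterator[Mapping[str, Any]]:
    cursor = start.replace(hour=0, minute=0, second=0, microsecond=0)
    while cursor < end:
        upper = min(cursor + step, end)
        yield {"start_time": cursor.isoformat(), "end_time": upper.isoformat()}
        cursor = upper

slices = list(
    stream_slices(
        datetime.datetime(2024, 5, 1, 13, 30),
        datetime.datetime(2024, 5, 3),
    )
)
```

Duplicating the partitioning loop costs a few lines but removes the dependency on a private CDK method that could change without notice.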

@brianjlai (Contributor) left a comment:
A note on naming, and just one last discussion point on the need for the MultipleAdvertiserIdsPartitionRouter: given it's only used on one stream, depending on how drastically it reduces requests, I think we might want to get rid of it even if that deviates from the original behavior.

What I want to figure out is how much the separate partition router benefits us. Does combining the advertiser ids into a single slice result in them all getting bundled up, so we only have to go through a single full iteration? Versus, if we separate them into individual slices, we'd have to perform one full iteration per advertiser_id slice; for example, with 5 advertiser_ids we'd end up making 5x the requests. If that's the case we can leave it as is.

After we clear that up, this is good to go. Nice work!


from airbyte_cdk.sources.declarative.types import StreamSlice


class MultipleAdvertiserIdsPartitionRouter(SubstreamPartitionRouter):
Contributor:
Let's rename this to MultipleAdvertiserIdsPerPartition and the one below to SingleAdvertiserIdPerPartition. I think we want to make it really clear that what matters is how many advertiser_ids are in one partition. The name SingleAdvertiserIdPartitionRouter makes it sound like we only ever get one advertiser id.

Labels: area/connectors, area/documentation, connectors/source/tiktok-marketing