🚨 🚨 ✨ Source Tik Tok Marketing: Migration to Low-Code #38316
base: master
…pe for dimensions transformations
else:
    stream_state = stream_states[0]
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)]
Please update to align with original signature (Iterable)
updated
Regression test results:
- TestDataIntegrity.test_record_schema_match_without_state [failed]: value of root['properties']['budget']['type'] changed from "integer" to "number"; value of root['properties']['roas_bid']['type'] changed from "integer" to "number" (same error for all fields whose type is number in the schema but whose actual type is integer).
- TestDataIntegrity.test_all_records_are_the_same_without_state [failed]: same integer/number differences as above.
- Read URLs: some requests in py version due to HttpAvailabilityStrategy.
PS: The reviewer can ask me to send the full HTML report in a Slack DM. The regression tests were run locally because I needed to change the start date in the config, and I chose to test without state because of the breaking changes.
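To illustrate the integer/number difference reported above: a whole-number value is also valid for a field declared as "number" in JSON Schema, so a schema inferred from the records can say "integer" while the declared schema says "number". The following is a minimal sketch, not the regression test tooling; the helper names `observed_json_type` and `satisfies_declared_type` are hypothetical.

```python
from typing import Any


def observed_json_type(value: Any) -> str:
    """Return the JSON type name a schema-inference step would likely see (hypothetical helper)."""
    # bool is a subclass of int in Python, so it must be checked first.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    return type(value).__name__


def satisfies_declared_type(value: Any, declared: str) -> bool:
    """Simplified check of a value against a declared JSON Schema numeric type."""
    if isinstance(value, bool):
        return declared == "boolean"
    if declared == "number":
        # "number" accepts both integers and floats.
        return isinstance(value, (int, float))
    if declared == "integer":
        # Floats with a zero fractional part are treated as integers here.
        return isinstance(value, int) or (isinstance(value, float) and value.is_integer())
    return False


budget = 100  # a record value that arrives as an integer
print(observed_json_type(budget))                 # inferred type
print(satisfies_declared_type(budget, "number"))  # still valid for the declared type
```

Under this reading the records themselves are unchanged; only the inferred type differs from the declared one.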
I think the manifest and the schemas overall look good and given the size of the manifest and number of streams, I am going to trust that we've carefully run live tests to verify that the changes are working and the breaking changes are expected. I didn't see anything glaring.
I did, however, have some questions to clarify my understanding of the custom components, and some suggestions on the code itself, especially around why exactly we need two types of advertiser id (+ids) partition routers.
field as API docs declares it.
Users will need to reset source configuration, refresh the source schema and reset the impacted streams after upgrading.
For more information, see our migration documentation for source TikTok Marketing.
upgradeDeadline: "2024-08-07" # TODO: update this date before merge
@katmarkham what is the actual intended deadline date?
path: "{{ parameters['path'] }}"
http_method: GET
error_handler:
  type: CompositeErrorHandler
Nit: Can we get rid of the `CompositeErrorHandler` and just use `DefaultErrorHandler` at the top level? This is not on you, but we are making plans to deprecate this component, and we can save the trouble of fixing this connector later if we get rid of it now.
updated with DefaultErrorHandler
Custom AdvertiserIdsPartitionRouter and AdvertiserIdPartitionRouter partition routers are used to get advertiser_ids
as slices for streams where it uses as request param.

When user uses sandbox account it's impossible to get advertiser_ids via API.
"When using a sandbox account, it's impossible to get advertiser_ids via API."
updated comment
Main difference between AdvertiserIdsPartitionRouter and AdvertiserIdPartitionRouter is
that AdvertiserIdPartitionRouter returns multiple advertiser_ids in a one slice when id is not provided,
e.g. {"advertiser_ids": '["11111111", "22222222"]', "parent_slice": {}}.
And AdvertiserIdPartitionRouter returns single slice for every advertiser_id as usual.
I think this sentence is unclear because you might have a typo. Starting with "is that ...":
AdvertiserIdPartitionRouter returns multiple advertiser_ids in a one slice when id is not provided
AdvertiserIdPartitionRouter returns single slice for every advertiser_id as usual.
I think one of these should reference AdvertiserIdsPartitionRouter.
But in the context of these components, can you add to your comment why we even need to return a single slice with multiple `advertiser_ids` in the same slice? This seems unnecessarily complex given that the other flow returns one slice per ID, which is the normal convention.
I think we should try to avoid multiple `advertiser_ids` in a single slice if we can avoid it. Couldn't we just read in the config's advertiser ids and return multiple slices?
> I think one of these should reference AdvertiserIdsPartitionRouter.
Updated with the correct names.
> I think we should try to avoid multiple advertiser_ids in a single slice if we can avoid it. Couldn't we just read in the config's advertiser ids and return multiple slices?
`MultipleAdvertiserIdsPartitionRouter` is used only in one stream, Advertisers, to fetch more than one advertiser at once and reduce the number of requests. That is how it was implemented in the py version of the connector. All other streams use `SingleAdvertiserIdPartitionRouter`. `advertiser ids` is not a required property in the config, so we can't avoid reading the parent stream.
yield StreamSlice(partition={"advertiser_ids": json.dumps(slices[i : min(end, i + step)]), "parent_slice": {}}, cursor_slice={})


class SingleAdvertiserIdPartitionRouter(MultipleAdvertiserIdsPartitionRouter):
Can you incorporate some of the above description into this partition router as well? I think my above question is still relevant. Why can't we return one slice per `partition_value_in_config` for better consistency?
added description.
> Why can't we return one slice per partition_value_in_config for better consistency?
We return one slice per partition_value_in_config if it's in the provided config or read stream slices if it's not provided.
> We return one slice per partition_value_in_config if it's in the provided config or read stream slices if it's not provided.
Makes sense, thanks.
> MultipleAdvertiserIdsPartitionRouter is used only in one stream, Advertisers, to fetch more than one advertiser at once and reduce the number of requests. That is how it was implemented in the py version of the connector. All other streams use SingleAdvertiserIdPartitionRouter. advertiser ids is not a required property in the config, so we can't avoid reading the parent stream.
Got it. The part I was missing was the why, and it sounds like reducing the number of requests is the intent. Can you include that in the description so we know why we have a different component?
I'm also curious: in your migration, how much are we reducing requests? I imagine customers don't have that many `advertiser_id`s, so I'm surprised that putting them into a single slice saves us that many requests when it's only used by the `advertisers` stream.
To be honest, I'm questioning whether or not this is even worth keeping. If we are only saving a few requests, I think it would be better to just have one cursor even if it is a minor regression for one stream. I think it just depends on how much we save.
) -> Iterable[Mapping[str, Any]]:
    stream_states = None
    if stream_state:
        stream_states = [
It's not always clear from the shape of the state object, but I'm pretty certain that we only have one cursor per partition, and that fits with your logic below, which takes only the first `stream_states[0]`.
I think we can simplify this logic a bit, and it also avoids having to iterate over the entire state object every time:
stream_state = next((p["cursor"] for p in stream_state["states"] if p["partition"][self._partition_field] == stream_slice[self._partition_field]), {})
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
...
This should get the first (and only) match for the partition and otherwise default to `{}`, and the rest can operate on the same flow. Let me know if that makes sense and works with your original intent.
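For reference, the suggested lookup can be tried in isolation. This is a minimal sketch, assuming the per-partition state has the shape `{"states": [{"partition": {...}, "cursor": {...}}]}` implied by the suggestion; the function name `cursor_for_partition` and the sample field values are hypothetical.

```python
from typing import Any, Mapping, Optional


def cursor_for_partition(
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Mapping[str, Any],
    partition_field: str,
) -> Mapping[str, Any]:
    """Return the cursor of the first partition matching the slice, or {} if none matches."""
    states = (stream_state or {}).get("states", [])
    # next() stops at the first match, so the remaining partitions are never visited.
    return next(
        (
            p["cursor"]
            for p in states
            if p["partition"].get(partition_field) == stream_slice.get(partition_field)
        ),
        {},
    )


# Illustrative state with two partitions (values are made up).
state = {
    "states": [
        {"partition": {"advertiser_id": "111"}, "cursor": {"modify_time": "2024-01-01"}},
        {"partition": {"advertiser_id": "222"}, "cursor": {"modify_time": "2024-02-01"}},
    ]
}
print(cursor_for_partition(state, {"advertiser_id": "222"}, "advertiser_id"))
print(cursor_for_partition(state, {"advertiser_id": "999"}, "advertiser_id"))
```

Unlike the one-liner in the comment, this version also tolerates an empty or missing state, which is an addition made here for the standalone example.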
It works as expected. Thanks for the suggestion. Updated the code.
app_id: "{{ config.get('credentials', config.get('environment', {})).get('app_id', 0) }}"
request_headers: {}
authenticator:
  $ref: "#/definitions/authenticator"
I notice that we have a lot of places in the stream definitions themselves that assign authenticator to `$ref: "#/definitions/authenticator"`. Since we only have one type of authenticator, why do we also need to assign it here? It looks like in some of the reusable components, we're already assigning the authenticator.
Thanks, I refactored this part of the code: moved "#/definitions/authenticator" to the requester and removed the re-assignments of it.
    self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]
) -> List[StreamSlice]:
    start = start.replace(hour=0, minute=0, second=0)
    return super()._partition_daterange(start, end, step)
This might result in some duplicated code, but I think we should be cautious not to override the private method `_partition_daterange()`. Because it's intended to be private, we're more likely to make underlying changes to it without considering how dependent connectors use this method.
Instead, can we override the public `stream_slices()` implementation, which is the main method that in turn invokes the logic for `_partition_daterange()`?
So it would look roughly like:
def stream_slices():
    # get best end time
    # reset start to the 0 hour of the provided start
    # copy the logic from the `_partition_daterange`
I don't love the duplicated logic, so it's not a perfect solution, but it does make the connector more resilient to any upstream changes that could break it on a republish.
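The outline above could be sketched as a standalone function. This is only an illustration under stated assumptions, not the CDK's `_partition_daterange` logic: the function name `daily_aligned_slices`, the `start_time`/`end_time` keys, and the one-second gap between slices are all hypothetical choices made for this example.

```python
import datetime
from typing import Dict, List


def daily_aligned_slices(
    start: datetime.datetime,
    end: datetime.datetime,
    step: datetime.timedelta,
) -> List[Dict[str, datetime.datetime]]:
    """Split [start, end] into consecutive windows, after resetting start to midnight."""
    # Reset start to the 0 hour of the provided start (microseconds are also cleared here).
    start = start.replace(hour=0, minute=0, second=0, microsecond=0)
    slices = []
    while start <= end:
        # Each window ends one second before the next one begins, capped at the overall end.
        slice_end = min(start + step - datetime.timedelta(seconds=1), end)
        slices.append({"start_time": start, "end_time": slice_end})
        start += step
    return slices


windows = daily_aligned_slices(
    datetime.datetime(2024, 1, 1, 13, 45),
    datetime.datetime(2024, 1, 3, 10, 0),
    datetime.timedelta(days=1),
)
for window in windows:
    print(window["start_time"].isoformat(), window["end_time"].isoformat())
```

Keeping the alignment in a public entry point, as proposed, means a change to the private partitioning helper upstream would not silently alter where the first slice starts.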
moved replacing of h/m/s to stream_slices
A note on naming, and just one last discussion point on the need for the `MultipleAdvertiserIdsPartitionRouter`. Given it's only used on one stream, depending on how drastically it reduces requests, I think we might want to get rid of it even if that deviates from the original behavior.
What I want to figure out is how much the separate partition router benefits us. Basically, does combining the advertiser ids into a single slice result in them all getting bundled up, so that we only have to go through a single full iteration? Or, if we separate them into individual slices, does that mean we have to perform one full iteration per advertiser_id slice? For example, if we have 5 advertiser_ids, we would end up making 5x the requests. If that's the case, we can leave it as is.
After we clear that up, this is good to go. Nice work!
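To make the trade-off concrete, here is a minimal sketch comparing the two slicing strategies. It mirrors the `json.dumps(slices[i : min(end, i + step)])` batching from the diff but uses plain dicts instead of `StreamSlice`; the function names and the batch size of 100 are assumptions for illustration, and the real number of requests also depends on pagination.

```python
import json
from typing import Any, Dict, Iterator, List


def batched_slices(advertiser_ids: List[str], step: int) -> Iterator[Dict[str, Any]]:
    """Yield one slice per batch of up to `step` ids, serialized as a JSON list."""
    end = len(advertiser_ids)
    for i in range(0, end, step):
        yield {"advertiser_ids": json.dumps(advertiser_ids[i : min(end, i + step)]), "parent_slice": {}}


def single_slices(advertiser_ids: List[str]) -> Iterator[Dict[str, Any]]:
    """Yield one slice per advertiser id, the usual convention."""
    for advertiser_id in advertiser_ids:
        yield {"advertiser_id": advertiser_id, "parent_slice": {}}


ids = ["1", "2", "3", "4", "5"]
print(len(list(batched_slices(ids, step=100))))  # slices when ids are bundled
print(len(list(single_slices(ids))))             # slices when each id is separate
```

With five ids and a batch size that covers them all, the bundled router produces one slice where the per-id router produces five, which is the 5x difference described above.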
from airbyte_cdk.sources.declarative.types import StreamSlice


class MultipleAdvertiserIdsPartitionRouter(SubstreamPartitionRouter):
Let's rename this to `MultipleAdvertiserIdsPerPartition` and the one below to `SingleAdvertiserIdPerPartition`. I think we want to make it really clear that the difference is how many advertiser_ids are in one partition, because the name `SingleAdvertiserIdPartitionRouter` makes it sound like we only ever get one advertiser id.
What
resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/7824
How
Migrated source to use low-code cdk instead of python cdk.
Regression tests are described here: #38316 (comment)
Main changes:
`lifetime=true` as request param, which is deprecated on API v1.3. Now lifetime reports use `query_lifetime=true`; with this param, start_date and end_date should not be provided. Exception: advertiser_lifetime_report. API v1.3 doesn't allow `query_lifetime=true` with advertiser reports, so this stream was implemented exactly as in the py version, with start_date and end_date query params (range >= 365d).
Review guide
User Impact
Breaking change: users will need to follow the migration guide for affected streams.
Can this PR be safely reverted and rolled back?
Breaking change due to changes in schema and state format.