Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redshift optimization #4775

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

vikasgoyal31071992
Copy link

@vikasgoyal31071992 vikasgoyal31071992 commented Mar 17, 2024

Description

  1. Redshift Table lock issue while pipeline runs solved
  2. Redshift destination Performance increased
  • Test A
  • Test B

Checklist

  • The PR is tagged with proper labels (bug, enhancement, feature, documentation)
  • I have performed a self-review of my own code
  • I have added unit tests that prove my fix is effective or that my feature works
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • If new documentation has been added, relative paths have been added to the appropriate section of docs/mint.json

cc:

unique_constraints_clean = [
f'{self.clean_column_name(col)}'
for col in unique_constraints
]

drop_duplicate_records_from_temp = (f'DELETE FROM {full_table_name_temp} '
f'where ({", ".join(unique_constraints_clean)},_mage_created_at) '
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we capitalize the WHERE please?

Comment on lines 160 to 162
f'where ({", ".join(unique_constraints_clean)}) '
f'in (SELECT {", ".join(unique_constraints_clean)} '
f'from {full_table_name_temp})')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we capitalize the SQL keywords please?

drop_duplicate_records_from_temp,
delete_records_from_full_table,
insert_records_from_temp_table,
truncate_records_from_temp_table
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add trailing comma please.

f'from {full_table_name_temp})')

insert_records_from_temp_table = f'INSERT INTO {full_table_name} SELECT * FROM {full_table_name_temp}'
truncate_records_from_temp_table = f'TRUNCATE table {full_table_name_temp}'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize TABLE

Copy link
Author

@vikasgoyal31071992 vikasgoyal31071992 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes Done @tommydangerous

Copy link
Author

@vikasgoyal31071992 vikasgoyal31071992 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes Done

@dy46
Copy link
Member

dy46 commented Mar 18, 2024

can you describe what tests you ran to verify this change works?

@vikasgoyal31071992
Copy link
Author

I have done testing below
1.data integration pipeline with redshift as destination
2. Tried with duplicate keys
3. At the time of pipeline runs trying select querying with same table

@vikasgoyal31071992
Copy link
Author

@dy46 is there any suggestions here ?

@vikasgoyal31071992
Copy link
Author

@tommydangerous @dy46

build_create_table_command(
column_type_mapping=self.column_type_mapping(schema),
columns=schema['properties'].keys(),
full_table_name=f'{temp_table_name}',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can just do full_table_name=temp_table_name here

Comment on lines 151 to 157
drop_duplicate_records_from_temp = (f'DELETE FROM {full_table_name_temp} '
f'WHERE ({", ".join(unique_constraints_clean)},_mage_created_at) '
f'IN ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at '
f'FROM ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at, ROW_NUMBER() '
f'OVER (PARTITION BY {", ".join(unique_constraints_clean)} '
f'ORDER BY _mage_created_at DESC) as row_num '
f'FROM {full_table_name_temp} ) AS subquery_alias WHERE row_num > 1);')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you share an example of what this query looks like with the variable values filled in?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dy46 Please find sample query here

DELETE FROM temp_abc
WHERE (id,_mage_created_at)
IN ( SELECT id, _mage_created_at
FROM ( SELECT id, _mage_created_at, ROW_NUMBER()
OVER (PARTITION BY id
ORDER BY _mage_created_at DESC) as row_num
FROM temp_abc ) AS subquery_alias WHERE row_num > 1);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dy46 changes done as suggested

f'VALUES {insert_values}',
]),
]
# This is temp as need to know the best way to create table programmatically i will change it properly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate on this comment? not sure what you mean here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, my bad will remove this comment

Comment on lines +68 to +75
build_create_table_command(
column_type_mapping=self.column_type_mapping(schema),
columns=schema['properties'].keys(),
full_table_name=temp_table_name,
if_not_exists=True,
unique_constraints=unique_constraints,
use_lowercase=self.use_lowercase,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any logic to delete this temporary table?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No actually, i guess we don't have should we add this ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should if we are going to be creating it every sync

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually currently it's tried if table not exists only

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I still think we should delete it at the end of the sync to avoid leaving a temp table in the database.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants