Fix #982: active checkpoint system #993
Conversation
I think I see what is happening: if the GQL update is stopped, the temp table and CSV are updated but the live table is not created. Then, when restarting the process, we take the start date from the live table, but that doesn't exist, so we take it from lake_ss. However, there is already data inside the CSV and temp table, and when trying to insert it we get an error.
A fix could be to add a try/catch around the
We do not want to update the live table if there was an error... We can simply break this down into two processes.
What should happen here is...
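A minimal sketch of that two-process split (only `_move_from_temp_tables_to_live` is taken from the diff below; `_fetch_to_temp_tables` is a hypothetical placeholder, not code from this PR):

```python
def update(self):
    # Phase 1: fetch new events into the CSVs and temp tables only.
    # If this fails, the live tables are untouched and the next run
    # can resume from the temp/CSV checkpoint.
    self._fetch_to_temp_tables()

    # Phase 2: reached only if phase 1 finished without raising.
    # Promote the temp tables to the live (permanent) tables.
    self._move_from_temp_tables_to_live()
```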
```
@@ -53,6 +88,29 @@ def create_table_if_not_exists(self, table_name: str, schema: SchemaDict):
        empty_df = pl.DataFrame([], schema=schema)
        self._create_and_fill_table(empty_df, table_name)

    def _create_sql_table_schema(self, schema: SchemaDict) -> str:
```
why does this have to be created now, as part of this PR?
```
# create the permanent table
self.execute_sql(
    f"CREATE TABLE {permanent_table_name} ({sql_table_schema})"
)
```
can we put all of this into a function?
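For example, something along these lines (a sketch only; `_create_permanent_table` is a hypothetical name, not code from this PR):

```python
def _create_permanent_table(self, permanent_table_name: str, schema: SchemaDict):
    # Build the column definitions once and create the permanent table
    # in one place, so callers don't repeat the DDL string-building.
    sql_table_schema = self._create_sql_table_schema(schema)
    self.execute_sql(
        f"CREATE TABLE {permanent_table_name} ({sql_table_schema})"
    )
```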
```
        for column in temp_table_columns
        if column != "ID"
    ]
)
```
I don't think this is correct...
- I don't think the PDS should handle the conflict, i.e. it should just handle merging from one to the other.
- You are trying to handle stuff ahead of time.

What should happen right now...
- Only new events are being processed... there should be no conflicts.
- There will be gaps in the data ("null points"); do not try to join/fix these at the moment.

What will happen in the future...
- Create SQL queries that update existing records.

Do not try to do this right now...
f"INSERT INTO {permanent_table_name} SELECT * FROM {temp_table.fullname}" | ||
f"""INSERT INTO {permanent_table_name} | ||
SELECT * FROM {temp_table.fullname} | ||
ON CONFLICT (ID) DO UPDATE SET {on_conflict_columns}""" |
Same here...
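In other words, keep the move as a plain copy, as in the line being replaced above, with no conflict handling in the PDS:

```python
self.execute_sql(
    f"INSERT INTO {permanent_table_name} SELECT * FROM {temp_table.fullname}"
)
```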
```
    # can be added based on specific needs and compatibility.
}

def _get_sql_column_type(self, column_type) -> str:
```
why does this have to be created now, as part of this PR?
This logic was created because we were using auto-casting for datatypes in the database. However, we now require at least one unique column, and making a column unique by altering it does not work. Therefore, we must specify it while creating the table. As a solution, I created this logic to cast column types manually.
I am open to different suggestions.
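For reference, a minimal sketch of the kind of manual casting being described, assuming a polars-to-SQL mapping (the specific SQL type names below are illustrative, not necessarily the PR's actual mapping):

```python
import polars as pl

# Illustrative mapping from polars dtypes to SQL column types.
# Other dtypes can be added based on specific needs and compatibility.
_PL_TO_SQL = {
    pl.Int64: "BIGINT",
    pl.Float64: "DOUBLE",
    pl.Utf8: "VARCHAR",
    pl.Boolean: "BOOLEAN",
}

def _get_sql_column_type(column_type) -> str:
    # Fall back to VARCHAR for any dtype not listed above.
    return _PL_TO_SQL.get(column_type, "VARCHAR")
```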
Can we just use the id column, which should be unique and every object has one at the moment?
I agree with this. If we move to production and there is missing data, then we can't trust production.
I have reverted the UNIQUE feature on the PDS @idiom-bytes
I believe what we did at the Data Fetching level is get st_ts and end_ts, then:
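For context, a rough sketch of the checkpoint logic described earlier in this thread (the helper and attribute names here are hypothetical; only the fallback order, i.e. newest record in the live table, else lake_ss, comes from the discussion above):

```python
def _calc_start_ts(self, live_table_name: str) -> int:
    # Resume from the newest record already in the live table, if any;
    # otherwise fall back to the configured lake start time (lake_ss).
    if self._table_exists(live_table_name):  # hypothetical helper
        df = self.query_data(                # hypothetical helper
            f"SELECT MAX(timestamp) AS max_ts FROM {live_table_name}"
        )
        if df["max_ts"][0] is not None:
            return df["max_ts"][0] + 1
    return self.ppss.lake_ss.st_timestamp    # hypothetical attribute
```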
pdr_backend/lake/gql_data_factory.py
Outdated
```
)
except Exception as e:
    self._move_from_temp_tables_to_live()
    logger.error("Error on fetching data from %s: %s", table.table_name, e)
```
I don't think this is correct
Removed that part after adding it; realised we don't want this.
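For clarity, the intended shape is roughly this (a sketch; `_fetch_table_data` is a hypothetical placeholder): the temp-to-live move belongs after a clean fetch, never inside the exception handler.

```python
try:
    # fetch new events into the temp table / CSV for this table
    self._fetch_table_data(table)
except Exception as e:
    logger.error("Error on fetching data from %s: %s", table.table_name, e)
    return  # keep the temp/CSV checkpoint; do NOT promote it to live

# promote temp tables to live only once fetching finished cleanly
self._move_from_temp_tables_to_live()
```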
Tested it and identified an issue:
@kdetry @KatunaNorbert you tagged the wrong ticket... #982 is for incremental updates, which is not what we're doing.
Please read the readme, the epic, and ticket #1000. The only thing that we care about right now is putting raw records into GQLDF.
```
csvds = CSVDataStore(ppss.lake_ss.lake_dir)
# Add some predictions
pds.drop_table(get_table_name("pdr_predictions", TableType.TEMP))
csvds.delete("pdr_predictions")
```
I don't get this...
- pytest creates a different tmpdir for every test
- you don't need to delete anything; there is nothing there
- there is now a function to delete CSVs which is only used by this test
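A small illustration of the pytest point (generic pytest usage, not this repo's test code):

```python
def test_lake_dir_starts_empty(tmp_path):
    # pytest's tmp_path fixture gives every test its own fresh directory,
    # so there is nothing to drop or delete before the test begins.
    assert list(tmp_path.iterdir()) == []

    # anything written here stays isolated to this test's tmp_path
    (tmp_path / "pdr_predictions.csv").write_text("dummy")
```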
We've discussed the issues in here, reviewed the implementations, and agreed that:
Closing this PR.
Fixes #982
Changes proposed in this PR: