
Fix #982: active checkpoint system #993

Closed
wants to merge 13 commits into from

Conversation

@kdetry (Contributor) commented May 5, 2024

Fixes #982

Changes proposed in this PR:

  • the move query is changed
  • UNIQUE constraints are added
  • the start-end timestamp logic is changed for the ETL and GQLDF

@kdetry requested a review from idiom-bytes May 6, 2024 20:23
@KatunaNorbert (Member)

Deleted lake_data and ran the ETL, stopped it at the subscriptions fetching step, and ran into the following error when running it again:
[screenshot: Screenshot 2024-05-07 at 15 08 59]

@KatunaNorbert (Member)

I think I see what is happening: if the gql update is stopped, the temp table and csv are updated but the live table is not created. Then, when restarting the process, we take the start date from the live table, but that doesn't exist, so we take it from lake_ss. However, there is already data inside the csv and temp table, and the insert then gives an error.

@KatunaNorbert (Member)

A fix could be to add a try/except around self._do_subgraph_fetch so that whenever the process is stopped we catch the error and move the existing tables to production. This way the production tables reflect what's inside the temp tables and there are no conflicts.
I tested it and it seems to be working; I can push the changes if you agree with this approach.
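A minimal sketch of the proposed wrapper; _do_subgraph_fetch and the temp-to-live move are the names from this PR, while the standalone function, its arguments and the logger setup are purely illustrative:

    import logging

    logger = logging.getLogger(__name__)

    def fetch_then_promote(do_subgraph_fetch, move_from_temp_tables_to_live):
        """Wrap the fetch step; if it is interrupted, still promote the temp tables."""
        try:
            # fetch new events from the subgraph into the temp tables + csv
            do_subgraph_fetch()
        except Exception as e:
            # the fetch was stopped mid-way: move whatever already landed in the
            # temp tables to the live tables, so a restart does not hit
            # duplicate-insert errors
            logger.error("subgraph fetch interrupted: %s", e)
            move_from_temp_tables_to_live()

In practice both arguments would be the factory's bound methods (self._do_subgraph_fetch and the existing temp-to-live move).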

@idiom-bytes (Member) commented May 8, 2024

> A fix could be to add a try/except around self._do_subgraph_fetch so that whenever the process is stopped we catch the error and move the existing tables to production. This way the production tables reflect what's inside the temp tables and there are no conflicts.

We do not want to update the live table if there was an error...

We can simply break this down into 2 processes:

  1. Update data to local/csv
  2. Load from CSV to DuckDB

What should happen here is...

  1. GQL Data Factory should resume fetching from where it left off.
  2. When all fetching is completed, all data (from st_ts => to end_ts) is loaded from CSV onto the DuckDB tables.
  3. When all of this completes end-to-end the GQLDF is up-to-date.
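A rough sketch of step 2; only the csv-then-DuckDB ordering and the st_ts/end_ts window come from this thread, while the function, its signature and the "timestamp" column name are assumptions:

    import duckdb
    import polars as pl

    def load_csv_into_duckdb(csv_path: str, table_name: str, db_path: str,
                             st_ts: int, end_ts: int) -> None:
        """Step 2: after fetching has fully completed, load every row in
        [st_ts, end_ts] from the csv into the DuckDB table in one pass."""
        df = pl.read_csv(csv_path).filter(
            (pl.col("timestamp") >= st_ts) & (pl.col("timestamp") <= end_ts)
        )
        con = duckdb.connect(db_path)
        con.register("new_rows", df.to_arrow())  # expose the frame to SQL
        # create the table from the frame's schema if it is not there yet
        con.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM new_rows LIMIT 0"
        )
        con.execute(f"INSERT INTO {table_name} SELECT * FROM new_rows")
        con.close()

Step 1 (fetching) would only ever append to the csvs, so an interrupted run can simply resume fetching and re-run step 2 later.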

@@ -53,6 +88,29 @@ def create_table_if_not_exists(self, table_name: str, schema: SchemaDict):
empty_df = pl.DataFrame([], schema=schema)
self._create_and_fill_table(empty_df, table_name)

def _create_sql_table_schema(self, schema: SchemaDict) -> str:
Member

why does this have to be created now, as part of this PR?

        # create the permanent table
        self.execute_sql(
            f"CREATE TABLE {permanent_table_name} ({sql_table_schema})"
        )
Member

can we put all of this into a function?

Contributor Author

for column in temp_table_columns
if column != "ID"
]
)
Member

I don't think this is correct...

  1. I don't think the pds should handle the conflict. I.e. it should just handle merging from one to the other.
  2. You are trying to handle stuff ahead of time.

What should happen right now....

  1. Only new events are being processed... there should be no conflicts.
  2. There will be gaps in the data ("null points"); do not try to join/fix these at the moment.

What will happen in the future...

  1. Create SQL queries that update existing records

Do not try to do this right now...

f"INSERT INTO {permanent_table_name} SELECT * FROM {temp_table.fullname}"
f"""INSERT INTO {permanent_table_name}
SELECT * FROM {temp_table.fullname}
ON CONFLICT (ID) DO UPDATE SET {on_conflict_columns}"""
Member

Same here...

# can be added based on specific needs and compatibility.
}

def _get_sql_column_type(self, column_type) -> str:
Member

why does this have to be created now, as part of this PR?

Contributor Author

This logic was created because we were using auto-casting for datatypes in the database. However, we now require at least one unique column, and making a column unique by altering it does not work. Therefore, we must specify it while creating the table. As a solution, I created this logic to cast column types manually.

I am open to different suggestions.
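Roughly, the idea is the following; this is only a sketch of casting Polars dtypes to DuckDB column types so that UNIQUE (ID) can be declared at creation time, and the exact mapping/names in the PR may differ:

    import polars as pl

    # illustrative subset of a Polars-dtype -> DuckDB-type mapping
    _SQL_TYPES = [
        (pl.Utf8, "VARCHAR"),
        (pl.Int64, "BIGINT"),
        (pl.Float64, "DOUBLE"),
        (pl.Boolean, "BOOLEAN"),
    ]

    def _sql_type(dtype) -> str:
        for pl_dtype, sql_type in _SQL_TYPES:
            if dtype == pl_dtype:
                return sql_type
        return "VARCHAR"  # fall back for anything not listed

    def create_table_sql(table_name: str, schema: dict) -> str:
        """Build CREATE TABLE with the UNIQUE constraint on ID declared up front,
        since adding it afterwards via ALTER does not work."""
        cols = ", ".join(f"{name} {_sql_type(dtype)}" for name, dtype in schema.items())
        return f"CREATE TABLE {table_name} ({cols}, UNIQUE (ID))"

For example, create_table_sql("pdr_predictions", {"ID": pl.Utf8, "timestamp": pl.Int64}) produces a statement DuckDB accepts with ID enforced as unique.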

Member

Can we just use the id column? It should be unique, and every object has one at the moment.

@KatunaNorbert (Member)

> We do not want to update the live table if there was an error...
>
>   1. GQL Data Factory should resume fetching from where it left off.
>   2. When all fetching is completed, all data (from st_ts => to end_ts) is loaded from CSV onto the DuckDB tables.
>   3. When all of this completes end-to-end the GQLDF is up-to-date.

I agree with this. If we move to production and there is missing data, then we can't trust the production tables.

@kdetry (Contributor Author) commented May 8, 2024

I have reverted the UNIQUE feature on PDS, @idiom-bytes.

@idiom-bytes (Member) commented May 8, 2024

I believe what we did at the Data Fetching level is get st_ts and end_ts, then:

  1. Save new records to CSV + SQL raw tables
  2. Process new raw predictions rows => temp bronze prediction rows
  3. Move rows from temp tables => to live tables
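As a sketch of step 3 only; connection handling is simplified versus PersistentDataStore, and dropping the temp table afterwards is an assumption:

    import duckdb

    def move_temp_to_live(db_path: str, temp_table: str, live_table: str) -> None:
        """Step 3: promote a temp table to its live counterpart."""
        con = duckdb.connect(db_path)
        # only new events should be in the temp table, so a plain
        # INSERT ... SELECT is enough (no conflict handling); this assumes
        # the live table was already created with the same schema
        con.execute(f"INSERT INTO {live_table} SELECT * FROM {temp_table}")
        con.execute(f"DROP TABLE {temp_table}")
        con.close()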

            )
        except Exception as e:
            self._move_from_temp_tables_to_live()
            logger.error("Error on fetching data from %s: %s", table.table_name, e)
Member

I don't think this is correct

Member

Removed that part after adding it; realised we don't want this.

@KatunaNorbert (Member)

Tested it and identified an issue:
When running the gql update to fetch the data, then stopping it along the way (for example after fetching predictions, slots and trueval), then running it again, it fetches all the data again. I think it's looking at the live pdr_predictions table, which is only created at the end of the etl update; that part is fine, but if the fetching process is stopped we should take the existing data from the csvs and only fetch what is left, instead of fetching everything.
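Something along these lines, as a sketch only; the lake_dir/<table>/*.csv layout, the "timestamp" column and the fallback to the lake start date are assumptions, not the PR's actual helpers:

    import glob
    import os
    import polars as pl

    def resume_ts_from_csvs(lake_dir: str, table_name: str, lake_start_ts: int) -> int:
        """Return the timestamp to resume fetching from: the newest timestamp
        already saved to this table's csv files, else the lake start date."""
        paths = glob.glob(os.path.join(lake_dir, table_name, "*.csv"))
        saved = [pl.read_csv(p).get_column("timestamp").max() for p in paths]
        saved = [ts for ts in saved if ts is not None]  # skip empty csv files
        if not saved:
            return lake_start_ts
        return max(max(saved), lake_start_ts)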

@KatunaNorbert (Member) commented May 10, 2024

The resume part looks fine now, but when stopping the fetch-predictions process and restarting it at timestamp_x, that timestamp is going to be used for all the other tables instead of the start date from the lake.
[screenshot: Screenshot 2024-05-10 at 11 38 39]

@kdetry (Contributor Author) commented May 10, 2024

> The resume part looks fine now, but when stopping the fetch-predictions process and restarting it at timestamp_x, that timestamp is going to be used for all the other tables instead of the start date from the lake.

I tried to explain this situation while we were discussing the logic: we shouldn't rely on the pdr_predictions table for GQLDF; all the other tables should manage their own situation. Only the ETL side should look at pdr_predictions, and only if the bronze_pdr_predictions table does not have a record.

@idiom-bytes (Member) commented May 13, 2024

@kdetry @KatunaNorbert you tagged the wrong ticket... #982 is for incremental updates, which is not what we're doing.

> the start-end timestamp logic is changed for the ETL and GQLDF

Please read the readme, the epic, and ticket #1000.
Incremental updates are for AFTER we get the first build ready.

The only thing that we care about right now is putting raw-records into GQLDF.
ETL is not part of #1000.

csvds = CSVDataStore(ppss.lake_ss.lake_dir)
# Add some predictions
pds.drop_table(get_table_name("pdr_predictions", TableType.TEMP))
csvds.delete("pdr_predictions")
Member

I don't get this...

  1. pytest creates a different tmpdir for every test
  2. you don't need to delete anything, there is nothing there
  3. there is now a function to delete csvs which is only used by this test

@idiom-bytes (Member)

We've discussed the issues here, reviewed the implementations, and agreed that:

  1. Much of the work being done originally, and the order of events, were indeed correct.
  2. We need to look across all tables to get the right timestamp; we can't just use the predictions table (a sketch of this follows below).
  3. There were a lot of wrong assumptions in this implementation. For example, GQLDF SHOULD look at the CSV records for where to resume, not the DuckDB tables.
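Referring to point 2, a minimal sketch of looking across all tables, under the same per-table csv layout and "timestamp" column assumptions as the earlier sketch (the real checkpoint logic will live in GQLDF):

    import glob
    import os
    import polars as pl

    def overall_resume_ts(lake_dir: str, table_names: list, lake_start_ts: int) -> int:
        """Resume from the OLDEST per-table csv checkpoint so no table is left
        with a gap; a table with no csv data forces a restart from lake start."""
        checkpoints = []
        for name in table_names:
            paths = glob.glob(os.path.join(lake_dir, name, "*.csv"))
            saved = [pl.read_csv(p).get_column("timestamp").max() for p in paths]
            saved = [ts for ts in saved if ts is not None]
            if not saved:
                return lake_start_ts
            checkpoints.append(max(saved))
        return min(checkpoints) if checkpoints else lake_start_ts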

Closing this PR
