Support for changing schemas #26

Open
hkdsun opened this issue May 23, 2018 · 13 comments

@hkdsun
Member

hkdsun commented May 23, 2018

The most common type of failure in Shopify's internal usage of Ghostferry is errors caused by changing schemas. These can manifest in many different ways, including the DataIterator/BinlogWriter inserting data into deleted columns, tables disappearing on the target database, and other incompatibilities between the source and target databases.

This is especially problematic for very time-consuming ferry runs, as a longer run increases the likelihood of this class of failures.

We need to think about how this can be remedied. Starting with a limited scope seems like a good idea; for example, start by supporting the addition/removal of columns or tables.

cc @Shopify/pods

@pushrax
Contributor

pushrax commented May 23, 2018

There are two parts to this:

  1. Implement schema intersection so we don't try to copy tables/columns that don't exist in the target (see the sketch after this list)
  2. Detect when the schema changed on the source or target in a recoverable way and resume copying with the new schema
    a. This can be done by looking for DML events in the binlog of the source and target, but the data iterator is asynchronous relative to the binlog streamer and will use the wrong schema for some time. Also, if the schema changes in the target, both the data iterator and binlog streamer will use the wrong schema for some time.
    b. Detecting errors as they occur, pausing, reloading the schema, and resuming may be a good general way to handle column and table removal, since the data in those columns/tables is unimportant at that point. Table and column additions might not cause an error but would miss copying important data if ignored, so a binlog approach would still be necessary there. Some assumption about how long you need to wait before populating new columns with values would be necessary to make this work.
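
To make point 1 concrete, here is a minimal Go sketch of the intersection (not Ghostferry's actual API; Column, TableSchema, and IntersectSchemas are hypothetical names):

package schemadiff

// Column and TableSchema are hypothetical stand-ins for whatever schema
// representation Ghostferry uses internally.
type Column struct {
	Name string
	Type string
}

type TableSchema struct {
	Name    string
	Columns []Column
}

// IntersectSchemas keeps, for every table present in both the source and the
// target, only the columns whose names exist on both sides. Tables or columns
// missing from either side are skipped rather than copied.
func IntersectSchemas(source, target map[string]TableSchema) map[string]TableSchema {
	app := make(map[string]TableSchema)
	for name, srcTable := range source {
		tgtTable, ok := target[name]
		if !ok {
			continue // table missing on the target: don't copy it
		}
		tgtCols := make(map[string]struct{}, len(tgtTable.Columns))
		for _, c := range tgtTable.Columns {
			tgtCols[c.Name] = struct{}{}
		}
		shared := TableSchema{Name: name}
		for _, c := range srcTable.Columns {
			if _, ok := tgtCols[c.Name]; ok {
				shared.Columns = append(shared.Columns, c)
			}
		}
		if len(shared.Columns) > 0 {
			app[name] = shared
		}
	}
	return app
}

Anything outside the resulting intersection would simply be skipped by the copy.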

@hkdsun
Member Author

hkdsun commented May 23, 2018

Thanks @pushrax!

Some assumption about how long you need to wait before populating new columns with values would be necessary to make this work.

Why is that important? Any changes that occur to a table or column that gets created during a run will be visible to the binlog streamer. So, if we can pause all binlog event processing while we handle a DDL event, we'd automatically copy any activity on new columns / tables.

@pushrax
Contributor

pushrax commented May 30, 2018

@hkdsun and I came up with some ideas for this last week, and met with @shuhaowu yesterday to discuss them. We got stuck because our solution lacked a good way to synchronously detect schema changes in the target database. @hkdsun and I by chance had lunch with @insom today, who gave us an insightful hint: leverage transactional behaviour. Here's the idea:

  1. Goal: the ghostferry run should be resilient to created tables, dropped tables, created columns, dropped columns, and changes to indexes (other than the primary index and sharding key). No other schema changes are supported; they should be detected and cause the run to fail intentionally.
  2. When copying anything, intersect the schema of the data being copied (the source schema) with the schema of the target database (the target schema), and copy only those tables and columns that exist in the intersection (the app schema).
    A. This is based on the assumption that anything writing to production data in the source or target may only use tables and columns that exist in the app schema.
    B. The binlog streamer needs its own version of the source schema since a DML event it sees might have been generated with an older schema. If we try to use the latest source schema, it will be unable to infer column names correctly. Let's call this the binlog schema.
  3. At any time, for all rows copied, the columns sent to the target must equal the columns present in the app schema. This requires synchronously detecting column changes in all schemas. Note that from the binlog streamer's point of view, the app schema is the binlog schema intersected with the target schema, while from the data iterator's point of view the app schema is the source schema intersected with the target schema.
    A. Changes to the source schema are synchronously reflected in the rows the data iterator is reading due to SELECT *, and the existing transaction prevents the table from being altered until the batch is done being copied. Not much needs to be changed here.
    B. Changes to the binlog schema are reflected in table map events, which appear synchronously in the binlog stream. Code will need to be written to update the binlog schema when we see these events. A possible sanity check can be done by running SHOW CREATE TABLE again, and if it doesn't match the new binlog schema intentionally fail the run. This would mean multiple quick updates to a single table's schema might cause failures, with the benefit of some risk reduction.
    C. Changes to the target schema can be detected by opening a transaction on the target in a new connection, selecting any row, and checking whether the columns in that row match the target schema in memory. If they don't match, reload the schema and retry the select. Once they do match, the target schema is valid until the transaction is ended. Then, if all copying is done while this transaction is open, we are guaranteed to always have the correct target schema. To avoid blocking table changes forever, this transaction could be restarted at some interval, perhaps every minute. (A sketch of this check appears after this list.)
    D. If the binlog streamer sees an applicable DML event modifying a column not present in the app schema, fail the run intentionally, as this violates 2.A.
  4. When a table is created it will be empty until some writes occur, which will appear as row events in the binlog after the DDL event that creates the table. If we encounter a row event for a table that's not in the app schema (because it's not in the target schema), we can reload the target schema and we are guaranteed to find the table there. If it's not there, fail the run intentionally, as this violates 2.A.
  5. When a table is dropped in the source, the binlog streamer will see a DDL event and will simply stop getting row events for that table. When a table is dropped in the target, the mechanism in 3.C will remove it from the app schema so the binlog streamer will stop applying events for it. When a table is dropped in the source or target, its data iterator will run into an error and halt.
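
Here is a minimal Go sketch of the transaction-based check in 3.C, assuming database/sql against MySQL; schemaMatches, reloadSchema, and copyBatch are hypothetical callbacks, not existing Ghostferry functions:

package sketch

import (
	"database/sql"
	"fmt"
)

// copyWithValidTargetSchema sketches the check in 3.C. The three callbacks
// are supplied by the caller and are purely illustrative.
func copyWithValidTargetSchema(
	target *sql.DB,
	table string,
	schemaMatches func(cols []string) bool,
	reloadSchema func() error,
	copyBatch func(tx *sql.Tx) error,
) error {
	for {
		tx, err := target.Begin()
		if err != nil {
			return err
		}

		// Reading from the table inside the transaction takes a metadata
		// lock, so the target table cannot be altered until the transaction ends.
		rows, err := tx.Query(fmt.Sprintf("SELECT * FROM %s LIMIT 1", table))
		if err != nil {
			tx.Rollback()
			return err
		}
		cols, err := rows.Columns()
		rows.Close()
		if err != nil {
			tx.Rollback()
			return err
		}

		if !schemaMatches(cols) {
			// The in-memory target schema is stale: reload it and retry.
			tx.Rollback()
			if err := reloadSchema(); err != nil {
				return err
			}
			continue
		}

		// The target schema is now guaranteed correct for the lifetime of
		// this transaction, so the copy happens inside it. In practice the
		// transaction would be restarted periodically (e.g. every minute) so
		// DDL on the target is never blocked for long.
		if err := copyBatch(tx); err != nil {
			tx.Rollback()
			return err
		}
		return tx.Commit()
	}
}

The point is that the SELECT pins the target table's schema until the transaction ends; restarting the transaction at an interval keeps target DDL from being blocked indefinitely.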

Let's go over some examples to see if this might be correct. In the figure, time progresses rightward, S is the source, T is the target, the vertical arrows represent the moment a schema change is applied, and the triangle represents the moment ghostferry's binlog streamer sees the table map event due to the source schema change.

[Figure lhm_safety: timing diagram of cases A, B, and C for schema changes on S and T]

Column creation

  • In A.1, the column exists in the target, but is ignored since the app, data iterator and binlog streamer will not touch it. In A.2 the column exists on the source and target, and the app may now write to it.
    • In A.2 suppose an INSERT (old_col, new_col) VALUES (1, 1) occurs, and the data iterator copies this row before A.3. Since it runs SELECT *, it gets both old_col and new_col, and these both exist in the target schema so it copies both columns. Now suppose an update setting new_col=2 occurs. When we get to A.3, the binlog schema is first updated to include new_col. The insert is processed with both columns, and gets dropped due to a duplicate primary key. The update is processed generating UPDATE SET old_col=1, new_col=2 WHERE old_col=1 AND new_col=1 AND <pk condition>, which gets applied to the target correctly.
    • In A.2 suppose row old_col=1, new_col=default exists. The data iterator copies it, and then we get an update with new_col=1. In A.3 the update is processed generating UPDATE SET old_col=1, new_col=1 WHERE old_col=1 AND new_col=default AND <pk condition>, which gets applied to the target correctly.
  • In B.1, the column exists on the source, but is ignored since it's not in the target. Any write to it at this point is a violation of premise 2.A above. In B.2, writes that occurred to the column in B.1 or B.2 will be detected and fail the run intentionally. In B.3, the app schema is updated synchronously and the column will now be copied by both the data iterator and binlog streamer.
  • C.1 is equivalent to B.1. In C.2, the new column exists in both source and target so writes are allowed. The data iterator will include the new column in the app schema, but the binlog streamer won't.
    • In C.1 suppose row old_col=1 exists and is updated to old_col=2 while the binlog streamer lags. In C.2, the data iterator copies old_col=2, new_col=default, then we get an update with new_col=1. The binlog streamer processes the first update, generating UPDATE SET old_col=2 WHERE old_col=1 AND <pk condition>, which is dropped. In C.3, the binlog streamer processes the second update, generating UPDATE SET old_col=2, new_col=1 WHERE old_col=2 AND new_col=default AND <pk condition>, which is applied.
    • An erroneous write to the new column in C.1 will not be detected, since the app schema will include that column by C.3 when any binlog event that could have generated the write will be processed. If this happens, data corruption may occur. Suppose row old_col=1, new_col=1 is inserted in C.1, and the data iterator copies just old_col=1. Then in C.2 an update occurs setting old_col=2. In C.3, the update is processed, generating UPDATE SET old_col=2, new_col=1 WHERE old_col=1 AND new_col=1, which is dropped since new_col=default in the target. Case C occurs when ghostferry's binlog lag (unrelated to MySQL's replication lag) is greater than the time between the migration completing on the source and target. There are possible countermeasures here, such as pausing in C.1 until the DDL event corresponding to the schema change is seen. The additional effort vs the possible risk needs to be weighed.

Column removal

  • In A.1 and B.2, the app schema is updated and the column will not be used for any further operations. Any row events in the binlog will start ignoring the column when generating the target query, and any row batches in the data iterator will start ignoring the column when generating the INSERT batch.
  • In B.1 and C.1, data iterators will stop copying the removed column since it will not be returned in SELECT *.
  • In C.2, the binlog schema is updated and further row events can't possibly use the removed column since there would be no way to generate such an event after the column is removed.

Table creation

  • In A.2, B.2, and C.3 – the first points where any write may happen to the new table – there is no data iterator running for this table so we simply wait for the binlog streamer to see writes and apply them.

Table removal

  • In A.1 and B.2, the app schema is updated and the table will not be used for any further operations. The running data iterator for that table will encounter an error and shut down. The binlog streamer will stop applying writes to the dropped table.
  • In B.1 and C.1, the running data iterator for that table will shut down.
  • In C.2, the binlog schema is updated and further row events can't possibly use the removed table since there would be no way to generate such an event after the table is removed.

Non-exhaustive list of remaining things to think about:

  1. We haven't thought much about what to do during the verification step yet. Intuition is that it will be relatively similar. It may require using the long-transaction approach for the source database as well to detect column additions, since we don't currently use SELECT * during verification.
  2. Can we prove correctness formally? If not, how can we gain confidence in correctness?
  3. Does this work if the source or target has more than one schema change completed before the other, i.e. multiple concurrent migrations are running? Is this ever something we'd want to support?

@shuhaowu
Contributor

shuhaowu commented Jul 9, 2018

So after some discussions with many people and some work with TLA+, I believe the problem is twofold:

  1. How do we detect that a schema migration occurred?
  2. How should Ghostferry proceed once a schema migration occurs?

The first problem still needs a solution, as it's unclear how we can do this easily. The second problem, however, has a very simple solution. This is the interrupt + reconcile + cleanup/recopy method (sketched in code after the summary below):

<tldr>

  1. When a schema change occurs on either the source or the target, we immediately stop Ghostferry.
  2. We resume ghostferry with a known good binlog/cursor position once the source and target schema have equalized.
  3. Before starting the standard Ghostferry algorithm, we first read through the binlogs from the known good position to the current position (run SHOW MASTER STATUS at this point):
    1. For each row changed by the binlog, delete this row from the target database.
    2. If this row has already been copied earlier (as indicated by the known good cursor position), recopy it to the target.
  4. For any table that has encountered a schema migration during the time between the interruption and the resume:
    1. Delete all applicable records from the target.
    2. Reset the known good cursor position for these tables to 0.
  5. Start the standard Ghostferry algorithm with the known good cursor position and the binlog position determined in step 3.

</tldr>
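
As mentioned above, here is a rough Go sketch of steps 3-5. Every type and helper (BinlogEvent, ReconcileState, Reconciler) is hypothetical and only illustrates the shape of the reconciliation, not Ghostferry's actual state or API:

package reconcile

type BinlogEvent struct {
	Table string
	PK    uint64
}

// ReconcileState is a hypothetical stand-in for the resume state.
type ReconcileState struct {
	LastStreamedBinlogPosition string            // known good binlog position
	LastSuccessfulPrimaryKeys  map[string]uint64 // known good cursor per table
	MigratedTables             []string          // tables whose schema changed during the pause
}

// Reconciler abstracts access to the source and target; purely illustrative.
type Reconciler interface {
	DeleteRowFromTarget(table string, pk uint64) error
	RecopyRowFromSource(table string, pk uint64) error
	DeleteAllRowsFromTarget(table string) error
}

func reconcile(r Reconciler, state *ReconcileState, eventsSinceKnownGoodPosition []BinlogEvent) error {
	// Step 3: every row touched since the known good binlog position is
	// deleted from the target, and recopied if the data iterator had already
	// passed its primary key.
	for _, ev := range eventsSinceKnownGoodPosition {
		if err := r.DeleteRowFromTarget(ev.Table, ev.PK); err != nil {
			return err
		}
		if ev.PK <= state.LastSuccessfulPrimaryKeys[ev.Table] {
			if err := r.RecopyRowFromSource(ev.Table, ev.PK); err != nil {
				return err
			}
		}
	}

	// Step 4: tables whose schema changed while Ghostferry was stopped are
	// wiped on the target and their cursor reset to 0, so the standard run
	// recopies them from scratch.
	for _, table := range state.MigratedTables {
		if err := r.DeleteAllRowsFromTarget(table); err != nil {
			return err
		}
		state.LastSuccessfulPrimaryKeys[table] = 0
	}

	// Step 5: the standard Ghostferry algorithm then resumes from these
	// cursors and from the binlog position captured via SHOW MASTER STATUS.
	return nil
}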

Further analysis of this has been done in prose in issue #17 and in TLA+ in #43, and it is shown to preserve the safety properties of Ghostferry. Some alternatives to step 4 have also been documented in #17 that could be more time efficient.

What remains is detecting when schema changes occur, and this could be non-trivial. There are also alternatives where we don't interrupt Ghostferry. With those, we could possibly complete Ghostferry runs faster, although I believe they would be complex to implement in the current code base.

@fw42
Contributor

fw42 commented Jul 10, 2018

A few questions off the top of my head:

immediately stop Ghostferry.

Why do we have to stop ghostferry rather than just making it sleep and wait for the schema to equalize? Doing that would save us the complexity of dumping and restoring state. It would also allow us to keep making progress on tables whose schema didn't change and only sleep in the goroutine of the one that did. Wouldn't that be more efficient (and also sound easier to implement)?

resume (..) once the source and target schema have equalized.

How do we know that they have equalized if ghostferry isn't running anymore? Who/what checks whether or not they are equalized yet?

known good binlog/cursor position

What's the definition of a "good position"? I assume that's the last (or any) position we know of that didn't have the schema change yet?

For each row changed by the binlog, delete this row from the target database.

This only applies to tables that had their schema changed, right? Not ALL tables? If ALL, why?

@shuhaowu
Contributor

Why do we have to stop ghostferry rather than just making it sleep and wait for the schema to equalize? Doing that would save us the complexity of dumping and restoring state. It would also allow us to keep making progress on tables whose schema didn't change and only sleep in the goroutine of the one that did. Wouldn't that be more efficient (and also sound easier to implement)?

This was suggested as an alternative solution internally as well. Yes, it would be more time efficient to pause the copy on a particular table and delete the applicable records while copying other tables whose schemas are not changing. However, I think it will be much harder to implement than dumping and restoring, since most of the code for dump/restore is already in the Ghostferry codebase, just not hooked up. There is also a fair amount of complexity surrounding exactly how to pause a particular table, since we essentially have a pre-forked model in terms of how we parallelize per table.

Additionally, dump/restore would be beneficial in other circumstances as well, such as losing connectivity to the database, whereas code implementing the sleep/wait-for-equalization approach would have only a single use: handling schema changes.

How do we know that they have equalized if ghostferry isn't running anymore? Who/what checks whether or not they are equalized yet?

Every time we start Ghostferry in reconciliation mode, it should check the schemas to see if they're identical. If not, Ghostferry should quit with some appropriate exit code. An external supervisor service could simply launch Ghostferry in a loop. Alternatively, the supervisor could check the schemas itself and launch Ghostferry when appropriate.
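
For illustration, the schema check at startup could look roughly like this in Go. This is a sketch, not Ghostferry code; in practice the comparison would need to ignore incidental differences such as AUTO_INCREMENT counters in the SHOW CREATE TABLE output.

package sketch

import (
	"database/sql"
	"fmt"
)

// schemasEqual compares SHOW CREATE TABLE output for each table on the source
// and target.
func schemasEqual(source, target *sql.DB, tables []string) (bool, error) {
	for _, t := range tables {
		srcDDL, err := showCreateTable(source, t)
		if err != nil {
			return false, err
		}
		tgtDDL, err := showCreateTable(target, t)
		if err != nil {
			return false, err
		}
		if srcDDL != tgtDDL {
			return false, nil
		}
	}
	return true, nil
}

func showCreateTable(db *sql.DB, table string) (string, error) {
	// SHOW CREATE TABLE returns two columns: the table name and its DDL.
	var name, ddl string
	err := db.QueryRow(fmt.Sprintf("SHOW CREATE TABLE %s", table)).Scan(&name, &ddl)
	return ddl, err
}

A supervisor could run Ghostferry in a loop and let it exit immediately whenever this check fails, or run the check itself before each launch.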

What's the definition of a "good position"? I assume that's the last (or any) position we know of that didn't have the schema change yet?

This is documented somewhat in #17. It also depends on how we want to perform schema change detection. Theoretically, this would be the lastStreamedBinlogPosition and lastSuccessfulPrimaryKeys in the BinlogStreamer and DataIterator respectively.

This only applies to tables that had their schema changed, right? Not ALL tables? If ALL, why?

ALL tables. During the downtime, any record in any table can change. We must update the target, otherwise the target data would get stuck at the version from when we interrupted, as future binlog events will have no effect. This is documented in great detail in #17, under the section Safety of the Reconciliation Step.

@pushrax
Contributor

pushrax commented Jul 10, 2018

Detecting schema changes is something we have already talked a lot about when trying to figure out a truly general solution.

  1. Source schema changing:
    a. During data iteration, we SELECT *; if the columns we get back mismatch, then the schema changed.
    b. During binlog streaming (when the binlog streamer wins the race with the data iterator, or the data iterator is already done), we can look at DDL events that pop up and parse out the table name (significantly easier than parsing out the whole event). Checking the number of columns in DML events against the loaded schema provides an additional margin of safety.
  2. Target schema changing: if we support only table/column addition/removal, we can look at errors that come back from writes and know the schema changed if the error is ER_NO_SUCH_TABLE or ER_BAD_FIELD_ERROR. Extra tables/columns in the target can be ignored.
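
For point 2, a sketch of the error check, assuming the github.com/go-sql-driver/mysql driver (writeErrorIndicatesSchemaChange is a hypothetical helper; 1146 and 1054 are MySQL's ER_NO_SUCH_TABLE and ER_BAD_FIELD_ERROR codes):

package sketch

import (
	"errors"

	"github.com/go-sql-driver/mysql"
)

// MySQL error codes for a missing table and a missing column.
const (
	erNoSuchTable   = 1146 // ER_NO_SUCH_TABLE
	erBadFieldError = 1054 // ER_BAD_FIELD_ERROR
)

// writeErrorIndicatesSchemaChange reports whether a failed write to the
// target looks like a dropped table or dropped column, i.e. the target schema
// changed out from under us and should be reloaded.
func writeErrorIndicatesSchemaChange(err error) bool {
	var mysqlErr *mysql.MySQLError
	if !errors.As(err, &mysqlErr) {
		return false
	}
	return mysqlErr.Number == erNoSuchTable || mysqlErr.Number == erBadFieldError
}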

@kolbitsch-lastline
Contributor

I'm new to ghostferry but have spent quite some time working on this problem. I think there is a subset of this problem that might be somewhat easier to solve (and that I'm particularly interested in):

We are trying to use ghostferry not to move data, but instead to continuously replicate data from a source into a target. This can be useful, for example, when the target DB

  • cannot be configured to be a slave (e.g., Google's Cloud SQL (GCP) has this limitation if the target DB holds other DBs), or
  • is running an older version of MySQL than the source DB (e.g., the master is upgraded to a newer version of MySQL but, for whatever reason, the slave cannot be upgraded).

In our scenario, we can assume that we restore the target DB from an earlier backup of the source. Thus, the batch-copy operation of ghostferry has completed already or never took place. At this point, we only want to stream binlog events, but we also need to follow schema changes to be able to apply all row-events.

The nice thing about our scenario - and that's the subset that we could be solving somewhat easily - is that the target DB has the final state of all tables we care about before any alteration happens. As a result, we can use the target DB before/after altering (or adding) tables to re-extract the schema.

There is, however, a problem with this: the race condition between the binlog-writer applying a schema change and the binlog-streamer receiving row-events and checking that the schema (the number of columns) we have in the table-schema cache matches the number of columns in the incoming rows-event.
I'm referring to code you have in ddl_events.go, such as:

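// Bail out when a row event's column count no longer matches the cached table schema.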
if len(row) != len(table.Columns) {
	return nil, fmt.Errorf(
		"table %s.%s has %d columns but event has %d columns instead",
		table.Schema,
		table.Name,
		len(table.Columns),
		len(row),
	)
}

I have worked around this problem by "dumbing down" the binlog-streamer and making it a "pure streamer" - that is, one that does not care (or know) about the schemas of tables.
The binlog-writer, on the other hand, is now smarter and enforces that the cached table schema matches the rows-event, bailing out if not. That is, the generation of DDL-events was moved from the streamer to the writer.
Since the writer now also receives all schema changes, it can simply re-validate the schema of each table that is altered after executing the ALTER on the target DB. Thus, there is no race condition here, as it all happens within the writer thread.
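
A rough illustration of that writer-side flow (not the actual GFR code; applyDDLAndRefreshSchema is a hypothetical helper built on database/sql): apply the DDL on the target, then re-read the table's columns from information_schema before validating any later row events for that table.

package sketch

import "database/sql"

// applyDDLAndRefreshSchema applies a DDL statement on the target, then
// re-reads the table's column list so subsequent column-count checks use the
// new schema.
func applyDDLAndRefreshSchema(target *sql.DB, schemaName, tableName, ddl string) ([]string, error) {
	if _, err := target.Exec(ddl); err != nil {
		return nil, err
	}

	rows, err := target.Query(
		`SELECT column_name FROM information_schema.columns
		 WHERE table_schema = ? AND table_name = ?
		 ORDER BY ordinal_position`,
		schemaName, tableName,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var columns []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		columns = append(columns, name)
	}
	return columns, rows.Err()
}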

IMHO, making the streamer not know (or care) about schemas is a slightly cleaner design - but if there was a good reason you guys chose to do the opposite, I'm happy to be proven wrong :-) .

Of course there is still a race between the binlog-writer and a batch-writer, as the batch-writer also may need to know the schema. However, I think that trying to apply schema changes before all batches have been applied is a losing battle. The thoughts you pointed out above are clever, but it's going to be incredibly difficult getting this right.

Two more comments on the above:

  • first, since the binlog-writer operates on a restored schema in our scenario, ghostferry actually supports foreign keys (and FK constraints). The batch-processor is not needed (as we restore a DB first, not using ghostferry), and syncing data writes and schema changes in the binlog-writer guarantees that all constraints are met (or they would have failed when being inserted on the master)
  • second, while testing my variation of ghostferry-copydb (I call it ghostferry-replicatedb, or GFR) I actually found that the resume-logic has a design flaw, but I'll open a separate bug ticket for this.

So, long preamble to this: I have a semi-working version of GFR that allows continuously replicating data and schema changes. Most of the changes could be used in ghostferry-copydb, but I understand that copydb tries to work under different scenarios than what I describe above.
Is it still interesting to have a pull-request into this project, or should I completely fork off the code (obviously keeping all kudos and licenses intact)?

@shuhaowu
Contributor

shuhaowu commented Mar 6, 2020

Hi! Thanks for the detailed analysis. Internally, we've actually come up with a different way of handling schema changes using interrupt/resume, as opposed to some of the points outlined above. I can post a more detailed version of that at a later time if you'd like; however, it feels like the solution takes a direction that doesn't quite match what you're doing.

In the meantime, you said there's a design flaw in the resume logic. Can you elaborate on that? We can fix it quickly if you can give a good description of it.

@shuhaowu
Contributor

shuhaowu commented Mar 6, 2020

Is it still interesting to have a pull-request into this project, or should I completely fork off the code (obviously keeping all kudos and licenses intact)?

This depends on how much of your modifications are to the BinlogStreamer/Writer and how they may interact with the rest of the codebase. It is theoretically possible to write a Ghostferry application (ghostferry-replicatedb) that bypasses the Ferry-level API and calls BinlogStreamer/BinlogWriter directly. We could merge this application (ghostferry-replicatedb) if the changes do not "interfere" with core Ghostferry functionality (both copydb and sharding).

@kolbitsch-lastline
Contributor

In the meantime, you said there's a design flaw in the resume logic. Can you elaborate on that? We can fix it quickly if you can give a good description of it.

yep, just wrote up #156 . We can continue this part of the conversation there.

@kolbitsch-lastline
Contributor

This depends on how much of your modifications are to the BinlogStreamer/Writer and how they may interact with the rest of the codebase. It is theoretically possible to write a Ghostferry application (ghostferry-replicatedb) that bypasses the Ferry-level API and calls BinlogStreamer/BinlogWriter directly. We could merge this application (ghostferry-replicatedb) if the changes do not "interfere" with core Ghostferry functionality (both copydb and sharding).

all my changes are meant to be kept generic. I wouldn't be surprised if my refactorings negatively impacted the batch/iterative copy code, but nothing has conceptually changed - I wrote all the code in a way that it would also benefit copydb.

I think the changes I made should be split up into multiple sub-pull-requests

  • the first one is probably the resume fix (if you agree that I'm right with my changes).
  • the second one would be moving the generation of DDLEvents out of the binlog-syncer and into the binlog-writer (and batch-writers)
  • the third would be to support schema changes
  • the fourth would be adding the replicatedb code

One step at a time :-)

@shuhaowu
Contributor

shuhaowu commented Mar 8, 2020

Is it possible for you to post your WIP code somewhere? We can take a look and give you feedback on whether we can work with it for upstreaming.
