
Vitess VStream Replicator Support #1943

Open
wants to merge 34 commits into master

Conversation

@kovyrin kovyrin commented Nov 9, 2022

[Implements/fixes #1757]

As part of a project around data materialization efforts within Shopify, we would like to use Maxwell for running our custom producer code. The databases the project will run against use Vitess as the clustering/sharding technology, which means we need Maxwell to support replicating data from a Vitess keyspace. This PR addresses that need by adding a new replicator class built on the VStream API provided by the Vitess VTGate component.

What already works

  • The code successfully connects to vtgate via the VStream API (including full TLS support, certificate verification, custom CA, etc).

  • Both user/password and TLS auth work for gRPC connections to vtgate.

  • Maxwell follows a given keyspace in a Vitess cluster (all shards by default, or a specific shard configured within the properties file) and receives all relevant events.

  • Maxwell uses FIELD events from VStream to maintain an in-memory representation of the database schema (meaning we don't need to load it from MySQL or analyze DDL events). See a comment below for more details on how this works.

  • Using the in-memory schema, Maxwell converts VStream ROW events into RowMap objects (including before/after value tracking) and pushes those to a given producer.

  • VGTID events are used to track the current position (including the initial position when first started) and the data is persisted in a new vitess_gtid column within the positions table. The persisted position is used for recovery in case of a restart.

Still need to implement/fix/figure out

  • When loading position info from the database, we need to make sure to only use keyspaces/shards that match the current configuration; otherwise, a configuration change without a corresponding change in the client_id value would make Maxwell use the old VGTID and never follow the newly configured keyspace/shards.

  • The code needs tests

  • We need to update documentation to add references to Vitess support and explain how it works.

  • The code within the replicator class may contain some duplication compared to the MysqlReplicator class, but that is by design: I tried to refrain from changing the current code too much until things stabilize within the Vitess-related codebase, so that whatever generalization we do later is done on a stable set of classes.
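The keyspace/shard sanity check from the first bullet could look something like this. This is an illustrative Python sketch with a hypothetical helper name (the actual implementation would live in Maxwell's Java position-loading code); the VGTID JSON shape follows the VStream events shown later in this thread:

```python
import json

def vgtid_matches_config(stored_vgtid_json, keyspace, shard=None):
    """Return True only if every shard GTID in the stored VGTID belongs to
    the configured keyspace (and shard, when a specific one is pinned).
    Hypothetical helper, not part of the PR's actual code."""
    vgtid = json.loads(stored_vgtid_json)
    for sg in vgtid.get("shardGtids", []):
        if sg.get("keyspace") != keyspace:
            return False
        if shard and sg.get("shard") != shard:
            return False
    return True

# A position saved while following commerce/0 should not be reused after
# the configuration switches to a different keyspace or shard:
saved = '{"shardGtids": [{"keyspace": "commerce", "shard": "0", "gtid": "MySQL56/ff450168:1-2060"}]}'
print(vgtid_matches_config(saved, "commerce"))         # True
print(vgtid_matches_config(saved, "customer"))         # False
print(vgtid_matches_config(saved, "commerce", "-80"))  # False
```

If the check fails, the safe behavior is to treat the deployment as new and wait for a fresh initial VGTID rather than resume from the stale position.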

Licensing

You will notice that there is a set of files within the vitess directory that have Apache 2.0 headers. Those are derivatives of the code used by the Vitess connector in Debezium. We will need guidance on how to properly integrate that code into the codebase (e.g. whether there is a NOTICE file where it needs mentioning) or whether it needs to be re-implemented (it mostly revolves around schema caching and VStream field value parsing/conversion).

Things that work:
* The code successfully follows a given keyspace in a Vitess cluster (a specific shard or all shards) and pushes RowMap objects to a given producer.

Still need to implement/fix/figure out:
* Position store is stubbed out for now and position recovery is not implemented, so it always starts at the current stream position.
* I'm not sure I'm handling GRPC stream shutdown properly.
* Had to disable the OpenCensus dependency for now because it depends on an ancient gRPC version that conflicts with the gRPC dependency used by Vitess.
@kovyrin kovyrin mentioned this pull request Nov 9, 2022
@kovyrin
Author

kovyrin commented Nov 9, 2022

The config.properties I'm using for my development/testing:

# tl;dr config
log_level=debug

producer=stdout
# producer=kafka
# kafka.bootstrap.servers=localhost:9092

# Enable vitess support
vitess=true
vitess_keyspace=commerce
vitess_shard=
vtgate_host=localhost
vtgate_port=15991

# MySQL login info for storing maxwell metadata
host=storefront-renderer.railgun
user=root
password=

@osheroff
Collaborator

You will notice, that there is a set of files within the vitess directory that have Apache 2.0 headers. Those are derivatives of the code used by the [Vitess connector in Debezium](https://debezium.io/documentation/reference/stable/connectors/vitess.html). Will need guidance on how to properly integrate that code into the codebase (if there is a NOTICE file where it needs mentioning or something like that) or if it needs to be re-implemented (it mostly revolves around schema caching and VStream field value parsing/conversion).

Definitely don't re-implement. Maxwell is apache2 as well, copying it on in is fine. The debezium guys are friends, of sorts, and a NOTICE file will be fine.

@osheroff
Collaborator

Nice start! Let me know if there's anything you're unclear on; I presume that maxwell.positions will have enough fields to store your position.

I won't get deep into source-level comments just now, on a very surface level you should know that maxwell uses tabs-not-spaces :)

@kovyrin
Author

kovyrin commented Nov 21, 2022

maxwell uses tabs-not-spaces :)

Nice catch, fixed that!

I'm back from vacation and planning to work on adding position tracking and recovery.

@kovyrin
Author

kovyrin commented Nov 22, 2022

Added support for Vitess position tracking and recovery.

A few notes on the implementation:

  • I've added a new TEXT field to the positions table for storing vgtid values, since they may get pretty large for large Vitess clusters and I did not want to risk storing them in the gtid_set column (plus the format is different, so I was worried it might cause parsing problems down the road).

  • When initializing a new deployment of Maxwell, instead of asking for the current position and persisting it right away (as we do for MySQL), I opted to wait for the initial VGTID event that is sent to the client immediately upon connection and use that value as the initial vgtid (so the semantics of initial positioning have changed a bit).

  • I have not implemented heartbeat support for Vitess since, at the moment, I'm targeting a deployment model where Maxwell has a separate MySQL cluster for storing its management schema (it does not fit into the sharded model of Vitess directly, as none of the tables have a sharding key), and heartbeats written to that Maxwell database would never be visible within the Vitess change stream.
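The initial-positioning semantics from the second note amount to taking the first VGTID event off a fresh connection. A minimal sketch (illustrative Python; the event shape follows the grpcurl demo later in the thread):

```python
def initial_position(event_stream):
    """Take the first VGTID event from a fresh VStream connection as the
    initial position, instead of querying the server up front the way the
    MySQL replicator does. Sketch only, not Maxwell's actual code."""
    for event in event_stream:
        if event["type"] == "VGTID":
            return event["vgtid"]
    raise RuntimeError("stream ended before the initial VGTID event arrived")

# VStream sends the VGTID event immediately upon connection; anything
# else that arrives first is simply skipped.
events = [
    {"type": "OTHER"},
    {"type": "VGTID", "vgtid": {"shardGtids": [
        {"keyspace": "commerce", "shard": "0",
         "gtid": "MySQL56/ff450168:1-2060"}]}},
]
pos = initial_position(iter(events))
print(pos["shardGtids"][0]["shard"])  # 0
```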

@kovyrin
Author

kovyrin commented Nov 22, 2022

OK, polished it all a bit, cleaned up some changes we didn't need and tomorrow will try to deploy in staging within our infrastructure and run on a relatively large Vitess cluster to see how it goes. 🤞🏻

@osheroff
Collaborator

I've added a new TEXT field to the positions table for storing vgtid values since they may get pretty large for large Vitess clusters and I did not want to risk storing it in the gtid_set column

That's fine, two things:

  1. need some code in upgradeSchemaStoreSchema to add the column to existing installs
  2. probably name it vitess_gtid for clarity
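A hedged sketch of what such an upgrade step could do. SQLite is used here purely to make the example runnable; Maxwell's schema store lives in MySQL and the real change would go in upgradeSchemaStoreSchema:

```python
import sqlite3

def upgrade_positions_table(conn):
    """Add the vitess_gtid column only when an existing install lacks it,
    so the upgrade is safe to run on every start. Illustrative sketch."""
    cols = [row[1] for row in conn.execute("PRAGMA table_info(positions)")]
    if "vitess_gtid" not in cols:
        conn.execute("ALTER TABLE positions ADD COLUMN vitess_gtid TEXT")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE positions (server_id INTEGER, gtid_set TEXT)")
upgrade_positions_table(conn)
upgrade_positions_table(conn)  # idempotent: running twice is harmless
```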

@osheroff
Collaborator

I have not implemented heartbeat support for Vitess since at the moment I'm targeting a deployment model where Maxwell has a separate MySQL cluster for storing its management schema (since it does not fit into the sharded model of Vitess directly as all the tables lack a sharding key) and those heartbeats from the Maxwell database will never be visible within the Vitess change stream.

That's fine, we'll need some code strewn around the codebase to turn off the expectation of heartbeats.

@osheroff
Collaborator

do you have a docker-compose type setup I could use to step through the code?

@osheroff
Collaborator

can I get a 10k foot view of how the in-memory schema works? Does vitess keep track of what tables a stream listener has seen already and then send any unseen tables + changes?

@osheroff
Collaborator

I have not implemented heartbeat support for Vitess

Honestly the whole heartbeating thing even for mysql is of dubious value; it was written in a world where GTID wasn't hugely common to support a wild and overly engineered desire to not lose data during primary server flips. I may start trudging down the ugly and unpleasant path of deprecation at some point.

@kovyrin
Author

kovyrin commented Nov 23, 2022

can I get a 10k foot view of how the in-memory schema works? Does vitess keep track of what tables a stream listener has seen already and then send any unseen tables + changes?

Exactly! For each VStream connection, they keep track of which tables (and which versions of those tables) a client has seen, and before sending any data for a given table, they make sure the client first gets a FIELD event explaining what the table schema should look like.
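In sketch form, the client side of this contract is just a cache keyed by table name that each FIELD event overwrites (illustrative Python, not Maxwell's actual schema classes):

```python
def apply_field_event(schema, field_event):
    """Replace the cached column list for a table whenever a FIELD event
    arrives; VStream guarantees one is sent before any ROW event for a
    table version the client has not seen yet."""
    schema[field_event["tableName"]] = [
        (f["name"], f["columnType"]) for f in field_event["fields"]
    ]

schema = {}
apply_field_event(schema, {
    "tableName": "commerce.bananas",
    "fields": [{"name": "name", "columnType": "varchar(100)"}],
})
# After an ALTER, the next FIELD event simply overwrites the entry:
apply_field_event(schema, {
    "tableName": "commerce.bananas",
    "fields": [{"name": "name", "columnType": "varchar(50)"}],
})
print(schema["commerce.bananas"])  # [('name', 'varchar(50)')]
```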

Here is a demo of how it works in reality:

  1. Let's connect to VStream and listen for updates:
❯ grpcurl -plaintext -d '{"vgtid":{"shard_gtids":[{"keyspace":"commerce", "gtid":"current"}]}}' localhost:15991 vtgateservice.Vitess.VStream

It will immediately send a VGTID event with the current position (which we use to record the initial position in Maxwell).

JSON stream:
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2060"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "OTHER",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
  2. Now, let's create a simple table with create table bananas(name varchar(100));. The stream will receive a DDL event with the DDL statement:

JSON stream:
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2061"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "DDL",
      "timestamp": "1669215464",
      "statement": "create table bananas (\n\t`name` varchar(100)\n)",
      "currentTime": "1669215464575035000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
  3. Next, we insert some data: insert into bananas set name="test1";. The stream receives a FIELD event with the schema for the new table, followed by a ROW event with the actual data:

JSON stream:
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215490",
      "currentTime": "1669215490981788000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "FIELD",
      "timestamp": "1669215490",
      "fieldEvent": {
        "tableName": "commerce.bananas",
        "fields": [
          {
            "name": "name",
            "type": "VARCHAR",
            "table": "bananas",
            "orgTable": "bananas",
            "database": "vt_commerce",
            "orgName": "name",
            "columnLength": 400,
            "charset": 45,
            "columnType": "varchar(100)"
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215490990077000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215490",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDE="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215490990132000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2063"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215490",
      "currentTime": "1669215490990165000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
  4. Let's insert another row: insert into bananas set name="test2";. Now the stream only gets the transaction, with no schema updates:

JSON stream:
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215502",
      "currentTime": "1669215502107840000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215502",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDI="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215502107932000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2064"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215502",
      "currentTime": "1669215502107961000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
  5. Let's change the schema: alter table bananas change name name varchar(50);. The stream gets a DDL event:

JSON stream:
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2065"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "DDL",
      "timestamp": "1669215523",
      "statement": "alter table bananas change column `name` `name` varchar(50)",
      "currentTime": "1669215523734671000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
  6. Finally, let's insert more data: insert into bananas set name="test3";. Now VTGate detects that we have not seen the new schema yet, so it sends a transaction with a FIELD event inside before sending the data:

JSON stream:
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215535",
      "currentTime": "1669215535606806000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "FIELD",
      "timestamp": "1669215535",
      "fieldEvent": {
        "tableName": "commerce.bananas",
        "fields": [
          {
            "name": "name",
            "type": "VARCHAR",
            "table": "bananas",
            "orgTable": "bananas",
            "database": "vt_commerce",
            "orgName": "name",
            "columnLength": 200,
            "charset": 45,
            "columnType": "varchar(50)"
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215535619492000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215535",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDM="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215535619512000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2066"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215535",
      "currentTime": "1669215535619533000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
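As an aside on the ROW events above: the values field is a base64-encoded blob of concatenated column values, and lengths tells the client how to slice it (if I read the Vitess Row encoding correctly, a length of -1 denotes SQL NULL). A sketch of the decoding, using the first insert from the demo:

```python
import base64

def decode_row(field_names, row_change):
    """Split the base64-encoded, length-prefixed value blob from a VStream
    ROW event into per-column strings. Illustrative sketch of the format,
    not the PR's actual Java conversion code."""
    raw = base64.b64decode(row_change["values"])
    out, offset = {}, 0
    for name, length in zip(field_names, map(int, row_change["lengths"])):
        if length < 0:
            out[name] = None  # negative length marks SQL NULL
        else:
            out[name] = raw[offset:offset + length].decode("utf-8")
            offset += length
    return out

row = decode_row(["name"], {"lengths": ["5"], "values": "dGVzdDE="})
print(row)  # {'name': 'test1'}
```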

@osheroff
Collaborator

nice, very ergonomic.

Just something to keep in the back of your head:

Maxwell supports configurations with near-psychotic sized schemas, hundreds of thousands of tables. To this end, it's important to keep in-memory schema representations as compact as possible. The default in-memory representation jumps through crazy intern hoops to do so. Not saying you should do this in v1, but if you can design your data to be somewhat compact it'll help us down the line.

@kovyrin
Author

kovyrin commented Nov 23, 2022

do you have a docker-compose type setup I could use to step through the code?

Working on setting that up today (so far it has been based on internal tooling we use at Shopify, plus some duct tape around the official scripts shipped with Vitess for local dev environment setup).

@kovyrin
Author

kovyrin commented Nov 24, 2022

do you have a docker-compose type setup I could use to step through the code?

Added a vitess directory to the project, including a set of scripts and a README file explaining how to run it in development/test mode. That should allow you to run the whole setup with just two commands.

@kovyrin kovyrin changed the title WIP implementation of Vitess VStream replicator support Vitess VStream Replicator Support Nov 28, 2022
@kovyrin
Author

kovyrin commented Nov 30, 2022

Ok, we have our first instance of Maxwell running on Vitess deployed in Staging on top of a real cluster and it seems to work as expected. More testing, benchmarks, etc over the next couple of weeks.

@kovyrin
Author

kovyrin commented Dec 7, 2022

Updated the PR with the latest changes from the master branch, and now it works without disabling outdated dependencies.

@kovyrin
Author

kovyrin commented Dec 7, 2022

Added and tested both TLS certificate and user/password auth on our staging instances - both work as expected.

@osheroff
Collaborator

osheroff commented Dec 7, 2022 via email

@kovyrin
Author

kovyrin commented Dec 7, 2022

I'd like it to bake for a bit in our staging environments (until January?) and then we can be more confident about it. But thank you for your vote of confidence! 😊

@osheroff
Collaborator

osheroff commented Dec 7, 2022 via email

@kovyrin
Author

kovyrin commented Dec 12, 2022

@osheroff Noticed an issue today:

Due to the way VStream ROW events arrive before the VGTID event for a given transaction (and do not carry their own position values), I end up manually setting the position on the last ("commit") RowMap event at the very end.

I've looked at your existing producers and they seem to use the nextPosition value from RowMap to commit the latest position, so I assumed nextPosition is what I need to set. It works in all my testing with the built-in producers.

Interestingly though, your example CustomProducer (and, as a result, the producer we use for our project here at Shopify) has context.setPosition(r.getPosition());. Since I do not set those values on VStream-originating RowMap events (the value is a static field and I wasn't sure I should touch it), position tracking ends up not working, leading to all kinds of fun problems.

So, the question is: what is the right way to update progress from a producer? And if both the position and next-position fields could potentially be used, should I just set them to the same value so the system works for people who may have followed the example producer code and have the same logic in place?

Thank you for any insight you can provide into this.

@osheroff
Collaborator

ugh, I'm sure that's just a dumb bug with the example CustomProducer, or maybe RowMap#getPosition used to actually return the next position, or something.

The proper behavior is most assuredly to do context.setPosition(r.getNextPosition()). Does that work with vitess' view of how positions are tracked?
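In sketch form, the commit pattern being recommended looks like this (a Python stand-in for the Java producer API; Context here is a hypothetical minimal model of MaxwellContext):

```python
class Context:
    """Minimal stand-in for MaxwellContext: remembers the last committed
    position. Illustrative only, not Maxwell's actual Java API."""
    def __init__(self):
        self.position = None

    def set_position(self, position):
        self.position = position

def push(context, row_map):
    # ... deliver row_map to the downstream system first, then commit
    # progress using the *next* position, per the discussion above; a
    # restart resumes at the first event not yet delivered.
    context.set_position(row_map["next_position"])

ctx = Context()
push(ctx, {"data": {"name": "test1"}, "next_position": "commerce/0:1-2063"})
print(ctx.position)  # commerce/0:1-2063
```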

@kovyrin
Author

kovyrin commented Dec 13, 2022

The proper behavior is most assuredly to do context.setPosition(r.getNextPosition()).

Great, I'll just use that in our custom producer.

Does that work with vitess' view of how positions are tracked?

Yes, absolutely, that works!

@osheroff
Collaborator

osheroff commented Feb 1, 2023

how'd it all go @kovyrin? stall a bit or you in prod?

…on Vitess since they may be following a different keyspace/shards
@kovyrin
Author

kovyrin commented Feb 1, 2023

stall a bit or you in prod?

Hey, Ben! We're slowly rolling it out in some of our environments, ironing out rough edges, etc. Currently exploring different options for coordinating multiple instances of Maxwell running with the same config (which can happen during rolling upgrades, k8s cluster failovers, etc.). I'm considering adding Zookeeper support for leader election or something like that (I will propose it in a separate PR if we decide to go that route).

I'll try to polish the Vitess PR enough over the next couple of weeks (add tests, docs, etc) and then request a final review/merge if that's OK with you. Thank you for your support!

@dbmurphy

@kovyrin Did this ever go to production? It would be great to see this officially in maxwell mainline code :)

@kovyrin
Author

kovyrin commented Dec 11, 2023

@dbmurphy Unfortunately, due to some changes at Shopify, we had to pivot and change our approach for the project, making Maxwell redundant in our infrastructure. We ran this code in production for about two months before shutting it down, so I am fairly confident in it as an experimental feature, but I may not be able to push it over the finish line at this point. :(

Development

Successfully merging this pull request may close these issues.

Connect to Vitess
3 participants