From 0776376c66b9f33c09360d08705cdb844b17523f Mon Sep 17 00:00:00 2001 From: Shiv Nagarajan Date: Fri, 7 Jan 2022 14:34:40 -0500 Subject: [PATCH 1/2] add bits for running arm64 mysql8 Co-authored-by: Aaron Brady Co-authored-by: Pawan Dubey --- .github/workflows/start-mysql.sh | 44 ++++--- .github/workflows/tests.yml | 5 +- .github/workflows/tests_5.7.yml | 74 +++++++++++ dev.yml | 20 ++- dml_events.go | 3 + docker-compose.yml | 5 +- docker-compose_8.0.yml | 68 ++++++++++ examples/gh-285/bugreport.sh | 2 - go.mod | 2 +- go.sum | 2 + test/go/lag_throttler_test.go | 23 ++-- test/go/race_conditions_integration_test.go | 3 +- test/integration/inline_verifier_test.rb | 5 +- testhelpers/helper_methods.go | 21 ++-- .../go-mysql-org/go-mysql/client/auth.go | 41 ++++--- .../go-mysql-org/go-mysql/client/conn.go | 43 ++++++- .../go-mysql-org/go-mysql/client/pool.go | 5 +- .../go-mysql-org/go-mysql/client/resp.go | 43 ++++--- .../go-mysql-org/go-mysql/client/stmt.go | 24 +++- .../go-mysql-org/go-mysql/mysql/errname.go | 92 +++++++------- .../go-mysql-org/go-mysql/mysql/mysql_gtid.go | 81 ++++++++++++ .../go-mysql-org/go-mysql/mysql/result.go | 3 +- .../go-mysql-org/go-mysql/mysql/resultset.go | 6 + .../go-mysql/mysql/resultset_helper.go | 4 +- .../go-mysql-org/go-mysql/mysql/rowdata.go | 2 +- .../go-mysql-org/go-mysql/mysql/util.go | 19 ++- .../go-mysql-org/go-mysql/packet/conn.go | 29 ++++- .../go-mysql/replication/binlogsyncer.go | 20 +-- .../go-mysql/replication/const.go | 8 ++ .../go-mysql/replication/event.go | 16 +++ .../go-mysql/replication/generic_event.go | 5 - .../go-mysql/replication/parser.go | 16 ++- .../go-mysql/replication/row_event.go | 116 +++++++++++++----- .../go-mysql-org/go-mysql/schema/schema.go | 46 ++++--- .../go-mysql/utils/bytes_buffer_pool.go | 8 ++ vendor/modules.txt | 2 +- 36 files changed, 690 insertions(+), 216 deletions(-) create mode 100644 .github/workflows/tests_5.7.yml create mode 100644 docker-compose_8.0.yml diff --git a/.github/workflows/start-mysql.sh b/.github/workflows/start-mysql.sh index 2282d494..adbdd308 100755 --- a/.github/workflows/start-mysql.sh +++ b/.github/workflows/start-mysql.sh @@ -1,7 +1,7 @@ #!/bin/bash set -xe -DOCKER_COMPOSE_VERSION=1.29.2 +DOCKER_COMPOSE_VERSION=v2.2.3 sudo apt-get update sudo apt-get install -y netcat-openbsd make gcc @@ -9,27 +9,41 @@ sudo apt-get install -y netcat-openbsd make gcc sudo curl -o /usr/local/bin/docker-compose -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` sudo chmod +x /usr/local/bin/docker-compose -docker-compose up -d mysql-1 mysql-2 +if [ "$MYSQL_VERSION" == "8.0" ]; then + docker-compose -f docker-compose_8.0.yml up -d mysql-1 mysql-2 +else + docker-compose up -d mysql-1 mysql-2 +fi -# We need a way to check if the mysql servers have booted or not before running -# the tests and this way is slightly faster than installing mysql-client +MAX_ATTEMPTS=60 -wait_for_mysql() { - port=$1 - echo "Waiting for MySQL at port $port..." +function wait_for_version () { attempts=0 - while ! nc -w 1 localhost $port | grep -q "mysql"; do + until docker exec -t $1 mysql -N -s -u root -e "select @@version"; do sleep 1 attempts=$((attempts + 1)) - if (( attempts > 60 )); then - echo "ERROR: mysql $port was not started." >&2 - exit 1 + if (( attempts > $MAX_ATTEMPTS )); then + echo "ERROR: $1 was not started." >&2 + exit 1 fi done - echo "MySQL at port $port has started!" } -wait_for_mysql 29291 -wait_for_mysql 29292 +wait_for_configuration () { + attempts=0 + # we do need to see the "root@%" user configured, so wait for that + until mysql --port $1 --protocol tcp --skip-password -N -s -u root -e "select host from mysql.user where user = 'root';" 2>/dev/null | grep -q '%'; do + sleep 1 + attempts=$((attempts + 1)) + if (( attempts > $MAX_ATTEMPTS )); then + echo "ERROR: $1 was not started." >&2 + exit 1 + fi + done +} + +wait_for_version "ghostferry-mysql-1-1" +wait_for_version "ghostferry-mysql-2-1" -docker-compose exec -T mysql-1 mysql -u root -e "select @@version" +wait_for_configuration 29291 +wait_for_configuration 29292 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a892ca99..1cd54270 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,4 +1,4 @@ -name: Ghostferry tests +name: Ghostferry MySQL 8.0 tests on: push: @@ -12,6 +12,7 @@ jobs: timeout-minutes: 15 env: CI: "true" + MYSQL_VERSION: "8.0" steps: - uses: actions/checkout@v2 @@ -30,6 +31,7 @@ jobs: timeout-minutes: 15 env: CI: "true" + MYSQL_VERSION: "8.0" steps: - uses: actions/checkout@v2 @@ -49,6 +51,7 @@ jobs: env: CI: "true" BUNDLE_WITHOUT: "development" + MYSQL_VERSION: "8.0" steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/tests_5.7.yml b/.github/workflows/tests_5.7.yml new file mode 100644 index 00000000..62aa65b3 --- /dev/null +++ b/.github/workflows/tests_5.7.yml @@ -0,0 +1,74 @@ +name: Ghostferry MySQL 5.7 tests + +on: + push: + branches: + - master + pull_request: + +jobs: + gh-285: + runs-on: ubuntu-latest + timeout-minutes: 15 + env: + CI: "true" + MYSQL_VERSION: "5.7" + steps: + - uses: actions/checkout@v2 + + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: 1.16 + + - name: Starting up MySQL + run: .github/workflows/start-mysql.sh + + - name: Running GH-285 test + run: ./examples/gh-285/bugreport.sh + go-test: + runs-on: ubuntu-latest + timeout-minutes: 15 + env: + CI: "true" + MYSQL_VERSION: "5.7" + steps: + - uses: actions/checkout@v2 + + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: 1.16 + + - name: Starting up MySQL + run: .github/workflows/start-mysql.sh + + - name: Running Golang tests + run: make test-go + ruby-test: + runs-on: ubuntu-latest + timeout-minutes: 15 + env: + CI: "true" + MYSQL_VERSION: "5.7" + BUNDLE_WITHOUT: "development" + steps: + - uses: actions/checkout@v2 + + - name: Setup Golang + uses: actions/setup-go@v2 + with: + go-version: 1.16 + + - name: Setup Ruby + uses: ruby/setup-ruby@v1 + with: + ruby-version: 2.7 + bundler-cache: true + + - name: Starting up MySQL + run: .github/workflows/start-mysql.sh + + - name: Running Ruby tests + run: bundle exec ruby test/main.rb + diff --git a/dev.yml b/dev.yml index 2412876a..33073b96 100644 --- a/dev.yml +++ b/dev.yml @@ -1,8 +1,14 @@ name: ghostferry +env: + MYSQL_VERSION: "8.0" + up: - homebrew: - - mysql + - mysql-client@5.7: + or: [mysql@5.7] + conflicts: [mysql-connector-c, mysql, mysql-client] + - ruby: "2.7.3" - bundler - go: @@ -20,11 +26,17 @@ up: meet: echo 'go mod failed to download dependencies'; false - custom: name: MySQL - met?: docker-compose up -d mysql-1 mysql-2 + met?: docker-compose -f docker-compose_8.0.yml up -d mysql-1 mysql-2 meet: echo 'mysql failed to start'; false - down: docker-compose stop mysql-1 mysql-2 + down: docker-compose -f docker-compose_8.0.yml stop mysql-1 mysql-2 commands: test: - desc: Run the test suite. + desc: Run all the tests. run: make test + test-go: + desc: Run the golang test suite. + run: make test-go + test-ruby: + desc: Run the ruby test suite. + run: make test-ruby diff --git a/dml_events.go b/dml_events.go index 5cc7d0c7..30a0f9c4 100644 --- a/dml_events.go +++ b/dml_events.go @@ -423,6 +423,9 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol switch v := value.(type) { case string: + if column.Type == schema.TYPE_JSON { + return appendEscapedBuffer(buffer, []byte(v), true) + } var rightPadLengthForBinaryColumn int = 0 // see appendEscapedString() for details why we need special // handling of BINARY column types diff --git a/docker-compose.yml b/docker-compose.yml index 1c41d14a..c2a0cc84 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,6 @@ version: "3" services: mysql-1: image: percona:5.7 - platform: linux/x86_64 command: --server-id=1 --log-bin=mysql-bin --max-binlog-size=4096 @@ -18,6 +17,7 @@ services: --binlog-rows-query-log-events=ON environment: MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" volumes: - /var/lib/mysql ports: @@ -25,7 +25,6 @@ services: mysql-2: image: percona:5.7 - platform: linux/x86_64 command: --server-id=2 --log-bin=mysql-bin --binlog-format=ROW @@ -40,6 +39,7 @@ services: --binlog-rows-query-log-events=ON environment: MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" volumes: - /var/lib/mysql ports: @@ -61,6 +61,7 @@ services: --binlog-rows-query-log-events=ON environment: MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" volumes: - /var/lib/mysql ports: diff --git a/docker-compose_8.0.yml b/docker-compose_8.0.yml new file mode 100644 index 00000000..87d6eb7b --- /dev/null +++ b/docker-compose_8.0.yml @@ -0,0 +1,68 @@ +version: "3" +services: + mysql-1: + image: docker.io/mysql/mysql-server:8.0 + command: --server-id=1 + --log-bin=mysql-bin + --max-binlog-size=4096 + --binlog-format=ROW + --sync-binlog=1 + --log-slave-updates=ON + --gtid-mode=ON + --enforce-gtid-consistency=ON + --character-set-server=utf8mb4 + --collation-server=utf8mb4_unicode_ci + --max-connections=1000 + --read-only=OFF + --binlog-rows-query-log-events=ON + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" + volumes: + - /var/lib/mysql + ports: + - "29291:3306" + + mysql-2: + image: docker.io/mysql/mysql-server:8.0 + command: --server-id=2 + --log-bin=mysql-bin + --binlog-format=ROW + --max-binlog-size=4096 + --sync-binlog=1 + --log-slave-updates=ON + --gtid-mode=ON + --enforce-gtid-consistency=ON + --character-set-server=utf8mb4 + --collation-server=utf8mb4_unicode_ci + --max-connections=1000 + --binlog-rows-query-log-events=ON + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" + volumes: + - /var/lib/mysql + ports: + - "29292:3306" + + mysql-3: + image: docker.io/mysql/mysql-server:8.0 + command: --server-id=3 + --log-bin=mysql-bin + --binlog-format=ROW + --max-binlog-size=4096 + --sync-binlog=1 + --log-slave-updates=ON + --gtid-mode=ON + --enforce-gtid-consistency=ON + --character-set-server=utf8mb4 + --collation-server=utf8mb4_unicode_ci + --max-connections=1000 + --binlog-rows-query-log-events=ON + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_ROOT_HOST: "%" + volumes: + - /var/lib/mysql + ports: + - "29293:3306" diff --git a/examples/gh-285/bugreport.sh b/examples/gh-285/bugreport.sh index 2b098e66..b148bb18 100755 --- a/examples/gh-285/bugreport.sh +++ b/examples/gh-285/bugreport.sh @@ -2,8 +2,6 @@ set -e -docker-compose up -d - mysql -h 127.0.0.1 -u root -P 29291 -e 'DROP DATABASE IF EXISTS `abc`' mysql -h 127.0.0.1 -u root -P 29292 -e 'DROP DATABASE IF EXISTS `abc`' diff --git a/go.mod b/go.mod index 063e4ff1..36772fac 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go v4.8.2+incompatible github.com/Masterminds/squirrel v0.0.0-20180620232226-b127ed9be034 github.com/Microsoft/go-winio v0.5.0 // indirect - github.com/go-mysql-org/go-mysql v1.3.0 + github.com/go-mysql-org/go-mysql v1.4.1-0.20220112102103-b3f1a27311d8 github.com/go-sql-driver/mysql v1.5.0 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/gorilla/context v1.1.1 // indirect diff --git a/go.sum b/go.sum index 592174f4..69a6334b 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-mysql-org/go-mysql v1.3.0 h1:lpNqkwdPzIrYSZGdqt8HIgAXZaK6VxBNfr8f7Z4FgGg= github.com/go-mysql-org/go-mysql v1.3.0/go.mod h1:3lFZKf7l95Qo70+3XB2WpiSf9wu2s3na3geLMaIIrqQ= +github.com/go-mysql-org/go-mysql v1.4.1-0.20220112102103-b3f1a27311d8 h1:ViPrQui89RKKSFi0FwonSGGCMksRaYFZm+mu8nQpM84= +github.com/go-mysql-org/go-mysql v1.4.1-0.20220112102103-b3f1a27311d8/go.mod h1:3lFZKf7l95Qo70+3XB2WpiSf9wu2s3na3geLMaIIrqQ= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= diff --git a/test/go/lag_throttler_test.go b/test/go/lag_throttler_test.go index 3e719f98..c83ca923 100644 --- a/test/go/lag_throttler_test.go +++ b/test/go/lag_throttler_test.go @@ -2,11 +2,13 @@ package test import ( "context" - sql "github.com/Shopify/ghostferry/sqlwrapper" + "os" "sync" "testing" "time" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" "github.com/stretchr/testify/assert" @@ -28,7 +30,7 @@ func newThrottlerWithQuery(query string) *ghostferry.LagThrottler { } func newThrottler() *ghostferry.LagThrottler { - return newThrottlerWithQuery("SELECT MAX(lag) FROM meta.lag_table") + return newThrottlerWithQuery("SELECT MAX(throttler_lag) FROM meta.lag_table") } func setupLagTable(db *sql.DB, ctx context.Context) { @@ -38,12 +40,12 @@ func setupLagTable(db *sql.DB, ctx context.Context) { _, err = db.Exec("CREATE DATABASE meta") testhelpers.PanicIfError(err) - _, err = db.Exec("CREATE TABLE meta.lag_table (lag FLOAT NOT NULL, server_id int unsigned NOT NULL PRIMARY KEY)") + _, err = db.Exec("CREATE TABLE meta.lag_table (throttler_lag FLOAT NOT NULL, server_id int unsigned NOT NULL PRIMARY KEY)") testhelpers.PanicIfError(err) } func setLag(throttler *ghostferry.LagThrottler, serverId int, lag float32) { - _, err := throttler.DB.Exec("INSERT INTO meta.lag_table (lag, server_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE lag = ?", lag, serverId, lag) + _, err := throttler.DB.Exec("INSERT INTO meta.lag_table (throttler_lag, server_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE throttler_lag = ?", lag, serverId, lag) testhelpers.PanicIfError(err) time.Sleep(10 * time.Millisecond) } @@ -108,7 +110,7 @@ func TestNewThrottlerConfigErrors(t *testing.T) { okConfig := ghostferry.LagThrottlerConfig{ Connection: connConfig, - Query: "SELECT MAX(lag) FROM meta.lag_table", + Query: "SELECT MAX(throttler_lag) FROM meta.lag_table", } config := okConfig @@ -141,7 +143,14 @@ func TestThrottlerRunErrors(t *testing.T) { err = throttler.Run(ctx) assert.NotNil(t, err) - assert.Equal(t, "Error 1146: Table 'meta.lag_table' doesn't exist", err.Error()) + expectedError := "Error 1146: Table 'meta.lag_table' doesn't exist" + + if os.Getenv("MYSQL_VERSION") == "8.0" { + expectedError = "Error 1049: Unknown database 'meta'" + } + + validError := (err.Error() == expectedError) + assert.True(t, validError) done() err = throttler.Run(ctx) @@ -151,7 +160,7 @@ func TestThrottlerRunErrors(t *testing.T) { func TestThrottlerWithNullReturned(t *testing.T) { ctx, done := context.WithCancel(context.Background()) - throttler := newThrottlerWithQuery("SELECT MAX(lag) FROM meta.lag_table") + throttler := newThrottlerWithQuery("SELECT MAX(throttler_lag) FROM meta.lag_table") setupLagTable(throttler.DB, ctx) wg := &sync.WaitGroup{} diff --git a/test/go/race_conditions_integration_test.go b/test/go/race_conditions_integration_test.go index eb78c127..15ec599c 100644 --- a/test/go/race_conditions_integration_test.go +++ b/test/go/race_conditions_integration_test.go @@ -2,10 +2,11 @@ package test import ( "fmt" - sql "github.com/Shopify/ghostferry/sqlwrapper" "testing" "time" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" "github.com/stretchr/testify/require" diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb index 97c209f8..a12ed0e1 100644 --- a/test/integration/inline_verifier_test.rb +++ b/test/integration/inline_verifier_test.rb @@ -556,9 +556,12 @@ def test_utfmb3_data_from_utfmb4_to_utfmb3 run_collation_test(UTF8MB3DATA, "utf8mb4", "utf8mb3", identical: true) end + # skip on MySQL 8 + # More details at + # https://github.com/Shopify/ghostferry/pull/328#discussion_r791197939 def test_utfmb4_data_from_utfmb4_to_utfmb3 run_collation_test(UTF8MB4DATA, "utf8mb4", "utf8mb3", identical: false) - end + end unless ENV['MYSQL_VERSION'] == '8.0' private diff --git a/testhelpers/helper_methods.go b/testhelpers/helper_methods.go index 1475c028..8b92b192 100644 --- a/testhelpers/helper_methods.go +++ b/testhelpers/helper_methods.go @@ -2,10 +2,12 @@ package testhelpers import ( sqlorig "database/sql" - sql "github.com/Shopify/ghostferry/sqlwrapper" + "regexp" "strings" "testing" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/Shopify/ghostferry" "github.com/stretchr/testify/assert" ) @@ -24,26 +26,29 @@ func ProcessListContainsQueries(db *sql.DB, queries []string) bool { defer rows.Close() - queriesFound := make(map[string]bool) + queriesFound := make(map[*regexp.Regexp]bool) for _, query := range queries { - queriesFound[query] = false + re := regexp.MustCompile(strings.Replace(regexp.QuoteMeta(query), `\?`, `\S`, -1)) + queriesFound[re] = false } for rows.Next() { - data, err := ghostferry.ScanGenericRow(rows, 10) + columns, _ := rows.Columns() + data, err := ghostferry.ScanGenericRow(rows, len(columns)) if err != nil { panic(err) } - if data[7] == nil { + if data[7] == nil || data[4] == nil { continue } info := data[7].([]byte) + command := data[4].([]byte) - for query, found := range queriesFound { - if !found && strings.TrimSpace(string(info)) == query { - queriesFound[query] = true + for re, found := range queriesFound { + if !found && string(command) == "Execute" && re.MatchString(string(info)) { + queriesFound[re] = true break } } diff --git a/vendor/github.com/go-mysql-org/go-mysql/client/auth.go b/vendor/github.com/go-mysql-org/go-mysql/client/auth.go index 01217f83..6f0ba5f8 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/client/auth.go +++ b/vendor/github.com/go-mysql-org/go-mysql/client/auth.go @@ -77,21 +77,25 @@ func (c *Conn) readInitialHandshake() error { c.capability = uint32(binary.LittleEndian.Uint16(data[pos:pos+2]))<<16 | c.capability pos += 2 - // skip auth data len or [00] + // auth_data is end with 0x00, min data length is 13 + 8 = 21 + // ref to https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake + maxAuthDataLen := 21 + if c.capability&CLIENT_PLUGIN_AUTH != 0 && int(data[pos]) > maxAuthDataLen { + maxAuthDataLen = int(data[pos]) + } + // skip reserved (all [00]) pos += 10 + 1 - // The documentation is ambiguous about the length. - // The official Python library uses the fixed length 12 - // mysql-proxy also use 12 - // which is not documented but seems to work. - c.salt = append(c.salt, data[pos:pos+12]...) - pos += 13 - // auth plugin - if end := bytes.IndexByte(data[pos:], 0x00); end != -1 { - c.authPluginName = string(data[pos : pos+end]) - } else { - c.authPluginName = string(data[pos:]) + // auth_data is end with 0x00, so we need to trim 0x00 + resetOfAuthDataEndPos := pos + maxAuthDataLen - 8 - 1 + c.salt = append(c.salt, data[pos:resetOfAuthDataEndPos]...) + + // skip reset of end pos + pos = resetOfAuthDataEndPos + 1 + + if c.capability&CLIENT_PLUGIN_AUTH != 0 { + c.authPluginName = string(data[pos : len(data)-1]) } } @@ -140,9 +144,18 @@ func (c *Conn) writeAuthHandshake() error { if !authPluginAllowed(c.authPluginName) { return fmt.Errorf("unknow auth plugin name '%s'", c.authPluginName) } - // Adjust client capability flags based on server support + + // Set default client capabilities that reflect the abilities of this library capability := CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | - CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH | c.capability&CLIENT_LONG_FLAG + CLIENT_LONG_PASSWORD | CLIENT_TRANSACTIONS | CLIENT_PLUGIN_AUTH + // Adjust client capability flags based on server support + capability |= c.capability & CLIENT_LONG_FLAG + // Adjust client capability flags on specific client requests + // Only flags that would make any sense setting and aren't handled elsewhere + // in the library are supported here + capability |= c.ccaps&CLIENT_FOUND_ROWS | c.ccaps&CLIENT_IGNORE_SPACE | + c.ccaps&CLIENT_MULTI_STATEMENTS | c.ccaps&CLIENT_MULTI_RESULTS | + c.ccaps&CLIENT_PS_MULTI_RESULTS | c.ccaps&CLIENT_CONNECT_ATTRS // To enable TLS / SSL if c.tlsConfig != nil { diff --git a/vendor/github.com/go-mysql-org/go-mysql/client/conn.go b/vendor/github.com/go-mysql-org/go-mysql/client/conn.go index 1b374446..19318ddd 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/client/conn.go +++ b/vendor/github.com/go-mysql-org/go-mysql/client/conn.go @@ -1,6 +1,7 @@ package client import ( + "context" "crypto/tls" "fmt" "net" @@ -21,7 +22,10 @@ type Conn struct { tlsConfig *tls.Config proto string + // server capabilities capability uint32 + // client-set capabilities only + ccaps uint32 status uint16 @@ -36,6 +40,9 @@ type Conn struct { // This function will be called for every row in resultset from ExecuteSelectStreaming. type SelectPerRowCallback func(row []FieldValue) error +// This function will be called once per result from ExecuteSelectStreaming +type SelectPerResultCallback func(result *Result) error + func getNetProto(addr string) string { proto := "tcp" if strings.Contains(addr, "/") { @@ -49,10 +56,23 @@ func getNetProto(addr string) string { func Connect(addr string, user string, password string, dbName string, options ...func(*Conn)) (*Conn, error) { proto := getNetProto(addr) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + dialer := &net.Dialer{} + + return ConnectWithDialer(ctx, proto, addr, user, password, dbName, dialer.DialContext, options...) +} + +// Dialer connects to the address on the named network using the provided context. +type Dialer func(ctx context.Context, network, address string) (net.Conn, error) + +// Connect to a MySQL server using the given Dialer. +func ConnectWithDialer(ctx context.Context, network string, addr string, user string, password string, dbName string, dialer Dialer, options ...func(*Conn)) (*Conn, error) { c := new(Conn) var err error - conn, err := net.DialTimeout(proto, addr, 10*time.Second) + conn, err := dialer(ctx, network, addr) if err != nil { return nil, errors.Trace(err) } @@ -66,9 +86,9 @@ func Connect(addr string, user string, password string, dbName string, options . c.user = user c.password = password c.db = dbName - c.proto = proto + c.proto = network - //use default charset here, utf-8 + // use default charset here, utf-8 c.charset = DEFAULT_CHARSET // Apply configuration functions. @@ -120,6 +140,16 @@ func (c *Conn) Ping() error { return nil } +// SetCapability enables the use of a specific capability +func (c *Conn) SetCapability(cap uint32) { + c.ccaps |= cap +} + +// UnsetCapability disables the use of a specific capability +func (c *Conn) UnsetCapability(cap uint32) { + c.ccaps &= ^cap +} + // UseSSL: use default SSL // pass to options when connect func (c *Conn) UseSSL(insecureSkipVerify bool) { @@ -170,6 +200,7 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) { // ExecuteSelectStreaming will call perRowCallback for every row in resultset // WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields. +// When given, perResultCallback will be called once per result // // ExecuteSelectStreaming should be used only for SELECT queries with a large response resultset for memory preserving. // @@ -180,14 +211,14 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) { // // Use the row as you want. // // You must not save FieldValue.AsString() value after this callback is done. Copy it if you need. // return nil -// }) +// }, nil) // -func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback) error { +func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback, perResultCallback SelectPerResultCallback) error { if err := c.writeCommandStr(COM_QUERY, command); err != nil { return errors.Trace(err) } - return c.readResultStreaming(false, result, perRowCallback) + return c.readResultStreaming(false, result, perRowCallback, perResultCallback) } func (c *Conn) Begin() error { diff --git a/vendor/github.com/go-mysql-org/go-mysql/client/pool.go b/vendor/github.com/go-mysql-org/go-mysql/client/pool.go index c8c1f45d..029487e4 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/client/pool.go +++ b/vendor/github.com/go-mysql-org/go-mysql/client/pool.go @@ -466,11 +466,12 @@ func (pool *Pool) startNewConnections(count int) { func (pool *Pool) ping(conn *Conn) error { deadline := time.Now().Add(100 * time.Millisecond) - _ = conn.SetWriteDeadline(deadline) - _ = conn.SetReadDeadline(deadline) + _ = conn.SetDeadline(deadline) err := conn.Ping() if err != nil { pool.logFunc(`Pool: ping query fail: %s`, err.Error()) + } else { + _ = conn.SetDeadline(time.Time{}) } return err } diff --git a/vendor/github.com/go-mysql-org/go-mysql/client/resp.go b/vendor/github.com/go-mysql-org/go-mysql/client/resp.go index cc944146..12fbac92 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/client/resp.go +++ b/vendor/github.com/go-mysql-org/go-mysql/client/resp.go @@ -7,10 +7,11 @@ import ( "encoding/binary" "encoding/pem" - . "github.com/go-mysql-org/go-mysql/mysql" - "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" "github.com/siddontang/go/hack" + + . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/utils" ) func (c *Conn) readUntilEOF() (err error) { @@ -39,7 +40,7 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) { var n int var pos = 1 - r := &Result{Resultset: &Resultset{}} + r := new(Result) r.AffectedRows, _, n = LengthEncodedInt(data[pos:]) pos += n @@ -52,8 +53,8 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) { pos += 2 //todo:strict_mode, check warnings as error - //Warnings := binary.LittleEndian.Uint16(data[pos:]) - //pos += 2 + r.Warnings = binary.LittleEndian.Uint16(data[pos:]) + pos += 2 } else if c.capability&CLIENT_TRANSACTIONS > 0 { r.Status = binary.LittleEndian.Uint16(data[pos:]) c.status = r.Status @@ -127,9 +128,8 @@ func (c *Conn) handleAuthResult() error { return nil // auth already succeeded } if data[0] == CACHE_SHA2_FAST_AUTH { - if _, err = c.readOK(); err == nil { - return nil // auth successful - } + _, err = c.readOK() + return err } else if data[0] == CACHE_SHA2_FULL_AUTH { // need full authentication if c.tlsConfig != nil || c.proto == "unix" { @@ -141,6 +141,8 @@ func (c *Conn) handleAuthResult() error { return err } } + _, err = c.readOK() + return err } else { return errors.Errorf("invalid packet %x", data[0]) } @@ -233,7 +235,7 @@ func (c *Conn) readResult(binary bool) (*Result, error) { return c.readResultset(firstPkgBuf, binary) } -func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback) error { +func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error { firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0]) defer utils.ByteSlicePut(firstPkgBuf) @@ -254,6 +256,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP result.Status = okResult.Status result.AffectedRows = okResult.AffectedRows result.InsertId = okResult.InsertId + result.Warnings = okResult.Warnings if result.Resultset == nil { result.Resultset = NewResultset(0) } else { @@ -266,7 +269,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP return ErrMalformPacket } - return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb) + return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb, perResCb) } func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) { @@ -292,7 +295,7 @@ func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) { return result, nil } -func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback) error { +func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error { columnCount, _, n := LengthEncodedInt(data) if n-len(data) != 0 { @@ -306,14 +309,26 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, result.Reset(int(columnCount)) } + // this is a streaming resultset + result.Resultset.Streaming = true + if err := c.readResultColumns(result); err != nil { return errors.Trace(err) } + if perResCb != nil { + if err := perResCb(result); err != nil { + return err + } + } + if err := c.readResultRowsStreaming(result, binary, perRowCb); err != nil { return errors.Trace(err) } + // this resultset is done streaming + result.Resultset.StreamingDone = true + return nil } @@ -332,7 +347,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) { // EOF Packet if c.isEOFPacket(data) { if c.capability&CLIENT_PROTOCOL_41 > 0 { - //result.Warnings = binary.LittleEndian.Uint16(data[1:]) + result.Warnings = binary.LittleEndian.Uint16(data[1:]) //todo add strict_mode, warning will be treat as error result.Status = binary.LittleEndian.Uint16(data[3:]) c.status = result.Status @@ -373,7 +388,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) { // EOF Packet if c.isEOFPacket(data) { if c.capability&CLIENT_PROTOCOL_41 > 0 { - //result.Warnings = binary.LittleEndian.Uint16(data[1:]) + result.Warnings = binary.LittleEndian.Uint16(data[1:]) //todo add strict_mode, warning will be treat as error result.Status = binary.LittleEndian.Uint16(data[3:]) c.status = result.Status @@ -421,7 +436,7 @@ func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb S // EOF Packet if c.isEOFPacket(data) { if c.capability&CLIENT_PROTOCOL_41 > 0 { - // result.Warnings = binary.LittleEndian.Uint16(data[1:]) + result.Warnings = binary.LittleEndian.Uint16(data[1:]) // todo add strict_mode, warning will be treat as error result.Status = binary.LittleEndian.Uint16(data[3:]) c.status = result.Status diff --git a/vendor/github.com/go-mysql-org/go-mysql/client/stmt.go b/vendor/github.com/go-mysql-org/go-mysql/client/stmt.go index 239da78b..c9f4a175 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/client/stmt.go +++ b/vendor/github.com/go-mysql-org/go-mysql/client/stmt.go @@ -2,6 +2,7 @@ package client import ( "encoding/binary" + "encoding/json" "fmt" "math" @@ -13,8 +14,9 @@ type Stmt struct { conn *Conn id uint32 - params int - columns int + params int + columns int + warnings int } func (s *Stmt) ParamNum() int { @@ -25,6 +27,10 @@ func (s *Stmt) ColumnNum() int { return s.columns } +func (s *Stmt) WarningsNum() int { + return s.warnings +} + func (s *Stmt) Execute(args ...interface{}) (*Result, error) { if err := s.write(args...); err != nil { return nil, errors.Trace(err) @@ -33,6 +39,14 @@ func (s *Stmt) Execute(args ...interface{}) (*Result, error) { return s.conn.readResult(true) } +func (s *Stmt) ExecuteSelectStreaming(result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback, args ...interface{}) error { + if err := s.write(args...); err != nil { + return errors.Trace(err) + } + + return s.conn.readResultStreaming(true, result, perRowCb, perResCb) +} + func (s *Stmt) Close() error { if err := s.conn.writeCommandUint32(COM_STMT_CLOSE, s.id); err != nil { return errors.Trace(err) @@ -122,6 +136,9 @@ func (s *Stmt) write(args ...interface{}) error { case []byte: paramTypes[i<<1] = MYSQL_TYPE_STRING paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...) + case json.RawMessage: + paramTypes[i<<1] = MYSQL_TYPE_STRING + paramValues[i] = append(PutLengthEncodedInt(uint64(len(v))), v...) default: return fmt.Errorf("invalid argument type %T", args[i]) } @@ -196,7 +213,8 @@ func (c *Conn) Prepare(query string) (*Stmt, error) { pos += 2 //warnings - //warnings = binary.LittleEndian.Uint16(data[pos:]) + s.warnings = int(binary.LittleEndian.Uint16(data[pos:])) + pos += 2 if s.params > 0 { if err := s.conn.readUntilEOF(); err != nil { diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/errname.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/errname.go index 0ff7a13d..592f4bb5 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/errname.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/errname.go @@ -75,28 +75,28 @@ var MySQLErrName = map[uint16]string{ ER_TOO_LONG_KEY: "Specified key was too long; max key length is %d bytes", ER_KEY_COLUMN_DOES_NOT_EXITS: "Key column '%-.192s' doesn't exist in table", ER_BLOB_USED_AS_KEY: "BLOB column '%-.192s' can't be used in key specification with the used table type", - ER_TOO_BIG_FIELDLENGTH: "Column length too big for column '%-.192s' (max = %lu); use BLOB or TEXT instead", + ER_TOO_BIG_FIELDLENGTH: "Column length too big for column '%-.192s' (max = %d); use BLOB or TEXT instead", ER_WRONG_AUTO_KEY: "Incorrect table definition; there can be only one auto column and it must be defined as a key", ER_READY: "%s: ready for connections.\nVersion: '%s' socket: '%s' port: %d", ER_NORMAL_SHUTDOWN: "%s: Normal shutdown\n", ER_GOT_SIGNAL: "%s: Got signal %d. Aborting!\n", ER_SHUTDOWN_COMPLETE: "%s: Shutdown complete\n", - ER_FORCING_CLOSE: "%s: Forcing close of thread %ld user: '%-.48s'\n", + ER_FORCING_CLOSE: "%s: Forcing close of thread %d user: '%-.48s'\n", ER_IPSOCK_ERROR: "Can't create IP socket", ER_NO_SUCH_INDEX: "Table '%-.192s' has no index like the one used in CREATE INDEX; recreate the table", ER_WRONG_FIELD_TERMINATORS: "Field separator argument is not what is expected; check the manual", ER_BLOBS_AND_NO_TERMINATED: "You can't use fixed rowlength with BLOBs; please use 'fields terminated by'", ER_TEXTFILE_NOT_READABLE: "The file '%-.128s' must be in the database directory or be readable by all", ER_FILE_EXISTS_ERROR: "File '%-.200s' already exists", - ER_LOAD_INFO: "Records: %ld Deleted: %ld Skipped: %ld Warnings: %ld", - ER_ALTER_INFO: "Records: %ld Duplicates: %ld", + ER_LOAD_INFO: "Records: %d Deleted: %d Skipped: %d Warnings: %d", + ER_ALTER_INFO: "Records: %d Duplicates: %d", ER_WRONG_SUB_KEY: "Incorrect prefix key; the used key part isn't a string, the used length is longer than the key part, or the storage engine doesn't support unique prefix keys", ER_CANT_REMOVE_ALL_FIELDS: "You can't delete all columns with ALTER TABLE; use DROP TABLE instead", ER_CANT_DROP_FIELD_OR_KEY: "Can't DROP '%-.192s'; check that column/key exists", - ER_INSERT_INFO: "Records: %ld Duplicates: %ld Warnings: %ld", + ER_INSERT_INFO: "Records: %d Duplicates: %d Warnings: %d", ER_UPDATE_TABLE_USED: "You can't specify target table '%-.192s' for update in FROM clause", - ER_NO_SUCH_THREAD: "Unknown thread id: %lu", - ER_KILL_DENIED_ERROR: "You are not owner of thread %lu", + ER_NO_SUCH_THREAD: "Unknown thread id: %d", + ER_KILL_DENIED_ERROR: "You are not owner of thread %d", ER_NO_TABLES_USED: "No tables used", ER_TOO_BIG_SET: "Too many strings for column %-.192s and SET", ER_NO_UNIQUE_LOGFILE: "Can't generate a unique log-filename %-.200s.(1-999)\n", @@ -119,8 +119,8 @@ var MySQLErrName = map[uint16]string{ ER_UNKNOWN_CHARACTER_SET: "Unknown character set: '%-.64s'", ER_TOO_MANY_TABLES: "Too many tables; MySQL can only use %d tables in a join", ER_TOO_MANY_FIELDS: "Too many columns", - ER_TOO_BIG_ROWSIZE: "Row size too large. The maximum row size for the used table type, not counting BLOBs, is %ld. This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs", - ER_STACK_OVERRUN: "Thread stack overrun: Used: %ld of a %ld stack. Use 'mysqld --thread_stack=#' to specify a bigger stack if needed", + ER_TOO_BIG_ROWSIZE: "Row size too large. The maximum row size for the used table type, not counting BLOBs, is %d. This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs", + ER_STACK_OVERRUN: "Thread stack overrun: Used: %d of a %d stack. Use 'mysqld --thread_stack=#' to specify a bigger stack if needed", ER_WRONG_OUTER_JOIN: "Cross dependency found in OUTER JOIN; examine your ON conditions", ER_NULL_COLUMN_IN_INDEX: "Table handler doesn't support NULL in given index. Please change column '%-.192s' to be NOT NULL or use another handler", ER_CANT_FIND_UDF: "Can't load function '%-.192s'", @@ -135,9 +135,9 @@ var MySQLErrName = map[uint16]string{ ER_PASSWORD_ANONYMOUS_USER: "You are using MySQL as an anonymous user and anonymous users are not allowed to change passwords", ER_PASSWORD_NOT_ALLOWED: "You must have privileges to update tables in the mysql database to be able to change passwords for others", ER_PASSWORD_NO_MATCH: "Can't find any matching row in the user table", - ER_UPDATE_INFO: "Rows matched: %ld Changed: %ld Warnings: %ld", + ER_UPDATE_INFO: "Rows matched: %d Changed: %d Warnings: %d", ER_CANT_CREATE_THREAD: "Can't create a new thread (errno %d); if you are not out of available memory, you can consult the manual for a possible OS-dependent bug", - ER_WRONG_VALUE_COUNT_ON_ROW: "Column count doesn't match value count at row %ld", + ER_WRONG_VALUE_COUNT_ON_ROW: "Column count doesn't match value count at row %d", ER_CANT_REOPEN_TABLE: "Can't reopen table: '%-.192s'", ER_INVALID_USE_OF_NULL: "Invalid use of NULL value", ER_REGEXP_ERROR: "Got error '%-.64s' from regexp", @@ -153,7 +153,7 @@ var MySQLErrName = map[uint16]string{ ER_SYNTAX_ERROR: "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use", ER_DELAYED_CANT_CHANGE_LOCK: "Delayed insert thread couldn't get requested lock for table %-.192s", ER_TOO_MANY_DELAYED_THREADS: "Too many delayed threads in use", - ER_ABORTING_CONNECTION: "Aborted connection %ld to db: '%-.192s' user: '%-.48s' (%-.64s)", + ER_ABORTING_CONNECTION: "Aborted connection %d to db: '%-.192s' user: '%-.48s' (%-.64s)", ER_NET_PACKET_TOO_LARGE: "Got a packet bigger than 'max_allowed_packet' bytes", ER_NET_READ_ERROR_FROM_PIPE: "Got a read error from the connection pipe", ER_NET_FCNTL_ERROR: "Got an error from fcntl()", @@ -185,7 +185,7 @@ var MySQLErrName = map[uint16]string{ ER_ERROR_DURING_ROLLBACK: "Got error %d during ROLLBACK", ER_ERROR_DURING_FLUSH_LOGS: "Got error %d during FLUSH_LOGS", ER_ERROR_DURING_CHECKPOINT: "Got error %d during CHECKPOINT", - ER_NEW_ABORTING_CONNECTION: "Aborted connection %ld to db: '%-.192s' user: '%-.48s' host: '%-.64s' (%-.64s)", + ER_NEW_ABORTING_CONNECTION: "Aborted connection %d to db: '%-.192s' user: '%-.48s' host: '%-.64s' (%-.64s)", ER_DUMP_NOT_IMPLEMENTED: "The storage engine for the table does not support binary table dump", ER_FLUSH_MASTER_BINLOG_CLOSED: "Binlog closed, cannot RESET MASTER", ER_INDEX_REBUILD: "Failed rebuilding the index of dumped table '%-.192s'", @@ -227,7 +227,7 @@ var MySQLErrName = map[uint16]string{ ER_CANT_UPDATE_WITH_READLOCK: "Can't execute the query because you have a conflicting read lock", ER_MIXING_NOT_ALLOWED: "Mixing of transactional and non-transactional tables is disabled", ER_DUP_ARGUMENT: "Option '%s' used twice in statement", - ER_USER_LIMIT_REACHED: "User '%-.64s' has exceeded the '%s' resource (current value: %ld)", + ER_USER_LIMIT_REACHED: "User '%-.64s' has exceeded the '%s' resource (current value: %d)", ER_SPECIFIC_ACCESS_DENIED_ERROR: "Access denied; you need (at least one of) the %-.128s privilege(s) for this operation", ER_LOCAL_VARIABLE: "Variable '%-.64s' is a SESSION variable and can't be used with SET GLOBAL", ER_GLOBAL_VARIABLE: "Variable '%-.64s' is a GLOBAL variable and should be set with SET GLOBAL", @@ -250,7 +250,7 @@ var MySQLErrName = map[uint16]string{ ER_AUTO_CONVERT: "Converting column '%s' from %s to %s", ER_ILLEGAL_REFERENCE: "Reference '%-.64s' not supported (%s)", ER_DERIVED_MUST_HAVE_ALIAS: "Every derived table must have its own alias", - ER_SELECT_REDUCED: "Select %u was reduced during optimization", + ER_SELECT_REDUCED: "Select %d was reduced during optimization", ER_TABLENAME_NOT_ALLOWED_HERE: "Table '%-.192s' from one of the SELECTs cannot be used in %-.32s", ER_NOT_SUPPORTED_AUTH_MODE: "Client does not support authentication protocol requested by server; consider upgrading MySQL client", ER_SPATIAL_CANT_HAVE_NULL: "All parts of a SPATIAL index must be NOT NULL", @@ -261,12 +261,12 @@ var MySQLErrName = map[uint16]string{ ER_ZLIB_Z_MEM_ERROR: "ZLIB: Not enough memory", ER_ZLIB_Z_BUF_ERROR: "ZLIB: Not enough room in the output buffer (probably, length of uncompressed data was corrupted)", ER_ZLIB_Z_DATA_ERROR: "ZLIB: Input data corrupted", - ER_CUT_VALUE_GROUP_CONCAT: "Row %u was cut by GROUP_CONCAT()", - ER_WARN_TOO_FEW_RECORDS: "Row %ld doesn't contain data for all columns", - ER_WARN_TOO_MANY_RECORDS: "Row %ld was truncated; it contained more data than there were input columns", - ER_WARN_NULL_TO_NOTNULL: "Column set to default value; NULL supplied to NOT NULL column '%s' at row %ld", - ER_WARN_DATA_OUT_OF_RANGE: "Out of range value for column '%s' at row %ld", - WARN_DATA_TRUNCATED: "Data truncated for column '%s' at row %ld", + ER_CUT_VALUE_GROUP_CONCAT: "Row %d was cut by GROUP_CONCAT()", + ER_WARN_TOO_FEW_RECORDS: "Row %d doesn't contain data for all columns", + ER_WARN_TOO_MANY_RECORDS: "Row %d was truncated; it contained more data than there were input columns", + ER_WARN_NULL_TO_NOTNULL: "Column set to default value; NULL supplied to NOT NULL column '%s' at row %d", + ER_WARN_DATA_OUT_OF_RANGE: "Out of range value for column '%s' at row %d", + WARN_DATA_TRUNCATED: "Data truncated for column '%s' at row %d", ER_WARN_USING_OTHER_HANDLER: "Using storage engine %s for table '%s'", ER_CANT_AGGREGATE_2COLLATIONS: "Illegal mix of collations (%s,%s) and (%s,%s) for operation '%s'", ER_DROP_USER: "Cannot drop one or more of the requested users", @@ -283,7 +283,7 @@ var MySQLErrName = map[uint16]string{ ER_UNTIL_COND_IGNORED: "SQL thread is not to be started so UNTIL options are ignored", ER_WRONG_NAME_FOR_INDEX: "Incorrect index name '%-.100s'", ER_WRONG_NAME_FOR_CATALOG: "Incorrect catalog name '%-.100s'", - ER_WARN_QC_RESIZE: "Query cache failed to set size %lu; new query cache size is %lu", + ER_WARN_QC_RESIZE: "Query cache failed to set size %d; new query cache size is %d", ER_BAD_FT_COLUMN: "Column '%-.192s' cannot be part of FULLTEXT index", ER_UNKNOWN_KEY_CACHE: "Unknown key cache '%-.100s'", ER_WARN_HOSTNAME_WONT_WORK: "MySQL is started in --skip-name-resolve mode; you must restart it without this switch for this grant to work", @@ -300,9 +300,9 @@ var MySQLErrName = map[uint16]string{ ER_GET_ERRMSG: "Got error %d '%-.100s' from %s", ER_GET_TEMPORARY_ERRMSG: "Got temporary error %d '%-.100s' from %s", ER_UNKNOWN_TIME_ZONE: "Unknown or incorrect time zone: '%-.64s'", - ER_WARN_INVALID_TIMESTAMP: "Invalid TIMESTAMP value in column '%s' at row %ld", + ER_WARN_INVALID_TIMESTAMP: "Invalid TIMESTAMP value in column '%s' at row %d", ER_INVALID_CHARACTER_STRING: "Invalid %s character string: '%.64s'", - ER_WARN_ALLOWED_PACKET_OVERFLOWED: "Result of %s() was larger than max_allowed_packet (%ld) - truncated", + ER_WARN_ALLOWED_PACKET_OVERFLOWED: "Result of %s() was larger than max_allowed_packet (%d) - truncated", ER_CONFLICTING_DECLARATIONS: "Conflicting declarations: '%s%s' and '%s%s'", ER_SP_NO_RECURSIVE_CREATE: "Can't create a %s from within another stored routine", ER_SP_ALREADY_EXISTS: "%s %s already exists", @@ -319,7 +319,7 @@ var MySQLErrName = map[uint16]string{ ER_UPDATE_LOG_DEPRECATED_IGNORED: "The update log is deprecated and replaced by the binary log; SET SQL_LOG_UPDATE has been ignored.", ER_UPDATE_LOG_DEPRECATED_TRANSLATED: "The update log is deprecated and replaced by the binary log; SET SQL_LOG_UPDATE has been translated to SET SQL_LOG_BIN.", ER_QUERY_INTERRUPTED: "Query execution was interrupted", - ER_SP_WRONG_NO_OF_ARGS: "Incorrect number of arguments for %s %s; expected %u, got %u", + ER_SP_WRONG_NO_OF_ARGS: "Incorrect number of arguments for %s %s; expected %d, got %d", ER_SP_COND_MISMATCH: "Undefined CONDITION: %s", ER_SP_NORETURN: "No RETURN found in FUNCTION %s", ER_SP_NORETURNEND: "FUNCTION %s ended without RETURN", @@ -367,7 +367,7 @@ var MySQLErrName = map[uint16]string{ ER_TRG_NO_SUCH_ROW_IN_TRG: "There is no %s row in %s trigger", ER_NO_DEFAULT_FOR_FIELD: "Field '%-.192s' doesn't have a default value", ER_DIVISION_BY_ZERO: "Division by 0", - ER_TRUNCATED_WRONG_VALUE_FOR_FIELD: "Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %ld", + ER_TRUNCATED_WRONG_VALUE_FOR_FIELD: "Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", ER_ILLEGAL_VALUE_FOR_TYPE: "Illegal %s '%-.192s' value found during parsing", ER_VIEW_NONUPD_CHECK: "CHECK OPTION on non-updatable view '%-.192s.%-.192s'", ER_VIEW_CHECK_FAILED: "CHECK OPTION failed '%-.192s.%-.192s'", @@ -407,7 +407,7 @@ var MySQLErrName = map[uint16]string{ ER_NONEXISTING_PROC_GRANT: "There is no such grant defined for user '%-.48s' on host '%-.64s' on routine '%-.192s'", ER_PROC_AUTO_GRANT_FAIL: "Failed to grant EXECUTE and ALTER ROUTINE privileges", ER_PROC_AUTO_REVOKE_FAIL: "Failed to revoke all privileges to dropped routine", - ER_DATA_TOO_LONG: "Data too long for column '%s' at row %ld", + ER_DATA_TOO_LONG: "Data too long for column '%s' at row %d", ER_SP_BAD_SQLSTATE: "Bad SQLSTATE: '%s'", ER_STARTUP: "%s: ready for connections.\nVersion: '%s' socket: '%s' port: %d %s", ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR: "Can't load value from file with fixed size rows to variable", @@ -422,12 +422,12 @@ var MySQLErrName = map[uint16]string{ ER_BINLOG_UNSAFE_ROUTINE: "This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its declaration and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators variable)", ER_BINLOG_CREATE_ROUTINE_NEED_SUPER: "You do not have the SUPER privilege and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators variable)", ER_EXEC_STMT_WITH_OPEN_CURSOR: "You can't execute a prepared statement which has an open cursor associated with it. Reset the statement to re-execute it.", - ER_STMT_HAS_NO_OPEN_CURSOR: "The statement (%lu) has no open cursor.", + ER_STMT_HAS_NO_OPEN_CURSOR: "The statement (%d) has no open cursor.", ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG: "Explicit or implicit commit is not allowed in stored function or trigger.", ER_NO_DEFAULT_FOR_VIEW_FIELD: "Field of view '%-.192s.%-.192s' underlying table doesn't have a default value", ER_SP_NO_RECURSION: "Recursive stored functions and triggers are not allowed.", - ER_TOO_BIG_SCALE: "Too big scale %d specified for column '%-.192s'. Maximum is %lu.", - ER_TOO_BIG_PRECISION: "Too big precision %d specified for column '%-.192s'. Maximum is %lu.", + ER_TOO_BIG_SCALE: "Too big scale %d specified for column '%-.192s'. Maximum is %d.", + ER_TOO_BIG_PRECISION: "Too big precision %d specified for column '%-.192s'. Maximum is %d.", ER_M_BIGGER_THAN_D: "For float(M,D), double(M,D) or decimal(M,D), M must be >= D (column '%-.192s').", ER_WRONG_LOCK_OF_SYSTEM_TABLE: "You can't combine write-locking of system tables with other tables or lock types", ER_CONNECT_TO_FOREIGN_DATA_SOURCE: "Unable to connect to foreign data source: %.64s", @@ -437,10 +437,10 @@ var MySQLErrName = map[uint16]string{ ER_FOREIGN_DATA_STRING_INVALID: "The data source connection string '%-.64s' is not in the correct format", ER_CANT_CREATE_FEDERATED_TABLE: "Can't create federated table. Foreign data src error: %-.64s", ER_TRG_IN_WRONG_SCHEMA: "Trigger in wrong schema", - ER_STACK_OVERRUN_NEED_MORE: "Thread stack overrun: %ld bytes used of a %ld byte stack, and %ld bytes needed. Use 'mysqld --thread_stack=#' to specify a bigger stack.", + ER_STACK_OVERRUN_NEED_MORE: "Thread stack overrun: %d bytes used of a %d byte stack, and %d bytes needed. Use 'mysqld --thread_stack=#' to specify a bigger stack.", ER_TOO_LONG_BODY: "Routine body for '%-.100s' is too long", ER_WARN_CANT_DROP_DEFAULT_KEYCACHE: "Cannot drop default keycache", - ER_TOO_BIG_DISPLAYWIDTH: "Display width out of range for column '%-.192s' (max = %lu)", + ER_TOO_BIG_DISPLAYWIDTH: "Display width out of range for column '%-.192s' (max = %d)", ER_XAER_DUPID: "XAER_DUPID: The XID already exists", ER_DATETIME_FUNCTION_OVERFLOW: "Datetime function: %-.32s field overflow", ER_CANT_UPDATE_USED_TABLE_IN_SF_OR_TRG: "Can't update table '%-.192s' in stored function/trigger because it is already used by statement which invoked this stored function/trigger.", @@ -462,7 +462,7 @@ var MySQLErrName = map[uint16]string{ ER_SP_WRONG_NAME: "Incorrect routine name '%-.192s'", ER_TABLE_NEEDS_UPGRADE: "Table upgrade required. Please do \"REPAIR TABLE `%-.32s`\" or dump/reload to fix it!", ER_SP_NO_AGGREGATE: "AGGREGATE is not supported for stored functions", - ER_MAX_PREPARED_STMT_COUNT_REACHED: "Can't create more than max_prepared_stmt_count statements (current value: %lu)", + ER_MAX_PREPARED_STMT_COUNT_REACHED: "Can't create more than max_prepared_stmt_count statements (current value: %d)", ER_VIEW_RECURSIVE: "`%-.192s`.`%-.192s` contains view recursion", ER_NON_GROUPING_FIELD_USED: "Non-grouping field '%-.192s' is used in %-.64s clause", ER_TABLE_CANT_HANDLE_SPKEYS: "The used table type doesn't support SPATIAL indexes", @@ -572,7 +572,7 @@ var MySQLErrName = map[uint16]string{ ER_CANT_CHANGE_TX_CHARACTERISTICS: "Transaction characteristics can't be changed while a transaction is in progress", ER_DUP_ENTRY_AUTOINCREMENT_CASE: "ALTER TABLE causes auto_increment resequencing, resulting in duplicate entry '%-.192s' for key '%-.192s'", ER_EVENT_MODIFY_QUEUE_ERROR: "Internal scheduler error %d", - ER_EVENT_SET_VAR_ERROR: "Error during starting/stopping of the scheduler. Error code %u", + ER_EVENT_SET_VAR_ERROR: "Error during starting/stopping of the scheduler. Error code %d", ER_PARTITION_MERGE_ERROR: "Engine cannot be used in partitioned tables", ER_CANT_ACTIVATE_LOG: "Cannot activate '%-.64s' log", ER_RBR_NOT_AVAILABLE: "The server was not built with row-based replication", @@ -629,8 +629,8 @@ var MySQLErrName = map[uint16]string{ ER_NDB_REPLICATION_SCHEMA_ERROR: "Bad schema for mysql.ndb_replication table. Message: %-.64s", ER_CONFLICT_FN_PARSE_ERROR: "Error in parsing conflict function. Message: %-.64s", ER_EXCEPTIONS_WRITE_ERROR: "Write to exceptions table failed. Message: %-.128s\"", - ER_TOO_LONG_TABLE_COMMENT: "Comment for table '%-.64s' is too long (max = %lu)", - ER_TOO_LONG_FIELD_COMMENT: "Comment for field '%-.64s' is too long (max = %lu)", + ER_TOO_LONG_TABLE_COMMENT: "Comment for table '%-.64s' is too long (max = %d)", + ER_TOO_LONG_FIELD_COMMENT: "Comment for field '%-.64s' is too long (max = %d)", ER_FUNC_INEXISTENT_NAME_COLLISION: "FUNCTION %s does not exist. Check the 'Function Name Parsing and Resolution' section in the Reference Manual", ER_DATABASE_NAME: "Database", ER_TABLE_NAME: "Table", @@ -689,7 +689,7 @@ var MySQLErrName = map[uint16]string{ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_DIRECT: "Cannot modify @@session.binlog_direct_non_transactional_updates inside a transaction", ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_DIRECT: "Cannot change the binlog direct flag inside a stored function or trigger", ER_SPATIAL_MUST_HAVE_GEOM_COL: "A SPATIAL index may only contain a geometrical type column", - ER_TOO_LONG_INDEX_COMMENT: "Comment for index '%-.64s' is too long (max = %lu)", + ER_TOO_LONG_INDEX_COMMENT: "Comment for index '%-.64s' is too long (max = %d)", ER_LOCK_ABORTED: "Wait on a lock was aborted due to a pending exclusive lock", ER_DATA_OUT_OF_RANGE: "%s value is out of range in '%s'", ER_WRONG_SPVAR_TYPE_IN_LIMIT: "A variable of a non-integer based type in LIMIT clause", @@ -710,7 +710,7 @@ var MySQLErrName = map[uint16]string{ ER_MULTI_UPDATE_KEY_CONFLICT: "Primary key/partition key update is not allowed since the table is updated both as '%-.192s' and '%-.192s'.", ER_TABLE_NEEDS_REBUILD: "Table rebuild required. Please do \"ALTER TABLE `%-.32s` FORCE\" or dump/reload to fix it!", WARN_OPTION_BELOW_LIMIT: "The value of '%s' should be no less than the value of '%s'", - ER_INDEX_COLUMN_TOO_LONG: "Index column size too large. The maximum column size is %lu bytes.", + ER_INDEX_COLUMN_TOO_LONG: "Index column size too large. The maximum column size is %d bytes.", ER_ERROR_IN_TRIGGER_BODY: "Trigger '%-.64s' has an error in its body: '%-.256s'", ER_ERROR_IN_UNKNOWN_TRIGGER_BODY: "Unknown trigger has an error in its body: '%-.256s'", ER_INDEX_CORRUPT: "Index %s is corrupted", @@ -730,7 +730,7 @@ var MySQLErrName = map[uint16]string{ ER_UNSUPPORTED_ENGINE: "Storage engine '%s' does not support system tables. [%s.%s]", ER_BINLOG_UNSAFE_AUTOINC_NOT_FIRST: "INSERT into autoincrement field which is not the first part in the composed primary key is unsafe.", ER_CANNOT_LOAD_FROM_TABLE_V2: "Cannot load from %s.%s. The table is probably corrupted", - ER_MASTER_DELAY_VALUE_OUT_OF_RANGE: "The requested value %u for the master delay exceeds the maximum %u", + ER_MASTER_DELAY_VALUE_OUT_OF_RANGE: "The requested value %d for the master delay exceeds the maximum %d", ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT: "Only Format_description_log_event and row events are allowed in BINLOG statements (but %s was provided)", ER_PARTITION_EXCHANGE_DIFFERENT_OPTION: "Non matching attribute '%-.64s' between partition and table", ER_PARTITION_EXCHANGE_PART_TABLE: "Table to exchange with partition is partitioned: '%-.64s'", @@ -739,14 +739,14 @@ var MySQLErrName = map[uint16]string{ ER_UNKNOWN_PARTITION: "Unknown partition '%-.64s' in table '%-.64s'", ER_TABLES_DIFFERENT_METADATA: "Tables have different definitions", ER_ROW_DOES_NOT_MATCH_PARTITION: "Found a row that does not match the partition", - ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX: "Option binlog_cache_size (%lu) is greater than max_binlog_cache_size (%lu); setting binlog_cache_size equal to max_binlog_cache_size.", + ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX: "Option binlog_cache_size (%d) is greater than max_binlog_cache_size (%d); setting binlog_cache_size equal to max_binlog_cache_size.", ER_WARN_INDEX_NOT_APPLICABLE: "Cannot use %-.64s access on index '%-.64s' due to type or collation conversion on field '%-.64s'", ER_PARTITION_EXCHANGE_FOREIGN_KEY: "Table to exchange with partition has foreign key references: '%-.64s'", ER_NO_SUCH_KEY_VALUE: "Key value '%-.192s' was not found in table '%-.192s.%-.192s'", ER_RPL_INFO_DATA_TOO_LONG: "Data for column '%s' too long", ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE: "Replication event checksum verification failed while reading from network.", ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE: "Replication event checksum verification failed while reading from a log file.", - ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX: "Option binlog_stmt_cache_size (%lu) is greater than max_binlog_stmt_cache_size (%lu); setting binlog_stmt_cache_size equal to max_binlog_stmt_cache_size.", + ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX: "Option binlog_stmt_cache_size (%d) is greater than max_binlog_stmt_cache_size (%d); setting binlog_stmt_cache_size equal to max_binlog_stmt_cache_size.", ER_CANT_UPDATE_TABLE_IN_CREATE_TABLE_SELECT: "Can't update table '%-.192s' while '%-.192s' is being created.", ER_PARTITION_CLAUSE_ON_NONPARTITIONED: "PARTITION () clause on non partitioned table", ER_ROW_DOES_NOT_MATCH_GIVEN_PARTITION_SET: "Found a row not matching the given partition set", @@ -794,7 +794,7 @@ var MySQLErrName = map[uint16]string{ ER_CANT_SET_GTID_NEXT_WHEN_OWNING_GTID: "@@SESSION.GTID_NEXT cannot be changed by a client that owns a GTID. The client owns %s. Ownership is released on COMMIT or ROLLBACK.", ER_UNKNOWN_EXPLAIN_FORMAT: "Unknown EXPLAIN format name: '%s'", ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION: "Cannot execute statement in a READ ONLY transaction.", - ER_TOO_LONG_TABLE_PARTITION_COMMENT: "Comment for table partition '%-.64s' is too long (max = %lu)", + ER_TOO_LONG_TABLE_PARTITION_COMMENT: "Comment for table partition '%-.64s' is too long (max = %d)", ER_SLAVE_CONFIGURATION: "Slave is not configured or failed to initialize properly. You must at least set --server-id to enable either a master or a slave. Additional error messages can be found in the MySQL error log.", ER_INNODB_FT_LIMIT: "InnoDB presently supports one FULLTEXT index creation at a time", ER_INNODB_NO_FT_TEMP_TABLE: "Cannot create FULLTEXT index on temporary InnoDB table", @@ -811,15 +811,15 @@ var MySQLErrName = map[uint16]string{ ER_DISCARD_FK_CHECKS_RUNNING: "There is a foreign key check running on table '%-.192s'. Cannot discard the table.", ER_TABLE_SCHEMA_MISMATCH: "Schema mismatch (%s)", ER_TABLE_IN_SYSTEM_TABLESPACE: "Table '%-.192s' in system tablespace", - ER_IO_READ_ERROR: "IO Read error: (%lu, %s) %s", - ER_IO_WRITE_ERROR: "IO Write error: (%lu, %s) %s", + ER_IO_READ_ERROR: "IO Read error: (%d, %s) %s", + ER_IO_WRITE_ERROR: "IO Write error: (%d, %s) %s", ER_TABLESPACE_MISSING: "Tablespace is missing for table '%-.192s'", ER_TABLESPACE_EXISTS: "Tablespace for table '%-.192s' exists. Please DISCARD the tablespace before IMPORT.", ER_TABLESPACE_DISCARDED: "Tablespace has been discarded for table '%-.192s'", ER_INTERNAL_ERROR: "Internal error: %s", - ER_INNODB_IMPORT_ERROR: "ALTER TABLE '%-.192s' IMPORT TABLESPACE failed with error %lu : '%s'", + ER_INNODB_IMPORT_ERROR: "ALTER TABLE '%-.192s' IMPORT TABLESPACE failed with error %d : '%s'", ER_INNODB_INDEX_CORRUPT: "Index corrupt: %s", - ER_INVALID_YEAR_COLUMN_LENGTH: "YEAR(%lu) column type is deprecated. Creating YEAR(4) column instead.", + ER_INVALID_YEAR_COLUMN_LENGTH: "YEAR(%d) column type is deprecated. Creating YEAR(4) column instead.", ER_NOT_VALID_PASSWORD: "Your password does not satisfy the current policy requirements", ER_MUST_CHANGE_PASSWORD: "You must SET PASSWORD before executing this statement", ER_FK_NO_INDEX_CHILD: "Failed to add the foreign key constaint. Missing index for constraint '%s' in the foreign table '%s'", diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/mysql_gtid.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/mysql_gtid.go index fbd5b7b8..2a10ae25 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/mysql_gtid.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/mysql_gtid.go @@ -227,6 +227,57 @@ func (s *UUIDSet) AddInterval(in IntervalSlice) { s.Intervals = s.Intervals.Normalize() } +func (s *UUIDSet) MinusInterval(in IntervalSlice) { + var n IntervalSlice + in = in.Normalize() + + i, j := 0, 0 + var minuend Interval + var subtrahend Interval + for j < len(in) && i < len(s.Intervals) { + if minuend.Stop != s.Intervals[i].Stop { // `i` changed? + minuend = s.Intervals[i] + } + subtrahend = in[j] + + if minuend.Stop <= subtrahend.Start { + // no overlapping + n = append(n, minuend) + i++ + } else if minuend.Start >= subtrahend.Stop { + // no overlapping + j++ + } else { + if minuend.Start < subtrahend.Start && minuend.Stop < subtrahend.Stop { + n = append(n, Interval{minuend.Start, subtrahend.Start}) + i++ + } else if minuend.Start > subtrahend.Start && minuend.Stop > subtrahend.Stop { + minuend = Interval{subtrahend.Stop, minuend.Stop} + j++ + } else if minuend.Start >= subtrahend.Start && minuend.Stop <= subtrahend.Stop { + // minuend is completely removed + i++ + } else { + n = append(n, Interval{minuend.Start, subtrahend.Start}) + minuend = Interval{subtrahend.Stop, minuend.Stop} + j++ + } + } + } + + lastSub := in[len(in)-1] + for ; i < len(s.Intervals); i++ { + minuend = s.Intervals[i] + if minuend.Start < lastSub.Stop { + n = append(n, Interval{lastSub.Stop, minuend.Stop}) + } else { + n = append(n, s.Intervals[i]) + } + } + + s.Intervals = n.Normalize() +} + func (s *UUIDSet) String() string { return hack.String(s.Bytes()) } @@ -304,6 +355,8 @@ type MysqlGTIDSet struct { Sets map[string]*UUIDSet } +var _ GTIDSet = &MysqlGTIDSet{} + func ParseMysqlGTIDSet(str string) (GTIDSet, error) { s := new(MysqlGTIDSet) s.Sets = make(map[string]*UUIDSet) @@ -362,6 +415,20 @@ func (s *MysqlGTIDSet) AddSet(set *UUIDSet) { } } +func (s *MysqlGTIDSet) MinusSet(set *UUIDSet) { + if set == nil { + return + } + sid := set.SID.String() + uuidSet, ok := s.Sets[sid] + if ok { + uuidSet.MinusInterval(set.Intervals) + if uuidSet.Intervals == nil { + delete(s.Sets, sid) + } + } +} + func (s *MysqlGTIDSet) Update(GTIDStr string) error { gtidSet, err := ParseMysqlGTIDSet(GTIDStr) if err != nil { @@ -373,6 +440,20 @@ func (s *MysqlGTIDSet) Update(GTIDStr string) error { return nil } +func (s *MysqlGTIDSet) Add(addend MysqlGTIDSet) error { + for _, uuidSet := range addend.Sets { + s.AddSet(uuidSet) + } + return nil +} + +func (s *MysqlGTIDSet) Minus(subtrahend MysqlGTIDSet) error { + for _, uuidSet := range subtrahend.Sets { + s.MinusSet(uuidSet) + } + return nil +} + func (s *MysqlGTIDSet) Contain(o GTIDSet) bool { sub, ok := o.(*MysqlGTIDSet) if !ok { diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/result.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/result.go index 797a4af7..a4dbb0ee 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/result.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/result.go @@ -1,7 +1,8 @@ package mysql type Result struct { - Status uint16 + Status uint16 + Warnings uint16 InsertId uint64 AffectedRows uint64 diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset.go index f244b7d0..c3ad7ef1 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset.go @@ -17,6 +17,9 @@ type Resultset struct { RawPkg []byte RowDatas []RowData + + Streaming bool + StreamingDone bool } var ( @@ -64,6 +67,9 @@ func (r *Resultset) Reset(fieldsCount int) { } func (r *Resultset) RowNumber() int { + if r == nil { + return 0 + } return len(r.Values) } diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset_helper.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset_helper.go index 3c22c9c4..0cc859d7 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset_helper.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/resultset_helper.go @@ -8,7 +8,7 @@ import ( "github.com/siddontang/go/hack" ) -func formatTextValue(value interface{}) ([]byte, error) { +func FormatTextValue(value interface{}) ([]byte, error) { switch v := value.(type) { case int8: return strconv.AppendInt(nil, int64(v), 10), nil @@ -165,7 +165,7 @@ func BuildSimpleTextResultset(names []string, values [][]interface{}) (*Resultse return nil, errors.Errorf("row types aren't consistent") } } - b, err = formatTextValue(value) + b, err = FormatTextValue(value) if err != nil { return nil, errors.Trace(err) diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/rowdata.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/rowdata.go index d3cfd0ae..c757578d 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/rowdata.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/rowdata.go @@ -177,7 +177,7 @@ func (p RowData) ParseBinary(f []*Field, dst []FieldValue) ([]FieldValue, error) case MYSQL_TYPE_DECIMAL, MYSQL_TYPE_NEWDECIMAL, MYSQL_TYPE_VARCHAR, MYSQL_TYPE_BIT, MYSQL_TYPE_ENUM, MYSQL_TYPE_SET, MYSQL_TYPE_TINY_BLOB, MYSQL_TYPE_MEDIUM_BLOB, MYSQL_TYPE_LONG_BLOB, MYSQL_TYPE_BLOB, - MYSQL_TYPE_VAR_STRING, MYSQL_TYPE_STRING, MYSQL_TYPE_GEOMETRY: + MYSQL_TYPE_VAR_STRING, MYSQL_TYPE_STRING, MYSQL_TYPE_GEOMETRY, MYSQL_TYPE_JSON: v, isNull, n, err = LengthEncodedString(p[pos:]) pos += n if err != nil { diff --git a/vendor/github.com/go-mysql-org/go-mysql/mysql/util.go b/vendor/github.com/go-mysql-org/go-mysql/mysql/util.go index 054fe4fe..1a86a6bc 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/mysql/util.go +++ b/vendor/github.com/go-mysql-org/go-mysql/mysql/util.go @@ -8,8 +8,10 @@ import ( "encoding/binary" "fmt" "io" + mrand "math/rand" "runtime" "strings" + "time" "github.com/pingcap/errors" "github.com/siddontang/go/hack" @@ -107,18 +109,11 @@ func AppendLengthEncodedInteger(b []byte, n uint64) []byte { func RandomBuf(size int) ([]byte, error) { buf := make([]byte, size) - - if _, err := io.ReadFull(rand.Reader, buf); err != nil { - return nil, errors.Trace(err) + mrand.Seed(time.Now().UTC().UnixNano()) + min, max := 30, 127 + for i := 0; i < size; i++ { + buf[i] = byte(min + mrand.Intn(max-min)) } - - // avoid to generate '\0' - for i, b := range buf { - if b == 0 { - buf[i] = '0' - } - } - return buf, nil } @@ -142,7 +137,7 @@ func BFixedLengthInt(buf []byte) uint64 { func LengthEncodedInt(b []byte) (num uint64, isNull bool, n int) { if len(b) == 0 { - return 0, true, 1 + return 0, true, 0 } switch b[0] { diff --git a/vendor/github.com/go-mysql-org/go-mysql/packet/conn.go b/vendor/github.com/go-mysql-org/go-mysql/packet/conn.go index c60f68e6..60de437c 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/packet/conn.go +++ b/vendor/github.com/go-mysql-org/go-mysql/packet/conn.go @@ -3,15 +3,14 @@ package packet import ( "bufio" "bytes" - "io" - "net" - "sync" - "crypto/rand" "crypto/rsa" "crypto/sha1" "crypto/x509" "encoding/pem" + "io" + "net" + "sync" . "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/utils" @@ -97,10 +96,28 @@ func (c *Conn) ReadPacketReuseMem(dst []byte) ([]byte, error) { if err := c.ReadPacketTo(buf); err != nil { return nil, errors.Trace(err) + } + + readBytes := buf.Bytes() + readSize := len(readBytes) + var result []byte + if len(dst) > 0 { + result = append(dst, readBytes...) + // if read block is big, do not cache buf any more + if readSize > utils.TooBigBlockSize { + buf = nil + } } else { - result := append(dst, buf.Bytes()...) - return result, nil + if readSize > utils.TooBigBlockSize { + // if read block is big, use read block as result and do not cache buf any more + result = readBytes + buf = nil + } else { + result = append(dst, readBytes...) + } } + + return result, nil } func (c *Conn) copyN(dst io.Writer, src io.Reader, n int64) (written int64, err error) { diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/binlogsyncer.go b/vendor/github.com/go-mysql-org/go-mysql/replication/binlogsyncer.go index a5dfcb54..7c35362a 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/binlogsyncer.go @@ -10,11 +10,12 @@ import ( "sync" "time" - "github.com/go-mysql-org/go-mysql/client" - . "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" uuid "github.com/satori/go.uuid" "github.com/siddontang/go-log/log" + + "github.com/go-mysql-org/go-mysql/client" + . "github.com/go-mysql-org/go-mysql/mysql" ) var ( @@ -311,6 +312,12 @@ func (b *BinlogSyncer) registerSlave() error { return errors.Trace(err) } + serverUUID := uuid.NewV1() + if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil { + log.Errorf("failed to set @slave_uuid = '%s', err: %v", serverUUID, err) + return errors.Trace(err) + } + return nil } @@ -728,11 +735,10 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { s.closeWithError(err) return case EOF_HEADER: - // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html - // In the MySQL client/server protocol, EOF and OK packets serve the same purpose. - // Some users told me that they received EOF packet here, but I don't know why. - // So we only log a message and retry ReadPacket. - log.Info("receive EOF packet, retry ReadPacket") + // refer to https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block + // when COM_BINLOG_DUMP command use BINLOG_DUMP_NON_BLOCK flag, + // if there is no more event to send an EOF_Packet instead of blocking the connection + log.Info("receive EOF packet, no more binlog event now.") continue default: log.Errorf("invalid stream header %c", data[0]) diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/const.go b/vendor/github.com/go-mysql-org/go-mysql/replication/const.go index 6230257f..ce7b98b4 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/const.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/const.go @@ -217,3 +217,11 @@ const ( TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET ) + +type IntVarEventType byte + +const ( + INVALID IntVarEventType = iota + LAST_INSERT_ID + INSERT_ID +) diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/event.go b/vendor/github.com/go-mysql-org/go-mysql/replication/event.go index 90b38d37..9111691d 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/event.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/event.go @@ -643,3 +643,19 @@ func (e *MariadbGTIDListEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Lists: %v\n", e.GTIDs) fmt.Fprintln(w) } + +type IntVarEvent struct { + Type IntVarEventType + Value uint64 +} + +func (i *IntVarEvent) Decode(data []byte) error { + i.Type = IntVarEventType(data[0]) + i.Value = binary.LittleEndian.Uint64(data[1:]) + return nil +} + +func (i *IntVarEvent) Dump(w io.Writer) { + fmt.Fprintf(w, "Type: %d\n", i.Type) + fmt.Fprintf(w, "Value: %d\n", i.Value) +} diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/generic_event.go b/vendor/github.com/go-mysql-org/go-mysql/replication/generic_event.go index b0fa83a3..ac55703c 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/generic_event.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/generic_event.go @@ -141,11 +141,6 @@ func (e *GenericEvent) Decode(data []byte) error { // Seed2 uint64 // } -// type IntVarEvent struct { -// Type uint8 -// Value uint64 -// } - // type UserVarEvent struct { // NameLength uint32 // Name []byte diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/parser.go b/vendor/github.com/go-mysql-org/go-mysql/replication/parser.go index 77eb7e9a..7688fe7b 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/parser.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/parser.go @@ -11,6 +11,8 @@ import ( "time" "github.com/pingcap/errors" + + "github.com/go-mysql-org/go-mysql/utils" ) var ( @@ -110,8 +112,11 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, var err error var n int64 - var buf bytes.Buffer - if n, err = io.CopyN(&buf, r, EventHeaderSize); err == io.EOF { + // Here we use `sync.Pool` to avoid allocate/destroy buffers frequently. + buf := utils.BytesBufferGet() + defer utils.BytesBufferPut(buf) + + if n, err = io.CopyN(buf, r, EventHeaderSize); err == io.EOF { return true, nil } else if err != nil { return false, errors.Errorf("get event header err %v, need %d but got %d", err, EventHeaderSize, n) @@ -126,14 +131,15 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, if h.EventSize < uint32(EventHeaderSize) { return false, errors.Errorf("invalid event header, event size is %d, too small", h.EventSize) } - if n, err = io.CopyN(&buf, r, int64(h.EventSize-EventHeaderSize)); err != nil { + if n, err = io.CopyN(buf, r, int64(h.EventSize-EventHeaderSize)); err != nil { return false, errors.Errorf("get event err %v, need %d but got %d", err, h.EventSize, n) } if buf.Len() != int(h.EventSize) { return false, errors.Errorf("invalid raw data size in event %s, need %d but got %d", h.EventType, h.EventSize, buf.Len()) } - rawData := buf.Bytes() + var rawData []byte + rawData = append(rawData, buf.Bytes()...) bodyLen := int(h.EventSize) - EventHeaderSize body := rawData[EventHeaderSize:] if len(body) != bodyLen { @@ -281,6 +287,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = ee case PREVIOUS_GTIDS_EVENT: e = &PreviousGTIDsEvent{} + case INTVAR_EVENT: + e = &IntVarEvent{} default: e = &GenericEvent{} } diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/row_event.go b/vendor/github.com/go-mysql-org/go-mysql/replication/row_event.go index d49e51a5..36adf79d 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/replication/row_event.go +++ b/vendor/github.com/go-mysql-org/go-mysql/replication/row_event.go @@ -1,12 +1,12 @@ package replication import ( - "bytes" "encoding/binary" "encoding/hex" "fmt" "io" "strconv" + "strings" "time" "github.com/pingcap/errors" @@ -38,11 +38,11 @@ type TableMapEvent struct { NullBitmap []byte /* - The followings are available only after MySQL-8.0.1 or MariaDB-10.5.0 + The following are available only after MySQL-8.0.1 or MariaDB-10.5.0 + By default MySQL and MariaDB do not log the full row metadata. see: - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_metadata - - https://mysqlhighavailability.com/more-metadata-is-written-into-binary-log/ - - https://jira.mariadb.org/browse/MDEV-20477 + - https://mariadb.com/kb/en/replication-and-binary-log-system-variables/#binlog_row_metadata */ // SignednessBitmap stores signedness info for numeric columns. @@ -830,6 +830,16 @@ type RowsEvent struct { //lenenc_int ColumnCount uint64 + + /* + By default MySQL and MariaDB log the full row image. + see + - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_image + - https://mariadb.com/kb/en/replication-and-binary-log-system-variables/#binlog_row_image + + ColumnBitmap1, ColumnBitmap2 and SkippedColumns are not set on the full row image. + */ + //len = (ColumnCount + 7) / 8 ColumnBitmap1 []byte @@ -1141,7 +1151,11 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) n = length + int(meta) - v, err = e.decodeJsonBinary(data[meta:n]) + var d []byte + d, err = e.decodeJsonBinary(data[meta:n]) + if err == nil { + v = hack.String(d) + } case MYSQL_TYPE_GEOMETRY: // MySQL saves Geometry as Blob in binlog // Seem that the binary format is SRID (4 bytes) + WKB, outer can use @@ -1172,20 +1186,29 @@ func decodeString(data []byte, length int) (v string, n int) { return } +// ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137 const digitsPerInteger int = 9 var compressedBytes = []int{0, 1, 1, 2, 2, 3, 3, 4, 4, 4} func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size int, value uint32) { size = compressedBytes[compIndx] - databuff := make([]byte, size) - for i := 0; i < size; i++ { - databuff[i] = data[i] ^ mask + switch size { + case 0: + case 1: + value = uint32(data[0] ^ mask) + case 2: + value = uint32(data[1]^mask) | uint32(data[0]^mask)<<8 + case 3: + value = uint32(data[2]^mask) | uint32(data[1]^mask)<<8 | uint32(data[0]^mask)<<16 + case 4: + value = uint32(data[3]^mask) | uint32(data[2]^mask)<<8 | uint32(data[1]^mask)<<16 | uint32(data[0]^mask)<<24 } - value = uint32(BFixedLengthInt(databuff)) return } +var zeros = [digitsPerInteger]byte{48, 48, 48, 48, 48, 48, 48, 48, 48} + func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (interface{}, int, error) { //see python mysql replication and https://github.com/jeremycole/mysql_binlog integral := precision - decimals @@ -1207,7 +1230,8 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i // The sign is encoded in the high bit of the the byte // But this bit can also be used in the value value := uint32(data[0]) - var res bytes.Buffer + var res strings.Builder + res.Grow(precision + 2) var mask uint32 = 0 if value&0x80 == 0 { mask = uint32((1 << 32) - 1) @@ -1217,35 +1241,61 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i //clear sign data[0] ^= 0x80 + zeroLeading := true + pos, value := decodeDecimalDecompressValue(compIntegral, data, uint8(mask)) - res.WriteString(fmt.Sprintf("%d", value)) + if value != 0 { + zeroLeading = false + res.WriteString(strconv.FormatUint(uint64(value), 10)) + } for i := 0; i < uncompIntegral; i++ { value = binary.BigEndian.Uint32(data[pos:]) ^ mask pos += 4 - res.WriteString(fmt.Sprintf("%09d", value)) + if zeroLeading { + if value != 0 { + zeroLeading = false + res.WriteString(strconv.FormatUint(uint64(value), 10)) + } + } else { + toWrite := strconv.FormatUint(uint64(value), 10) + res.Write(zeros[:digitsPerInteger-len(toWrite)]) + res.WriteString(toWrite) + } } - res.WriteString(".") - - for i := 0; i < uncompFractional; i++ { - value = binary.BigEndian.Uint32(data[pos:]) ^ mask - pos += 4 - res.WriteString(fmt.Sprintf("%09d", value)) + if zeroLeading { + res.WriteString("0") } - if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 { - res.WriteString(fmt.Sprintf("%0*d", compFractional, value)) - pos += size + if pos < len(data) { + res.WriteString(".") + + for i := 0; i < uncompFractional; i++ { + value = binary.BigEndian.Uint32(data[pos:]) ^ mask + pos += 4 + toWrite := strconv.FormatUint(uint64(value), 10) + res.Write(zeros[:digitsPerInteger-len(toWrite)]) + res.WriteString(toWrite) + } + + if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 { + toWrite := strconv.FormatUint(uint64(value), 10) + padding := compFractional - len(toWrite) + if padding > 0 { + res.Write(zeros[:padding]) + } + res.WriteString(toWrite) + pos += size + } } if useDecimal { - f, err := decimal.NewFromString(hack.String(res.Bytes())) + f, err := decimal.NewFromString(res.String()) return f, pos, err } - f, err := strconv.ParseFloat(hack.String(res.Bytes()), 64) - return f, pos, err + return res.String(), pos, nil } func decodeBit(data []byte, nbits int, length int) (value int64, err error) { @@ -1410,11 +1460,10 @@ func decodeTime2(data []byte, dec uint16) (string, int, error) { intPart := int64(0) frac := int64(0) switch dec { - case 1: - case 2: + case 1, 2: intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS frac = int64(data[3]) - if intPart < 0 && frac > 0 { + if intPart < 0 && frac != 0 { /* Negative values are stored with reverse fractional part order, for binary sort compatibility. @@ -1436,11 +1485,10 @@ func decodeTime2(data []byte, dec uint16) (string, int, error) { frac -= 0x100 /* -(0x100 - frac) */ } tmp = intPart<<24 + frac*10000 - case 3: - case 4: + case 3, 4: intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS frac = int64(binary.BigEndian.Uint16(data[3:5])) - if intPart < 0 && frac > 0 { + if intPart < 0 && frac != 0 { /* Fix reverse fractional part order: "0x10000 - frac". See comments for FSP=1 and FSP=2 above. @@ -1450,9 +1498,9 @@ func decodeTime2(data []byte, dec uint16) (string, int, error) { } tmp = intPart<<24 + frac*100 - case 5: - case 6: + case 5, 6: tmp = int64(BFixedLengthInt(data[0:6])) - TIMEF_OFS + return timeFormat(tmp, n) default: intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS tmp = intPart << 24 @@ -1462,6 +1510,10 @@ func decodeTime2(data []byte, dec uint16) (string, int, error) { return "00:00:00", n, nil } + return timeFormat(tmp, n) +} + +func timeFormat(tmp int64, n int) (string, int, error) { hms := int64(0) sign := "" if tmp < 0 { diff --git a/vendor/github.com/go-mysql-org/go-mysql/schema/schema.go b/vendor/github.com/go-mysql-org/go-mysql/schema/schema.go index 68448f6e..ea2ec558 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/schema/schema.go +++ b/vendor/github.com/go-mysql-org/go-mysql/schema/schema.go @@ -46,6 +46,7 @@ type TableColumn struct { IsAuto bool IsUnsigned bool IsVirtual bool + IsStored bool EnumValues []string SetValues []string FixedSize uint @@ -56,6 +57,7 @@ type Index struct { Name string Columns []string Cardinality []uint64 + NoneUnique uint64 } type Table struct { @@ -146,6 +148,8 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext ta.Columns[index].IsAuto = true } else if extra == "VIRTUAL GENERATED" { ta.Columns[index].IsVirtual = true + } else if extra == "STORED GENERATED" { + ta.Columns[index].IsStored = true } } @@ -190,7 +194,7 @@ func (ta *Table) AddIndex(name string) (index *Index) { } func NewIndex(name string) *Index { - return &Index{name, make([]string, 0, 8), make([]uint64, 0, 8)} + return &Index{name, make([]string, 0, 8), make([]uint64, 0, 8), 0} } func (idx *Index) AddColumn(name string, cardinality uint64) { @@ -317,6 +321,7 @@ func (ta *Table) fetchIndexes(conn mysql.Executer) error { cardinality, _ := r.GetUint(i, 6) colName, _ := r.GetString(i, 4) currentIndex.AddColumn(colName, cardinality) + currentIndex.NoneUnique, _ = r.GetUint(i, 1) } return ta.fetchPrimaryKeyColumns() @@ -334,27 +339,31 @@ func (ta *Table) fetchIndexesViaSqlDB(conn *sql.DB) error { currentName := "" var unusedVal interface{} - unused := &unusedVal for r.Next() { var indexName, colName string + var noneUnique uint64 var cardinality interface{} - - err := r.Scan( - &unused, - &unused, - &indexName, - &unused, - &colName, - &unused, - &cardinality, - &unused, - &unused, - &unused, - &unused, - &unused, - &unused, - ) + cols, err := r.Columns() + if err != nil { + return errors.Trace(err) + } + values := make([]interface{}, len(cols)) + for i := 0; i < len(cols); i++ { + switch i { + case 1: + values[i] = &noneUnique + case 2: + values[i] = &indexName + case 4: + values[i] = &colName + case 6: + values[i] = &cardinality + default: + values[i] = &unusedVal + } + } + err = r.Scan(values...) if err != nil { return errors.Trace(err) } @@ -366,6 +375,7 @@ func (ta *Table) fetchIndexesViaSqlDB(conn *sql.DB) error { c := toUint64(cardinality) currentIndex.AddColumn(colName, c) + currentIndex.NoneUnique = noneUnique } return ta.fetchPrimaryKeyColumns() diff --git a/vendor/github.com/go-mysql-org/go-mysql/utils/bytes_buffer_pool.go b/vendor/github.com/go-mysql-org/go-mysql/utils/bytes_buffer_pool.go index a1ca8707..fe0066ed 100644 --- a/vendor/github.com/go-mysql-org/go-mysql/utils/bytes_buffer_pool.go +++ b/vendor/github.com/go-mysql-org/go-mysql/utils/bytes_buffer_pool.go @@ -5,6 +5,10 @@ import ( "sync" ) +const ( + TooBigBlockSize = 1024 * 1024 * 4 +) + var ( bytesBufferPool = sync.Pool{ New: func() interface{} { @@ -27,6 +31,10 @@ func BytesBufferGet() (data *bytes.Buffer) { } func BytesBufferPut(data *bytes.Buffer) { + if data == nil || len(data.Bytes()) > TooBigBlockSize { + return + } + select { case bytesBufferChan <- data: default: diff --git a/vendor/modules.txt b/vendor/modules.txt index cfc8dd78..20ccbed5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -10,7 +10,7 @@ github.com/Microsoft/go-winio github.com/Microsoft/go-winio/pkg/guid # github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew/spew -# github.com/go-mysql-org/go-mysql v1.3.0 +# github.com/go-mysql-org/go-mysql v1.4.1-0.20220112102103-b3f1a27311d8 ## explicit github.com/go-mysql-org/go-mysql/client github.com/go-mysql-org/go-mysql/mysql From 85120de56a3c180452528924154532345d6cb53d Mon Sep 17 00:00:00 2001 From: Aaron Brady Date: Thu, 3 Feb 2022 10:29:27 -0500 Subject: [PATCH 2/2] Add additional comments on JSON changes and remove unneeded schema type comparison Co-authored-by: Shiv Nagarajan --- dml_events.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dml_events.go b/dml_events.go index 30a0f9c4..a87b1c13 100644 --- a/dml_events.go +++ b/dml_events.go @@ -423,6 +423,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol switch v := value.(type) { case string: + // since https://github.com/go-mysql-org/go-mysql/pull/658/files merged, go-mysql returns JSON events as a string, but we would prefer them as []byte for consistency with other types if column.Type == schema.TYPE_JSON { return appendEscapedBuffer(buffer, []byte(v), true) } @@ -435,7 +436,8 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn) case []byte: - return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON) + // schema type cannot be JSON at this point because all JSON results are strings + return appendEscapedBuffer(buffer, v, false) case bool: if v { return append(buffer, '1')