Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XA全局事务成功后,DTM Admin 显示正常,但是本地数据库XA事务未提交,要修改的数据未变动,需要手动执行XA事务提交才生效 #470

Open
bodhi-code opened this issue Oct 16, 2023 · 2 comments

Comments

@bodhi-code
Copy link

原始数据库表截图如下:
image
使用Dtm XA分布式事务,实现如下场景:用户1002 balance转出 100 1001 balance转入100,
程序执行成功,并执行成功后,dtm admin 的截图如下:
image
在Dtm XA分布式事务完成后,数据库表截图如下:
image
可见结果未达到预期,在本地数据库中使用xa recover命令后,发现如下截图:
image
在本地数据库使用如下命令手动提交XA事务:

xa commit '5b63c59f-a435-4ffc-bf4e-e6dc5190cf1d-02';
xa commit '5b63c59f-a435-4ffc-bf4e-e6dc5190cf1d-01';

数据库表数据发生了预期的变化,数据库表截图如下:
image

@yedf2
Copy link
Contributor

yedf2 commented Oct 21, 2023

能有可以复现的例子吗?

@bodhi-code
Copy link
Author

bodhi-code commented Oct 22, 2023

具体的示例的如下,有两个文件,一个是test_dtm.go,代码如下:

package main

import (
	"crypto/md5"
	"fmt"
	"github.com/bwmarrin/snowflake"
	"github.com/dtm-labs/client/dtmcli"
	"github.com/gin-gonic/gin"
	"github.com/go-resty/resty/v2"
	"github.com/google/uuid"
	"log"
	"strings"
)

func CryptToMD5(v1 []byte, v2 []byte, uppercase bool) string {
	var rs = make([]string, 0)
	m := md5.New()
	m.Write(v1)
	bm := m.Sum(v2)
	for _, v := range bm {
		if uppercase {
			rs = append(rs, fmt.Sprintf("%02X", v))
		} else {
			rs = append(rs, fmt.Sprintf("%02x", v))
		}
	}
	return strings.Join(rs, "")
	/*return hex.EncodeToString(h.Sum(format.ToByte(v2)))*/ //第二种返回字符串的方法,返回的参数是小写
}

func NewGid() string {
	u := uuid.New()
	return fmt.Sprintf("%s", u)
}

func TransXa(serverUrl, businessUrl string) {
	gid := NewGid()
	transInReq := &gin.H{
		"transInUserId": 1001,
		"amount":        100,
	}
	transOutReq := &gin.H{
		"transOutUserId": 1002,
		"amount":         100,
	}
	err := dtmcli.XaGlobalTransaction(serverUrl, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
		resp, err := xa.CallBranch(transOutReq, businessUrl+"/trans-out-xa")
		if err != nil {
			return resp, err
		}
		return xa.CallBranch(transInReq, businessUrl+"/trans-in-xa")
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("transaction:%s xa success", gid)
}

func main() {
	serverUrl := "http://192.168.0.101:31980/api/dtmsvr"
	businessUrl := "http://192.168.0.101:8686/api/v1/business"
	TransXa(serverUrl, businessUrl)
}

一个文件是test_dtm_http_xa.go,代码如下:

package main

import (
	"database/sql"
	"fmt"
	"github.com/dtm-labs/client/dtmcli"
	"github.com/gin-gonic/gin"
	"github.com/gin-gonic/gin/binding"
	"github.com/go-playground/locales/en"
	"github.com/go-playground/locales/zh"
	ut "github.com/go-playground/universal-translator"
	"github.com/go-playground/validator/v10"
	enTranslations "github.com/go-playground/validator/v10/translations/en"
	zhTranslations "github.com/go-playground/validator/v10/translations/zh"
	jsoniter "github.com/json-iterator/go"
	rotatelogs "github.com/lestrrat-go/file-rotatelogs"
	"github.com/pkg/errors"
	"gorm.io/driver/mysql"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	"log"
	"net/http"
	"net/url"
	"time"
)

type TransInXaRequest struct {
	TransInUserId int     `json:"transInUserId" binding:"required" desc:"转入账号id"` //转入账号id
	Amount        float64 `json:"amount" binding:"required" desc:"转入金额"`          //转入金额
}

type TransOutXaRequest struct {
	TransOutUserId int     `json:"transOutUserId" binding:"required" desc:"转出账号id"` //转出账号id
	Amount         float64 `json:"amount" binding:"required" desc:"转出金额"`           //转出金额
}

func initValidator(locale string) (ut.Translator, error) {
	if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
		zhTranslator := zh.New()
		enTranslator := en.New()
		uni := ut.New(enTranslator, zhTranslator, enTranslator)
		translator, ok := uni.GetTranslator(locale)
		if !ok {
			return nil, errors.New("init validator failed")
		}
		var err error
		switch locale {
		case "en":
			err = enTranslations.RegisterDefaultTranslations(v, translator)
		case "zh":
			err = zhTranslations.RegisterDefaultTranslations(v, translator)
		default:
			err = enTranslations.RegisterDefaultTranslations(v, translator)
		}
		return translator, err
	}
	return nil, errors.New("init validator failed")
}

func ValidateError(ctx *gin.Context, err error) bool {
	if err == nil {
		return false
	}
	errs, ok := err.(validator.ValidationErrors)
	if ok {
		translator, err := initValidator("zh")
		if err == nil {
			return false
		}
		errMsg, _ := jsoniter.Marshal(errs.Translate(translator))
		ctx.JSON(http.StatusOK, gin.H{
			"code":    -1,
			"message": "validate request failed:" + string(errMsg),
		})
	} else {
		ctx.JSON(http.StatusOK, gin.H{
			"code":    -1,
			"message": "read request failed:" + err.Error(),
		})
	}
	return true
}

func ResponseFailed(ctx *gin.Context, message string, err error) {
	if err == nil {
		ctx.JSON(http.StatusOK, gin.H{
			"code":    -1,
			"message": message + "!",
		})
	} else {
		ctx.JSON(http.StatusOK, gin.H{
			"code":    -1,
			"message": message + "," + err.Error(),
		})
	}
}

func ResponseSuccess(ctx *gin.Context, message string, data ...interface{}) {
	if len(data) > 0 {
		ctx.JSON(http.StatusOK, gin.H{
			"code":    0,
			"message": message + "!",
			"data":    data[0],
		})
	} else {
		ctx.JSON(http.StatusOK, gin.H{
			"code":    0,
			"message": message + "!",
		})
	}
}

func ResponseWithStatusCode(ctx *gin.Context, statusCode int, code int, message string, data ...interface{}) {
	if statusCode != 200 {
		code = statusCode
	}
	if len(data) > 0 {
		ctx.JSON(statusCode, gin.H{
			"code":    code,
			"message": message,
			"data":    data[0],
		})
	} else {
		ctx.JSON(statusCode, gin.H{
			"code":    code,
			"message": message,
		})
	}
}

type DBConf struct {
	User            string `yaml:"user" desc:"用户名"`
	Password        string `yaml:"password" desc:"用户密码"`
	Host            string `yaml:"host" desc:"主机名"`
	Port            int    `yaml:"port" desc:"主机端口"`
	Database        string `yaml:"database" desc:"数据库名"`
	Dialect         string `yaml:"dialect" desc:"数据库类型"`
	DBSource        string `yaml:"db_source" desc:"数据库源"`
	DBDebug         bool   `yaml:"db_debug" desc:"是否输出gorm数据库调试语句"`
	MaxAge          int    `yaml:"max_age" desc:"日志最大保留时间"`
	RotateTimeLevel int    `yaml:"rotate_time_level" desc:"日志分片时间等级 0 自定义时间分片 1 日分片 2 1小时分片 3 1分钟分片"`
	RotateTime      int    `yaml:"rotate_time" desc:"自定义时间分片时长 单位为:min"`
}

const RotateByTimestamp = 0 //自定义时间分片
const RotateByDate = 1      //日分片
const RotateByHour = 2      //1小时分片
const RotateByMinute = 3    //1分钟分片

type Dao struct {
	DB *gorm.DB
}

func NewDBFromRawDB(db *sql.DB, dbConf DBConf) (*gorm.DB, error) {
	var newDb *gorm.DB
	var err error
	if dbConf.DBDebug {
		var err error
		var loggerWriteSyncer *rotatelogs.RotateLogs
		var loggerFileName = fmt.Sprintf("%s-db-debug.log", dbConf.Database)
		switch dbConf.RotateTimeLevel {
		case RotateByTimestamp:
			loggerWriteSyncer, err = rotatelogs.New(
				"../logs/"+loggerFileName+".%Y%m%d%H%M",
				rotatelogs.WithLinkName("../logs/"+loggerFileName),
				rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
				rotatelogs.WithRotationTime(time.Duration(dbConf.RotateTime)*time.Minute))
		case RotateByDate:
			loggerWriteSyncer, err = rotatelogs.New(
				"../logs/"+loggerFileName+".%Y%m%d",
				rotatelogs.WithLinkName("../logs/"+loggerFileName),
				rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour))
		case RotateByHour:
			loggerWriteSyncer, err = rotatelogs.New(
				"../logs/"+loggerFileName+".%Y%m%d%H",
				rotatelogs.WithLinkName("../logs/"+loggerFileName),
				rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
				rotatelogs.WithRotationTime(time.Hour))
		case RotateByMinute:
			loggerWriteSyncer, err = rotatelogs.New(
				"../logs/"+loggerFileName+".%Y%m%d%H%M",
				rotatelogs.WithLinkName("../logs/"+loggerFileName),
				rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
				rotatelogs.WithRotationTime(time.Minute))
		}
		if err != nil {
			return nil, err
		}
		dbLogger := logger.New(log.New(loggerWriteSyncer, "\r\n", log.LstdFlags), logger.Config{
			SlowThreshold: time.Second,
			LogLevel:      logger.Silent,
			Colorful:      true,
		})
		switch dbConf.Dialect {
		case "mysql":
			newDb, err = gorm.Open(mysql.New(mysql.Config{
				Conn: db,
			}), &gorm.Config{
				Logger: dbLogger,
			})
			break
		case "postgres":
			newDb, err = gorm.Open(postgres.New(postgres.Config{
				Conn: db,
			}), &gorm.Config{
				Logger: dbLogger,
			})
			break
		}
		newDb = newDb.Debug()
	} else {
		switch dbConf.Dialect {
		case "mysql":
			newDb, err = gorm.Open(mysql.New(mysql.Config{
				Conn: db,
			}))
			break
		case "postgres":
			newDb, err = gorm.Open(postgres.New(postgres.Config{
				Conn: db,
			}))
			break
		}
	}
	return newDb, err
}

func (dao *Dao) XaLocalTransaction(qs url.Values, f func(dao *Dao) error) (err error) {
	dbConf := DBConf{
		User:            "root",
		Password:        "CodeMan2022080^2*1",
		Host:            "127.0.0.1",
		Port:            3306,
		Database:        "test",
		Dialect:         "mysql",
		DBSource:        "root:CodeMan2022080^2*1@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local",
		DBDebug:         true,
		RotateTime:      1,
		RotateTimeLevel: 1,
	}
	err = dtmcli.XaLocalTransaction(qs, dtmcli.DBConf{
		Driver:   dbConf.Dialect,
		Host:     dbConf.Host,
		Port:     int64(dbConf.Port),
		User:     dbConf.User,
		Password: dbConf.Password,
		Db:       dbConf.Database,
	}, func(db *sql.DB, xa *dtmcli.Xa) error {
		dao := &Dao{}
		dao.DB, err = NewDBFromRawDB(db, dbConf)
		return f(dao)
	})
	return
}

func (dao *Dao) TransInXa(userId int, amount float64) error {
	result := dao.DB.Exec("update user_account set balance = balance + ? where user_id = ?", amount, userId)
	if result.Error != nil {
		return result.Error
	}
	return nil
}

func (dao *Dao) TransOutXa(userId int, amount float64) error {
	result := dao.DB.Exec("update user_account set balance = balance - ? where user_id = ?", amount, userId)
	if result.Error != nil {
		return result.Error
	}
	return nil
}

func transInXa(ctx *gin.Context) {
	var req TransInXaRequest
	err := ctx.ShouldBind(&req)
	if ValidateError(ctx, err) {
		return
	}
	qs := ctx.Request.URL.Query()
	dao := Dao{}
	err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
		err = dao.TransInXa(req.TransInUserId, req.Amount)
		if err != nil {
			return errors.New("数据库更新出错!")
		}
		return err
	})
	if err != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans in xa failed:"+err.Error())
		return
	}
	ResponseSuccess(ctx, "trans in xa success")
}

func transOutXa(ctx *gin.Context) {
	var req TransOutXaRequest
	err := ctx.ShouldBind(&req)
	if ValidateError(ctx, err) {
		return
	}
	qs := ctx.Request.URL.Query()
	dao := Dao{}
	err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
		err = dao.TransOutXa(req.TransOutUserId, req.Amount)
		if err != nil {
			return errors.New("数据库更新出错!")
		}
		return err
	})
	if err != nil {
		ResponseWithStatusCode(ctx, 409, 409, "trans out xa failed:"+err.Error())
		return
	}
	ResponseSuccess(ctx, "trans out xa success")
}

func main() {
	router := gin.Default()
	router.POST("/api/v1/business/trans-in-xa", transInXa)
	router.POST("/api/v1/business/trans-out-xa", transOutXa)
	router.Run(":8686")
}

运行环境:golang版本1.21,mysql版本8.0.32
先运行test_dtm_http_xa.go,再运行test_dtm.go,就会出现dtm显示xa事务提交成功,但是实际没有成功提交xa事务的情况。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants