分布式事务之 XA
默认
发布于: 2021-07-17

最近一位运维的小伙伴突然问道:你们是如何回滚事务的?这问的我一头雾水,后面才知道他原来是想问我分布式事务是怎么实施的。

正好想写一写关于 XA 的分布式事务,本文使用 Golang 实现几个服务,也附带一个玩具性质的事务管理器 distributed-transaction-examples

X/Open XA

在计算技术上,XA规范是开放群组关于分布式事务处理 (DTP)的规范。规范描述了全局的事务管理器与局部的资源管理器之间的接口。XA规范的目的是允许多个资源(如数据库,应用服务器,消息队列,等等)在同一事务中访问,这样可以使ACID属性跨越应用程序而保持有效。XA使用两阶段提交来保证所有资源同时提交或回滚任何特定的事务。

XA规范描述了资源管理器要支持事务性访问所必需做的事情。遵守该规范的资源管理器被称为XA compliant。

维基百科

XA 是标准的两阶段提交方案,也是分布式事务中常提到的2PC. XA 主要依赖数据库,开发思想易于理解并且相对成熟,目前主流的数据库针对 XA 协议都有各自实现。

MySQL XA

从 MySQL5 开始,InnoDB 开始支持 XA 协议。以下就以 MySQL8 为例,介绍在 MySQL 中使用 XA 的一些语法或命令。

XA 的 SQL 语法

XA {START|BEGIN} xid [JOIN|RESUME]

XA END xid [SUSPEND [FOR MIGRATE]]
XA PREPARE xid
XA COMMIT xid [ONE PHASE]
XA ROLLBACK xid
XA RECOVER [CONVERT XID]

针对 XA START xid 语句 JOINRESUME 是不支持的,同样注意的还有 XA END xidSUSPEND 以及 FOR MIGRATE. 具体的内容可以查看 MySQL8 文档中关于上述字段的介绍。

13.3.8.1 XA Transaction SQL Statements

一个完整的 XA 事务

开启一个 xa 事务,其中 xid 为事务 ID 可以由数据库生成,也可以由业务方自行生成。需要注意的是,针对同一数据源或数据库实例 xid必须保证全局唯一。

开启事务

xa startxa end间可以做具体的 SQL 业务。

xa start '90rizh5w';
update account set balance = balance - 10 where id = 1;
xa end '90rizh5w'

预提交

第一阶段预提交

xa prepare '90rizh5w'

提交

第二阶段提交

xa commit '90rizh5w'

回滚

XA 事务中第二阶段可以是 COMMIT 也可以是 ROLLBACK

xa rollback '90rizh5w'

完整的 COMMIT SQL

xa start '90rizh5w';
update account set balance = balance - 10 where id = 1;
xa end '90rizh5w';
xa prepare '90rizh5w';
xa commit '90rizh5w';

完整的 ROLLBACK SQL

xa start '90rizh5w';
update account set balance = balance - 10 where id = 1;
xa end '90rizh5w';
xa prepare '90rizh5w';
xa rollback '90rizh5w';

实践 XA 需要注意的问题

空回滚

某个服务并没有进行 xa start但是收到 TM的回滚回调,要允许空回滚。

二阶段 COMMIT 失败

二阶段 commit 会不会失败?失败后如何处理?

二阶段 commit 失败存在几种可能:

  • 网络故障导致 TM 无法正确通知到某一个服务,或无法通知到所有服务

  • 某一个服务收到 TM 回调消息后所在机器断电了

超时问题

某一服务处于阻塞状态,导致事务锁定资源过长,若事务未在限定时间内执行完成,TM 应回滚事务。

微服务中的 XA

接下来将实现几个服务和一个简单的分布式事务管理为例,来模拟演示如何在微服务架构中应用 XA 事务。本示例所有 RPC 请求全部使用 HTTP 实现,未作并发控制不可用于生产环境。

Github 地址

业务背景

用户在我方平台上的商家下单,请提供一个开放接口完成用户和商家的金额扣除和增加的业务逻辑。

服务列表

api-service 提供一个 HTTP 协议的/order 接口来支持用户下单,并开启全局事务,以安全调用其他相关服务完成金额的增减。

customer-service 提供一个 HTTP 协议的 RPC 接口 /reduce 来减去用户余额。

merchant-service 提供一个 HTTP 协议的 RPC 接口 /add 来增加商户余额。

tm 一个分布事务管理器,用于生成全局事务,并协调全局事务下的各个子事务进行统一的提交和回滚操作。提供以下接口:

  • /new 创建一个新的全局事务;

  • /register 向全局事务注册子事务;

  • /done 完成全局事务,并通知相关子事务可以进行 COMMIT 提交;

  • /rollback 完成全局事务,并通知相关子事务进行 ROLLBACK 回滚;

  • /transactions 列出正在进行的所有事务;

代码实现

本示例使用 Golang 实现,未作过多封装,代码清晰简单,依赖环境如下:

  • Golang

  • MySQL

  • Docker

  • docker-compose

目录树

/xa/tm 下实现本文所需要的事务管理器;

/xa/init.sql 用于初始化所需数据库、表、基本数据等;

/xa/services 下存放 api customer merchat 服务;

.
├── README.md
└── xa
    ├── README.md
    ├── docker-compose.yml
    ├── init.sql
    ├── services
    │   ├── cmd
    │   │   ├── api
    │   │   │   ├── Dockerfile
    │   │   │   └── main.go
    │   │   ├── customer
    │   │   │   ├── Dockerfile
    │   │   │   └── main.go
    │   │   └── merchant
    │   │       ├── Dockerfile
    │   │       └── main.go
    │   ├── go.mod
    │   ├── go.sum
    │   └── pkg
    │       ├── callback
    │       │   └── callback.go
    │       ├── database
    │       │   └── db.go
    │       ├── tm
    │       │   └── tm.go
    │       └── xa
    │           └── xa.go
    └── tm
        ├── Dockerfile
        ├── go.mod
        ├── go.sum
        └── main.go

表结构

customer_account

CREATE TABLE customer_account (
  id int unsigned NOT NULL,
  balance int unsigned NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

merchant_account

CREATE TABLE merchant_account (
  id int unsigned NOT NULL,
  balance int unsigned NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

实现 TM 事务管理器

本文的事务管理器需要几个功能:

  1. 支持创建全局事务并分配对应的唯一 ID;

  2. customermerchant 服务的本地事务(子事务)关联;

  3. 事务发起者(api)通过可以全局事务 ID 控制整个流程回滚或是提交;

  4. tm支持通知到各个服务进行提交或回滚;

在单点的情况下使用 UUID 都可以保证 TIDXID 唯一,本文使用 github.com/rs/xid 来生成,在分布式的情况下请选择分布式 ID 生成器生成。项目基于 HTTP 协议,所以回调部分的基于保存各个服务的回调地址实现。

全局事务状态存放在内存中,分布式场景中请选择例如:Redis / MySQL 等其他方案。

package main
import (
    "encoding/json"
    "log"
    "net/http"
"github.com/pkg/errors"
"github.com/rs/xid"

)
type Resource struct {
    Xid      string
    Callback string
}
type Transaction struct {
    Id        string
    Resources []Resource
}
var transactions map[string]*Transaction = make(map[string]*Transaction)
func main() {
    log.SetPrefix("tm: ")
// create a gloabl transaction id;
http.HandleFunc("/new", func(rw http.ResponseWriter, r *http.Request) {
    guid := xid.New().String()
    log.Printf("new transaction id %s", guid)

    transactions[guid] = &Transaction{
        Id:        guid,
        Resources: make([]Resource, 0),
    }
    rw.Write([]byte(guid))
})

// rm register sub transaction
http.HandleFunc("/register", func(rw http.ResponseWriter, r *http.Request) {
    xid := r.URL.Query().Get("xid")
    tid := r.URL.Query().Get("tid")
    callback := r.URL.Query().Get("callback")

    log.Printf("register resource tid=%s xid=%s callback=%s", tid, xid, callback)

    transactions[tid].Resources = append(transactions[tid].Resources, Resource{
        Xid:      xid,
        Callback: callback,
    })
})

http.HandleFunc("/done", func(rw http.ResponseWriter, r *http.Request) {
    tid := r.URL.Query().Get("tid")

    log.Printf("tid=%s done", tid)

    t := transactions[tid]
    if t != nil {
        for _, r := range t.Resources {
            url := r.Callback + "?status=ok&tid=" + tid + "&xid=" + r.Xid
            log.Printf("callback url=%s", url)
            _, err := http.Get(url)
            if err != nil {
                log.Println(errors.WithStack(err))
            }
        }
        delete(transactions, tid)
    } else {
        log.Printf("do done transaction tid=%s no exist", tid)
    }
})

http.HandleFunc("/rollback", func(rw http.ResponseWriter, r *http.Request) {
    tid := r.URL.Query().Get("tid")
    t := transactions[tid]
    if t != nil {
        for _, r := range t.Resources {
            // todo
            http.Get(r.Callback + "?" + "status=rollback&tid=" + tid + "&xid=" + r.Xid)
        }
        delete(transactions, tid)
    }
})

http.HandleFunc("/transactions", func(rw http.ResponseWriter, r *http.Request) {
    b, _ := json.Marshal(transactions)
    rw.Write(b)
})

log.Fatal(http.ListenAndServe(":9999", nil))

}

由于篇幅问题,剩余服务代码不再粘贴,详情也请移步 Github 查看。

国内分布式事务大多使用 Java 实现,但是封装的都比较严重,一般大家都是理解个表皮在项目中拿来就用了。本次通过实现一个简单的 XA 事务管理器去理解 XA 事务是非常好的一种方式,可以看到 XA 事务比较容易理解,但是由于其同步阻塞的思想实际在项目中应用并不多。

相关链接