diff --git a/README-cn.md b/README-cn.md index 3e03c3e..897ef77 100644 --- a/README-cn.md +++ b/README-cn.md @@ -87,7 +87,7 @@ services.AddDtmcli(x => x.BranchTimeout = 10000; // 子事务屏障的数据库类型, mysql, postgres, sqlserver - x.DBType = "mysql"; + x.SqlDbType = "mysql"; // 子事务屏障的数据表名 x.BarrierTableName = "dtm_barrier.barrier"; @@ -108,7 +108,7 @@ services.AddDtmcli(Configuration, "dtm"); "DtmUrl": "http://localhost:36789", "DtmTimeout": 10000, "BranchTimeout": 10000, - "DBType": "mysql", + "SqlDbType": "mysql", "BarrierTableName": "dtm_barrier.barrier", } } @@ -265,12 +265,32 @@ public class MyBusi this._globalTransaction = globalTransaction; } - public async Task DoBusAsync() + public async Task DoBusAsync(CancellationToken cancellationToken) { + var svc = "http://localhost:5005"; + await _globalTransaction.ExcecuteAsync(async (Xa xa) => { - await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken); - await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken); + // NOTE: XA 模式的限制 + // 当前模式仅支持mysql、postgresDB,请修改相应的客户端配置,如SqlDbType等。 + // 如使用Mysql并且版本低于8.0需关闭连接池使用 + + // 调用 XA 子事务 + await xa.CallBranch( + // 参数 + new TransRequest("1", -30), + + // 操作的 URL + svc + "/XaTransOut", + + // 取消令牌 + cancellationToken); + + await xa.CallBranch( + new TransRequest("2", 30), + svc + "/XaTransIn", + cancellationToken); + }, cancellationToken); } } diff --git a/README.md b/README.md index c182741..b54193f 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ services.AddDtmcli(x => x.BranchTimeout = 10000; // barrier database type, mysql, postgres, sqlserver - x.DBType = "mysql"; + x.SqlDbType = "mysql"; // barrier table name x.BarrierTableName = "dtm_barrier.barrier"; @@ -115,7 +115,7 @@ And the configuration file "DtmUrl": "http://localhost:36789", "DtmTimeout": 10000, "BranchTimeout": 10000, - "DBType": "mysql", + "SqlDbType": "mysql", "BarrierTableName": "dtm_barrier.barrier", } } @@ -273,12 +273,32 @@ public class MyBusi this._globalTransaction = globalTransaction; } - public async Task DoBusAsync() + public async Task DoBusAsync(CancellationToken cancellationToken) { + var svc = "http://localhost:5005"; + await _globalTransaction.ExcecuteAsync(async (Xa xa) => { - await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken); - await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken); + // NOTE: Limitations of using Xa mode + // The current mode only supports mysql, postgresDB, please modify the corresponding client configuration, such as SqlDbType, etc. + // Connection pooling needs to be turned off for mysql versions below 8.0 + + // Create XA sub-transaction + await xa.CallBranch( + // Arguments of action + new TransRequest("1", -30), + + // URL of action + svc + "/XaTransOut", + + // Cancel token + cancellationToken); + + await xa.CallBranch( + new TransRequest("2", 30), + svc + "/XaTransIn", + cancellationToken); + }, cancellationToken); } } diff --git a/samples/DtmSample/Controllers/TransController.cs b/samples/DtmSample/Controllers/TransController.cs index 42d1d92..a345ffc 100644 --- a/samples/DtmSample/Controllers/TransController.cs +++ b/samples/DtmSample/Controllers/TransController.cs @@ -1,6 +1,12 @@ -using DtmSample.Dtos; +using Dapper; +using Dtmcli; +using DtmSample.Dtos; +using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; +using MySqlConnector; +using System.Threading; +using System.Threading.Tasks; namespace DtmSample.Controllers { @@ -10,10 +16,12 @@ namespace DtmSample.Controllers public class TransController : ControllerBase { private readonly ILogger _logger; + private readonly XaLocalTransaction _xaTrans; - public TransController(ILogger logger) + public TransController(ILogger logger, XaLocalTransaction xaTrans) { _logger = logger; + _xaTrans = xaTrans; } #region TCC @@ -119,7 +127,42 @@ public IActionResult TransInRevert([FromBody] TransRequest body) _logger.LogInformation("TransInConfirm, QueryString={0}", Request.QueryString); _logger.LogInformation("用户: {0},转入 {1} 元---回滚", body.UserId, body.Amount); return Ok(TransResponse.BuildSucceedResponse()); - } + } + #endregion + + #region Xa + [HttpPost("XaTransOut")] + public async Task XaTransOut(CancellationToken token) + { + //todo: Connection pooling needs to be turned off for mysql versions below 8.0 + using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False"); + await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) => + { + var body = await this.Request.ReadFromJsonAsync(); + await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'"); + _logger.LogInformation("XaTransOut, QueryString={0}", Request.QueryString); + _logger.LogInformation("用户: {0},转出 {1} 元", body.UserId, body.Amount); + }, token); + + return Ok(TransResponse.BuildSucceedResponse()); + } + + [HttpPost("XaTransIn")] + public async Task XaTransIn(CancellationToken token) + { + //todo: Connection pooling needs to be turned off for mysql versions below 8.0 + using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False"); + await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) => + { + var body = await this.Request.ReadFromJsonAsync(); + await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'"); + _logger.LogInformation("XaTransIn, QueryString={0}", Request.QueryString); + _logger.LogInformation("用户: {0},转入 {1} 元", body.UserId, body.Amount); + }, token); + + return Ok(TransResponse.BuildSucceedResponse()); + } #endregion } } + diff --git a/samples/DtmSample/Controllers/XaTestController.cs b/samples/DtmSample/Controllers/XaTestController.cs new file mode 100644 index 0000000..e7cd164 --- /dev/null +++ b/samples/DtmSample/Controllers/XaTestController.cs @@ -0,0 +1,92 @@ +using Dtmcli; +using DtmSample.Dtos; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace DtmSample.Controllers +{ + /// + /// XA 示例 + /// + [ApiController] + [Route("/api")] + public class XaTestController : ControllerBase + { + + private readonly ILogger _logger; + private readonly XaGlobalTransaction _globalTransaction; + private readonly AppSettings _settings; + + public XaTestController(ILogger logger, IOptions optionsAccs, XaGlobalTransaction transaction) + { + _logger = logger; + _settings = optionsAccs.Value; + _globalTransaction = transaction; + } + + /// + /// Xa 成功提交 + /// + /// + /// + [HttpPost("commit")] + public async Task Commit(CancellationToken cancellationToken) + { + //todo: Currently only supported by mysql, please modify the appsettings.json + try + { + await _globalTransaction.ExcecuteAsync(async (Xa xa) => + { + //// 用户1 转出30元 + var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken); + + //// 用户2 转入30元 + var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken); + }, cancellationToken); + + return Ok(TransResponse.BuildSucceedResponse()); + } + catch (Exception ex) + { + _logger.LogError(ex, "Xa Error"); + return Ok(TransResponse.BuildFailureResponse()); + } + } + + + /// + /// Xa 失败回滚 + /// + /// + /// + [HttpPost("rollbcak")] + public async Task Rollbcak(CancellationToken cancellationToken) + { + //todo: Currently only supported by mysql, please modify the appsettings.json + try + { + await _globalTransaction.ExcecuteAsync(async (Xa xa) => + { + //// 用户1 转出30元 + var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken); + + //// 用户2 转入30元 + var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken); + + throw new Exception("rollbcak"); + }, cancellationToken); + + return Ok(TransResponse.BuildSucceedResponse()); + } + catch (Exception ex) + { + _logger.LogError(ex, "Xa Error"); + return Ok(TransResponse.BuildFailureResponse()); + } + } + } +}