Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature http xa #69

Merged
merged 11 commits into from
Jul 11, 2023
22 changes: 22 additions & 0 deletions README-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,28 @@ public class MyBusi
}
```

### XA pattern

```cs
public class MyBusi
{
private readonly Dtmcli.XaGlobalTransaction _globalTransaction;

public MyBusi(Dtmcli.XaGlobalTransaction globalTransaction)
{
this._globalTransaction = globalTransaction;
}

public async Task DoBusAsync()
{
await _globalTransaction.Excecute(async (Xa xa) =>
{
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
}, cancellationToken);
}
}
```

## 可运行的使用示例

Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,30 @@ public class MyBusi
}
```

### XA pattern


```cs
public class MyBusi
{
private readonly Dtmcli.XaGlobalTransaction _globalTransaction;

public MyBusi(Dtmcli.XaGlobalTransaction globalTransaction)
{
this._globalTransaction = globalTransaction;
}

public async Task DoBusAsync()
{
await _globalTransaction.Excecute(async (Xa xa) =>
{
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
}, cancellationToken);
}
}
```

## Complete example


Expand Down
1 change: 1 addition & 0 deletions src/DtmCommon/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class Constant
public static readonly string TYPE_TCC = "tcc";
public static readonly string TYPE_SAGA = "saga";
public static readonly string TYPE_MSG = "msg";
public static readonly string TYPE_XA = "xa";

public static readonly string ResultFailure = "FAILURE";
public static readonly string ResultSuccess = "SUCCESS";
Expand Down
29 changes: 25 additions & 4 deletions src/DtmCommon/Imp/DbSpecialDelegate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,37 @@ public class DbSpecialDelegate
{
private readonly IDbSpecial _special;

private readonly Dictionary<string, IDbSpecial> _specialDic;

public DbSpecialDelegate(IEnumerable<IDbSpecial> specials, IOptions<DtmOptions> optionsAccs)
{
var dbSpecial = specials.FirstOrDefault(x => x.Name.Equals(optionsAccs.Value.SqlDbType));
this._specialDic = GetSpecialDictionary(specials);
if (this._specialDic.TryGetValue(optionsAccs.Value.SqlDbType, out _special) == false)
throw new DtmException($"unknown db type '{optionsAccs.Value.SqlDbType}'");
}

if (dbSpecial == null) throw new DtmException($"unknown db type '{optionsAccs.Value.SqlDbType}'");
public IDbSpecial GetDbSpecial() => _special;

public IDbSpecial GetDbSpecialByName(string sqlDbType)
{
IDbSpecial special;
if (this._specialDic.TryGetValue(sqlDbType, out special) == false)
throw new DtmException($"unknown db type '{sqlDbType}'");

_special = dbSpecial;
return special;
}

public IDbSpecial GetDbSpecial() => _special;
Dictionary<string, IDbSpecial> GetSpecialDictionary(IEnumerable<IDbSpecial> specials)
{
Dictionary<string, IDbSpecial> specialDic = new();
foreach (var special in specials)
{
if (specialDic.ContainsKey(special.Name) == false)
specialDic.Add(special.Name, special);
}

return specialDic;
}
}

}
2 changes: 2 additions & 0 deletions src/Dtmcli/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal static class Request

internal const string OPERATION_REGISTERBRANCH = "registerBranch";

internal const string PHASE2_URL = "phase2_url";

/// <summary>
/// branch type for message, SAGA, XA
/// </summary>
Expand Down
12 changes: 4 additions & 8 deletions src/Dtmcli/DtmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace Dtmcli
public class DtmClient : IDtmClient
{
private static readonly char Slash = '/';
private static readonly string QueryStringFormat = "dtm={0}&gid={1}&trans_type={2}&branch_id={3}&op={4}";
private static readonly string QuestionMark = "?";
private static readonly string And = "&";

Expand Down Expand Up @@ -91,13 +90,10 @@ public async Task TransRegisterBranch(TransBase tb, Dictionary<string, string> a

public async Task<HttpResponseMessage> TransRequestBranch(TransBase tb, HttpMethod method, object body, string branchID, string op, string url, CancellationToken cancellationToken)
{
var queryParams = string.Format(
QueryStringFormat,
string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash), Constant.Request.URLBASE_PREFIX),
tb.Gid,
tb.TransType,
branchID,
op);
var uriPath = string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash), Constant.Request.URLBASE_PREFIX);
var queryParams = tb.TransType == DtmCommon.Constant.TYPE_XA ?
$"dtm={uriPath}&gid={tb.Gid}&trans_type={tb.TransType}&branch_id={branchID}&op={op}&phase2_url={url}" :
$"dtm={uriPath}&gid={tb.Gid}&trans_type={tb.TransType}&branch_id={branchID}&op={op}";

var client = _httpClientFactory.CreateClient(Constant.BranchClientHttpName);

Expand Down
2 changes: 2 additions & 0 deletions src/Dtmcli/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ private static void AddDtmCore(IServiceCollection services)
services.TryAddSingleton<IDtmTransFactory, DtmTransFactory>();
services.TryAddSingleton<IDtmClient, DtmClient>();
services.TryAddSingleton<TccGlobalTransaction>();
services.TryAddSingleton<XaGlobalTransaction>();
services.TryAddSingleton<XaLocalTransaction>();

DtmCommon.ServiceCollectionExtensions.AddDtmCommon(services);

Expand Down
110 changes: 110 additions & 0 deletions src/Dtmcli/Xa/Xa.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using Dtmcli.DtmImp;
using DtmCommon;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using static Dtmcli.Constant;

namespace Dtmcli
{
public sealed class Xa : TransBase
{
private readonly IDtmClient _dtmClient;

internal Xa(IDtmClient dtmHttpClient, string gid)
{
this._dtmClient = dtmHttpClient;
this.Gid = gid;
this.TransType = DtmCommon.Constant.TYPE_XA;
this.BranchIDGen = new BranchIDGen();
}

internal Xa(IDtmClient dtmHttpClient)
{
this._dtmClient = dtmHttpClient;
}

public async Task<string> CallBranch(object body, string url, CancellationToken cancellationToken = default)
{
using var response = await _dtmClient.TransRequestBranch(
this,
HttpMethod.Post,
body,
this.BranchIDGen.NewSubBranchID(),
Constant.Request.BRANCH_ACTION,
url,
cancellationToken).ConfigureAwait(false);

Exception ex = await Utils.RespAsErrorCompatible(response);
if (null != ex)
throw ex;

return await response.Content.ReadAsStringAsync();
}

[JsonIgnore]
public string Phase2Url { get; set; }


#if NET5_0_OR_GREATER
public static Xa FromQuery(IDtmClient dtmClient, Microsoft.AspNetCore.Http.IQueryCollection quersy)
JackBOBO marked this conversation as resolved.
Show resolved Hide resolved
{
if (quersy.TryGetValue(Request.GID, out var gid) == false || string.IsNullOrEmpty(gid))
throw new ArgumentNullException(Request.GID);

if (quersy.TryGetValue(Request.TRANS_TYPE, out var transType) == false || string.IsNullOrEmpty(transType))
throw new ArgumentNullException(Request.TRANS_TYPE);

if (quersy.TryGetValue(Request.OP, out var op) == false || string.IsNullOrEmpty(op))
throw new ArgumentNullException(Request.OP);

if (quersy.TryGetValue(Request.BRANCH_ID, out var branchID) == false || string.IsNullOrEmpty(branchID))
throw new ArgumentNullException(Request.BRANCH_ID);

quersy.TryGetValue(Request.DTM, out var dtm);
quersy.TryGetValue(Request.PHASE2_URL, out var phase2Url);

return new(dtmClient)
{
Gid = gid,
Dtm = dtm,
Op = op,
TransType = transType,
Phase2Url = phase2Url,
BranchIDGen = new BranchIDGen(branchID),
};
}
#else
public static Xa FromQuery(IDtmClient dtmClient, IDictionary<string, string> quersy)
{
if (!quersy.TryGetValue(Request.GID, out var gid) == false || string.IsNullOrEmpty(gid))
throw new ArgumentNullException(Request.GID);

if (quersy.TryGetValue(Request.TRANS_TYPE, out var transType) == false || string.IsNullOrEmpty(transType))
throw new ArgumentNullException(Request.TRANS_TYPE);

if (quersy.TryGetValue(Request.OP, out var op) == false || string.IsNullOrEmpty(op))
throw new ArgumentNullException(Request.OP);

if (quersy.TryGetValue(Request.BRANCH_ID, out var branchID) == false || string.IsNullOrEmpty(branchID))
throw new ArgumentNullException(Request.BRANCH_ID);

quersy.TryGetValue(Request.DTM, out var dtm);
quersy.TryGetValue(Request.PHASE2_URL, out var phase2Url);

return new(dtmClient)
{
Gid = gid,
Dtm = dtm,
Op = op,
TransType = transType,
Phase2Url = phase2Url,
BranchIDGen = new BranchIDGen(branchID),
};
}
#endif
}
}
52 changes: 52 additions & 0 deletions src/Dtmcli/Xa/XaGlobalTransaction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using DtmCommon;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Dtmcli
{
public sealed class XaGlobalTransaction
{
private readonly IDtmClient _dtmClient;
private readonly ILogger _logger;

public XaGlobalTransaction(IDtmClient dtmClient, ILoggerFactory factory)
{
this._dtmClient = dtmClient;
this._logger = factory.CreateLogger<XaGlobalTransaction>();
}

public async Task<string> ExcecuteAsync(Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
{
var gid = await _dtmClient.GenGid(cancellationToken);
await this.Excecute(gid, xa_cb, cancellationToken);
return gid;
}

public async Task Excecute(string gid, Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
{
await ExcecuteAsync(gid, null, xa_cb, cancellationToken);
}

public async Task ExcecuteAsync(string gid, Action<Xa> custom, Func<Xa, Task> xa_cb, CancellationToken cancellationToken = default)
{
Xa xa = new(this._dtmClient, gid);
if (null != custom)
custom(xa);

try
{
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_PREPARE, cancellationToken);
await xa_cb(xa);
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_SUBMIT, cancellationToken);
}
catch (Exception ex)
{
xa.RollbackReason = ex.Message.Substring(0, ex.Message.Length > 1023 ? 1023 : ex.Message.Length);
_logger.LogError(ex, "prepare or submitting global transaction error");
await _dtmClient.TransCallDtm(null, xa, Constant.Request.OPERATION_ABORT, cancellationToken);
}
}
}
}
Loading