forked from tmgxz/mongodb-operation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MimickingTransactionalBehavior.cs
207 lines (185 loc) · 9.16 KB
/
MimickingTransactionalBehavior.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApp_MongoDB
{
/// <summary>
/// 模拟关系数据库的事务
/// 任务描述:两个银行账户之间的转账操作
/// { _id: 1, balance: 100, txns: [] },{ _id: 2, balance: 0, txns: [] }
/// </summary>
public class MimickingTransactionalBehavior
{
private string TransactionCollectionName = "TransactionCollection";
private string AccountsCollectionName = "UserAccounts";
private MongoDBService mongoDBService = new MongoDBService("mongodb://localhost:27017/MimiTransaction?maxPoolSize=100&minPoolSize=10",
"MimiTransaction");
public void Transfer(double amt, string source, string destination,TimeSpan maxTxnTime)
{
try
{
//准备转账
TransactionDocument txn = PrepareTransfer(amt, source, destination);
CommitTransfer(txn, maxTxnTime);
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
}
}
//准备转账
private TransactionDocument PrepareTransfer(double amt, string source, string destination)
{
string strGUID = System.Guid.NewGuid().ToString("N"); //类似e0a953c3ee6040eaa9fae2b667060e09
//创建事务文档
TransactionDocument tDoc = new TransactionDocument
{
_id = strGUID, //这个应该是随机生成的一个串
State ="new",
Ts = DateTime.Now,
Amt = amt,
Src = source,
Dst = destination
};
//将事务文档插入事务集合
bool isSu = mongoDBService.Insert(TransactionCollectionName, tDoc);
if(!isSu)
{
throw new Exception("构建事务文档失败!");
}
FilterDefinitionBuilder<Account> filterBuilder = Builders<Account>.Filter;
//更新source账户
FilterDefinition<Account> filterS = filterBuilder.Eq(m => m._id, source)&filterBuilder.Gte(m => m.Balance, amt);
UpdateDefinition<Account> updateS = Builders<Account>.Update.Push(m => m.Txns, tDoc._id).Inc(m => m.Balance, -amt);
UpdateResult updateResult = mongoDBService.DocumentUpdate(AccountsCollectionName, filterS, updateS);
//检测更新是否成功
bool isSuccess = updateResult.ModifiedCount > 0 && updateResult.ModifiedCount == updateResult.MatchedCount?
true:false;
if (!isSuccess)
{
mongoDBService.Delete<TransactionDocument>(TransactionCollectionName, m => m._id == tDoc._id);
throw new Exception("更新source账户失败");
}
//更新destination账户
FilterDefinition<Account> filterD = filterBuilder.Eq(m => m._id, destination);
var updateD = Builders<Account>.Update.Push(m => m.Txns, tDoc._id).Inc(m => m.Balance, amt);
UpdateResult updateResultD = mongoDBService.DocumentUpdate(AccountsCollectionName, filterD, updateD);
bool isSuccessD = updateResultD.ModifiedCount > 0 && updateResultD.ModifiedCount == updateResultD.MatchedCount ?
true : false;
if (!isSuccessD)
{
throw new Exception("更新destination账户失败");
}
return tDoc;
}
//提交
private void CommitTransfer(TransactionDocument txn, TimeSpan maxTxnTime)
{
DateTime now = DateTime.Now.ToUniversalTime();
DateTime cutOff = now - maxTxnTime;
//更新事务文档
FilterDefinitionBuilder<TransactionDocument> filterBuilder = Builders<TransactionDocument>.Filter;
FilterDefinition<TransactionDocument> filter1 = filterBuilder.Eq(m => m._id, txn._id);
FilterDefinition<TransactionDocument> filter2 = filterBuilder.Gt(m => m.Ts, cutOff);
FilterDefinition<TransactionDocument> filter = filterBuilder.And(new FilterDefinition<TransactionDocument>[] { filter1, filter2 });
var update = Builders<TransactionDocument>.Update.Set(m => m.State, "commit");
UpdateResult updateResult = mongoDBService.DocumentUpdate(TransactionCollectionName, filter, update);
bool isSuccess = updateResult.ModifiedCount > 0 && updateResult.ModifiedCount == updateResult.MatchedCount ?
true : false;
if (!isSuccess)
{
throw new Exception("修改事务文档失败");
}
else
{
RetireTransaction(txn);
}
}
//收回事务
//事务收回方法也是幂等的
private void RetireTransaction(TransactionDocument txn)
{
FilterDefinitionBuilder<Account> filterBuilder = Builders<Account>.Filter;
FilterDefinition<Account> filter = filterBuilder.Eq(m => m._id, txn.Src);//source
var update = Builders<Account>.Update.Pull(m => m.Txns, txn._id);
mongoDBService.DocumentUpdate(AccountsCollectionName, filter, update);
FilterDefinition<Account> filterD = filterBuilder.Eq(m => m._id, txn.Dst);//dest
var updateD = Builders<Account>.Update.Pull(m => m.Txns, txn._id);
mongoDBService.DocumentUpdate(AccountsCollectionName, filterD, updateD);
mongoDBService.Delete<TransactionDocument>(TransactionCollectionName, m => m._id == txn._id);
}
//清理,这个方法应该定期执行
//清理方法也是“幂等的”
public void CleanupTransactions(TimeSpan maxTxnTime)
{
//原书中for txn in db.transaction.find({ 'state': 'commit' }, {'_id': 1}):中的1不对,去掉
List<TransactionDocument> docCommitList = mongoDBService.List<TransactionDocument>(TransactionCollectionName, m => m.State == "commit");
foreach (TransactionDocument tdoc in docCommitList)
{
RetireTransaction(tdoc);
}
DateTime now = DateTime.Now.ToUniversalTime();
DateTime cutOff = now - maxTxnTime;
//找出超时操作 回滚
List<TransactionDocument> docRoolbackList = mongoDBService.List<TransactionDocument>(TransactionCollectionName, m => m.Ts.CompareTo(cutOff) < 0 && m.State == "new");
foreach (TransactionDocument tdoc in docRoolbackList)
{
RollbackTransfer(tdoc);
}
}
//回滚
//这里的回滚方法是“幂等的”
private void RollbackTransfer(TransactionDocument txn)
{
//恢复账户信息
FilterDefinitionBuilder<Account> filterBuilder = Builders<Account>.Filter;
FilterDefinition<Account> filter1 = filterBuilder.Eq(m => m._id, txn.Src);//source
FilterDefinition<Account> filter2 = filterBuilder.Where(m => m.Txns.Contains(txn._id));
FilterDefinition<Account> filter = filterBuilder.And(new FilterDefinition<Account>[]{filter1,filter2});
var update = Builders<Account>.Update.Inc(m => m.Balance, txn.Amt).Pull(m =>m.Txns,txn._id);
mongoDBService.DocumentUpdate(AccountsCollectionName, filter, update);
FilterDefinition<Account> filterD1 = filterBuilder.Eq(m => m._id, txn.Dst);//dest
FilterDefinition<Account> filterD2 = filterBuilder.Where(m => m.Txns.Contains(txn._id));
FilterDefinition<Account> filterD = filterBuilder.And(new FilterDefinition<Account>[] { filterD1, filterD2 });
var updateD = Builders<Account>.Update.Inc(m => m.Balance, -txn.Amt).Pull(m => m.Txns, txn._id);
mongoDBService.DocumentUpdate(AccountsCollectionName, filterD, updateD);
//删除事务文档
mongoDBService.Delete<TransactionDocument>(TransactionCollectionName, m => m._id == txn._id);
}
public void TT()
{
Account ac = new Account
{
_id = "3",
Balance =100,
Txns= new List<string>()
};
mongoDBService.Insert(AccountsCollectionName, ac);
}
}
/// <summary>
/// 事务文档
/// 事务文档存在事务集合中
/// </summary>
public class TransactionDocument
{
public string _id { set; get; }
public string State { set; get; }//状态
public string Src { set; get; }//源
public string Dst { set; get; }//目标
public double Amt { set; get; }//金额
public DateTime Ts { set; get; }//更新时间
}
/// <summary>
/// 银行账号
/// </summary>
public class Account
{
public string _id { set; get; }
public double Balance { set; get; }
public List<string> Txns { set; get; }//事务文档的_id集合
}
}