-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTailService.cs
148 lines (134 loc) · 5.39 KB
/
TailService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
using System;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Common.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using ServiceStack.Text;
namespace OplogTail
{
/// <summary>
/// Service that tails the MongoDB replica-set oplog (<c>local.oplog.rs</c>) for operations on the
/// <see cref="Namespace"/> collection and logs every change it sees. The tailing loop is restarted
/// after failures until the service's cancellation token is signalled.
/// </summary>
public class TailService : TaskService
{
    private static readonly ILog Log = LogManager.GetLogger<TailService>();

    /// <summary>Oplog namespace ("database.collection") whose operations are tailed.</summary>
    private const string Namespace = "prmonline.projects";

    /// <summary>Pause before the tail is restarted after it ended or failed.</summary>
    private static readonly TimeSpan RestartDelay = TimeSpan.FromMilliseconds(2000);

    /// <summary>Pause between cursor polls that returned no document.</summary>
    private static readonly TimeSpan PollDelay = TimeSpan.FromMilliseconds(1000);

    /// <summary>
    /// Creates the service and registers <see cref="OnStop"/> to run when the cancellation token is signalled.
    /// </summary>
    public TailService()
    {
        CancelTokenSource.Token.Register(OnStop);
    }

    /// <summary>Display name of the service, used in log messages.</summary>
    public override string Name
    {
        get { return "Tail service"; }
    }

    /// <summary>MongoDB connection string read from the "DbConnection" entry of the application configuration.</summary>
    private static string ConnectionString
    {
        get { return ConfigurationManager.ConnectionStrings["DbConnection"].ConnectionString; }
    }

    /// <summary>
    /// Runs the oplog tail on a long-running task and blocks until it finishes, restarting it after
    /// <see cref="RestartDelay"/> whenever it ends or fails. Returns once cancellation has been requested.
    /// </summary>
    protected override void DoStart()
    {
        while (true)
        {
            try
            {
                CancelTokenSource.Token.ThrowIfCancellationRequested();
                Task.Factory.StartNew(TailOpLog,
                                      CancelTokenSource.Token,
                                      TaskCreationOptions.LongRunning,
                                      TaskScheduler.Current)
                    .Wait();
            }
            catch (OperationCanceledException)
            {
                Log.InfoFormat("{0} operation cancelled", Name);
            }
            catch (AggregateException ex)
            {
                // Task.Wait() reports task faults and task cancellation wrapped in an AggregateException,
                // so the real cause has to be unwrapped before deciding how to log it.
                LogTaskFailure(ex);
            }
            catch (Exception ex)
            {
                Log.FatalFormat("{0} encountered a fatal error: {1}", ex, Name, ex.Message);
            }

            if (CancelTokenSource.Token.IsCancellationRequested)
            {
                break;
            }

            // Sleep to prevent thrashing the service
            Log.InfoFormat("Restarting {0} after sleep", Name);
            if (PauseOrCancelled(RestartDelay))
            {
                break;
            }
        }

        Log.Info("Tail shut down");
    }

    /// <summary>
    /// Opens a tailable, await-data cursor on the oplog and logs each matching entry. When the cursor
    /// dies or an error occurs the cursor is re-created, resuming after the last entry already seen.
    /// Returns when cancellation is requested.
    /// </summary>
    private void TailOpLog()
    {
        // Lower bound for the oplog query: starts at the beginning of today and advances with every
        // entry processed, so a reconnect does not replay entries that were already handled.
        var resumeFrom = new BsonTimestamp((int)DateTime.Today.ToUnixTime(), 0);

        while (true)
        {
            try
            {
                CancelTokenSource.Token.ThrowIfCancellationRequested();

                // Log only the server addresses: the raw connection string may contain credentials.
                var url = new MongoUrl(ConnectionString);
                Log.InfoFormat("Tailing oplog on {0}", string.Join(",", url.Servers));

                // Oplog includes operations that occur in all databases in the replicaset. The oplog denotes
                // which database an operation goes to by using a namespace syntax
                var client = new MongoClient(url);
                var collection = client.GetServer().GetDatabase("local").GetCollection("oplog.rs");
                var query = Query.And(Query.GT("ts", resumeFrom),
                                      Query.EQ("ns", Namespace));
                var cursor = collection.Find(query)
                                       .SetFlags(QueryFlags.TailableCursor | QueryFlags.AwaitData)
                                       .SetSortOrder(SortBy.Ascending("$natural"));

                using (var enumerator = new MongoCursorEnumerator<BsonDocument>(cursor))
                {
                    while (true)
                    {
                        CancelTokenSource.Token.ThrowIfCancellationRequested();
                        Log.Debug("Enumerating");

                        if (enumerator.MoveNext())
                        {
                            var document = enumerator.Current;
                            Log.DebugFormat("Processing change: {0}", document);

                            // Remember how far we got so the next cursor starts after this entry.
                            BsonValue timestamp;
                            if (document.TryGetValue("ts", out timestamp) && timestamp.IsBsonTimestamp)
                            {
                                resumeFrom = timestamp.AsBsonTimestamp;
                            }

                            continue;
                        }

                        Log.Debug("Didn't get anything");
                        if (enumerator.IsDead)
                        {
                            Log.Info("Cursor has ended and is dead");
                            break;
                        }

                        if (!enumerator.IsServerAwaitCapable)
                        {
                            CancelTokenSource.Token.ThrowIfCancellationRequested();
                            Log.Info("Throttling next cursor attempt. Awaiting 1 second.");
                        }

                        // The method we are using for enumerating causes the loop to execute as fast as it
                        // can, so slow it down a bit even when the server is await capable.
                        PauseOrCancelled(PollDelay);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Log.Info("Op log tailing operation cancelled");
                return;
            }
            catch (Exception ex)
            {
                Log.Error(ex.Message, ex);
            }
            finally
            {
                // Back off before reconnecting; returns immediately when cancellation was requested.
                PauseOrCancelled(RestartDelay);
            }
        }
    }

    /// <summary>
    /// Logs every exception carried by an <see cref="AggregateException"/> thrown from waiting on the
    /// tail task: cancellations at Info level, anything else as a fatal error.
    /// </summary>
    /// <param name="aggregate">The exception thrown by <c>Task.Wait()</c>.</param>
    private void LogTaskFailure(AggregateException aggregate)
    {
        foreach (var inner in aggregate.Flatten().InnerExceptions)
        {
            if (inner is OperationCanceledException)
            {
                Log.InfoFormat("{0} operation cancelled", Name);
            }
            else
            {
                Log.FatalFormat("{0} encountered a fatal error: {1}", inner, Name, inner.Message);
            }
        }
    }

    /// <summary>
    /// Blocks for <paramref name="delay"/> or until cancellation is requested, whichever comes first,
    /// so that a stop request is not held up by a pending sleep.
    /// </summary>
    /// <param name="delay">Maximum time to wait.</param>
    /// <returns><c>true</c> when cancellation was requested; <c>false</c> when the delay elapsed.</returns>
    private bool PauseOrCancelled(TimeSpan delay)
    {
        var token = CancelTokenSource.Token;
        if (token.IsCancellationRequested)
        {
            return true;
        }

        return token.WaitHandle.WaitOne(delay);
    }

    /// <summary>Callback invoked when the cancellation token is signalled; only logs the stop request.</summary>
    private void OnStop()
    {
        Log.Info("Stop called");
    }
}
}