# To keep things simple, we use a FIFO for IPC. The server process is hooked to
# Github. Whenever a repository is updated, it writes to the FIFO the name of
# this repository, followed by a line break. On its side, the update process
# reads the repository names and updates things accordingly. The same is done
# for bibliography updates.
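#
# For instance, when a repository changes, the server writes a single message
# like b"some-repo-name\n" (a hypothetical name) to the FIFO. The special
# names "all" and "bib", handled in read_changes() below, request a full
# update and a bibliography update, respectively.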
#
# We do not implement any buffering for passing messages, because pipe
# buffers are big enough for our purposes.
#
# We use the WAL mode in SQLite. Thus, writers don't block readers and
# vice-versa, but writers still do block each other, which is why we use just
# one and serialize writes.
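# (Presumably common.db() enables this with SQLite's "PRAGMA
# journal_mode=WAL"; once set, the journal mode persists in the database
# file.)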
# TODO add an option for turning off web access (no git pulls, no backups)
import os, sys, time, select, errno, logging, fcntl, argparse, traceback
from dharma import common, texts, biblio, catalog, people, langs
from dharma import gaiji, prosody, repos
SKIP_PULL = False
FIFO_ADDR = common.path_of("change.hid")
db = common.db("texts")
# Timestamp of the last git pull/clone
last_pull = 0
# Wait at least this long between two pulls, in seconds
min_pull_wait = 10
@common.transaction("texts")
def all_useful_repos():
db = common.db("texts")
# Always process repos in the same order.
ret = db.execute("""select repo from repos
where textual or repo = 'project-documentation'
order by repo""")
ret = [name for (name,) in ret]
return ret
def clone_repo(name):
path = common.path_of("repos", name)
# The simplest way to determine if we already have cloned the repo is
# to check if we have a non-empty directory at the expected location.
try:
os.rmdir(path)
except FileNotFoundError:
pass
except OSError as e:
if e.errno == errno.ENOTEMPTY:
return False
raise
common.command("git", "clone", f"[email protected]:erc-dharma/{name}.git",
path, capture_output=False)
return True
# Github apparently doesn't like it when we pull too often. We often get a
# message "kex_exchange_identification: read: Connection reset by peer". So we
# wait a bit between pulls.
def update_repo(name):
if SKIP_PULL:
return
global last_pull
now = time.time()
diff = now - last_pull
if diff < min_pull_wait:
time.sleep(min_pull_wait - diff)
last_pull = now
# Attempt to clone the repo, in case we don't have it. Otherwise pull.
if clone_repo(name):
return
return common.command("git", "-C", common.path_of("repos", name),
"pull", capture_output=False)
def latest_commit_in_repo(name):
r = common.command("git", "-C", common.path_of("repos", name),
"log", "-1", "--format=%H %at")
hash, date = r.stdout.strip().split()
date = int(date)
return hash, date
class Changes:
def __init__(self, repo):
self.repo = repo
self.since = -1
self.done = False
self.before = set()
self.insert = []
self.update = []
self.delete = []
self.commit_hash, self.commit_date = latest_commit_in_repo(self.repo)
def check_db(self):
commit_hash, code_hash = db.execute("""
select commit_hash, code_hash
from repos where repo = ?""",
(self.repo,)).fetchone() or (None, None)
if commit_hash == self.commit_hash:
if code_hash == common.CODE_HASH:
self.done = True
return
			# The code changed, so we need to update everything
			# (possibly even deleted files, since file matching
			# rules might have changed).
else:
# Need to update all files that have been modified
# since the last commit viz. files that have been
# modified more recently than the newest file seen
# so far in this repo.
(self.since,) = db.execute("""select max(mtime)
from files where repo = ?""",
(self.repo,)).fetchone() or (-1,)
for (name,) in db.execute("""select name from files
where repo = ?""", (self.repo,)):
self.before.add(name)
def check_repo(self):
if self.done:
return
seen = set()
for file in texts.iter_texts_in_repo(self.repo):
seen.add(file.name)
if file.name not in self.before:
self.insert.append(file)
elif file.mtime > self.since:
self.update.append(file)
else:
continue
for name in self.before:
if name not in seen:
self.delete.append(name)
texts.gather_web_pages(self.repo, self.insert + self.update)
		# Always process files in the same order, for reproducibility.
self.insert.sort(key=lambda file: file.name, reverse=True)
self.update.sort(key=lambda file: file.name, reverse=True)
self.delete.sort()
self.done = True
def update_db(repo):
changes = Changes(repo)
changes.check_db()
if changes.done:
return
changes.check_repo()
db.execute("""update repos
set commit_hash = ?, commit_date = ?, code_hash = ?
where repo = ?""",
(changes.commit_hash, changes.commit_date, common.CODE_HASH, changes.repo))
for name in changes.delete:
catalog.delete(name)
db.execute("delete from owners where name = ?", (name,))
db.execute("delete from files where name = ?", (name,))
for todo in ("insert", "update"):
todo = getattr(changes, todo)
while todo:
file = todo.pop()
db.save_file(file)
catalog.insert(file)
# We should always put data such as names, etc. in the db instead of keeping
# it in memory, so that we can tell what the current data is just by looking
# at the db. Otherwise we would have to write introspection code. Another
# reason: at some point, we want to have a downloadable read-only db.
# Ideally, it should be possible to run the code without having to set up
# repositories.
def update_project():
# TODO add tests to verify whether the files we need changed, to avoid
# doing a full rebuild when not necessary.
people.make_db()
langs.make_db()
gaiji.make_db()
prosody.make_db()
repos.make_db()
catalog.rebuild()
repo = "project-documentation"
commit_hash, commit_date = latest_commit_in_repo(repo)
db.execute("""update repos
set commit_hash = ?, commit_date = ?, code_hash = ?
where repo = ?""",
(commit_hash, commit_date, common.CODE_HASH, repo))
# XXX we also need to store schemas in the db, but for this we need to
# derive them at runtime
# Request from Arlo. This should eventually be removed in favor of an export
# to electronic-texts.
def backup_to_jawakuno():
common.command("bash", "-x", common.path_of("backup_to_jawakuno.sh"),
capture_output=False)
def backup_biblio():
common.command("bash", "-x", common.path_of("backup_biblio.sh"),
capture_output=False)
@common.transaction("texts")
def handle_changes(name):
update_repo(name)
if name == "project-documentation":
update_project()
else:
update_db(name)
db.execute("replace into metadata values('last_updated', strftime('%s', 'now'))")
if name == "tfd-nusantara-philology":
backup_to_jawakuno()
# Must be at least this big in POSIX. Linux currently has 4096.
PIPE_BUF = 512
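# Writes of at most PIPE_BUF bytes to a pipe are atomic, so a name written
# with a single os.write() (as in notify() below) cannot be interleaved with
# another writer's message.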
# When we should force a full update. We perform one at startup.
NEXT_FULL_UPDATE = time.time()
# Force a full update every FORCE_UPDATE_DELTA seconds.
FORCE_UPDATE_DELTA = 1 * 60 * 60
# In the worst case, if we're not fast enough to handle any update events, we
# just end up running forced full updates continuously. We check if a full
# update is necessary *before* trying to update a single repo, so there is
# always an opportunity for all repos to be updated.
def read_names(fd):
buf = ""
global NEXT_FULL_UPDATE
while True:
now = time.time()
wait = NEXT_FULL_UPDATE - now
if wait <= 0:
logging.info("forcing full update")
yield "all"
wait = FORCE_UPDATE_DELTA
NEXT_FULL_UPDATE = now + wait
continue
end = buf.find("\n")
if end >= 0:
name = buf[:end]
yield name
buf = buf[end + 1:]
continue
logging.info("selecting")
rlist, _, _ = select.select([fd], [], [], wait)
if not rlist:
continue
data = os.read(fd, PIPE_BUF)
logging.info("read %d" % len(data))
buf += data.decode("ascii")
def read_changes(fd):
for name in read_names(fd):
repos = all_useful_repos()
if name == "all":
logging.info("updating everything...")
for name in repos:
logging.info("updating %r" % name)
handle_changes(name)
logging.info("updated %r" % name)
logging.info("updating biblio...")
biblio.update()
logging.info("updated biblio")
backup_biblio()
logging.info("updated everything")
elif name == "bib":
logging.info("updating biblio...")
biblio.update()
logging.info("updated biblio")
backup_biblio()
elif name in repos:
logging.info("updating single repo %r..." % name)
handle_changes(name)
logging.info("updated single repo %r" % name)
else:
logging.warning("junk command: %r" % name)
# To be used by clients, not from within this process when it runs as
# __main__ (closing the extra fd would release the lock we hold on the fifo).
def notify(name):
msg = name.encode("ascii") + b"\n"
assert len(msg) <= PIPE_BUF
fd = os.open(FIFO_ADDR, os.O_RDWR | os.O_NONBLOCK)
try:
os.write(fd, msg)
finally:
os.close(fd)
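# A minimal client-side sketch (the import path and repo name below are
# assumptions):
#     from dharma import change
#     change.notify("some-text-repo")  # ask the change process to update one repo
#     change.notify("bib")             # or just refresh the bibliography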
def init_db():
common.db("texts")
def main():
try:
os.mkdir(common.path_of("repos"))
except FileExistsError:
pass
try:
os.mkfifo(FIFO_ADDR)
except FileExistsError:
pass
fd = os.open(FIFO_ADDR, os.O_RDWR)
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
logging.error("cannot obtain lock, is another change process running?")
sys.exit(1)
init_db()
logging.info("ready")
while True:
try:
read_changes(fd)
except KeyboardInterrupt:
break
except Exception as e:
logging.error(e)
traceback.print_exception(e)
# Don't immediately retry to avoid a busy loop. Might
# want to distinguish network errors from programming
# errors, etc.; in the first case, we could retry
# sooner.
global NEXT_FULL_UPDATE
now = time.time()
if NEXT_FULL_UPDATE - now < 0:
NEXT_FULL_UPDATE += FORCE_UPDATE_DELTA
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-k", "--skip-update", action="store_true", help="""
do not force an update at startup""")
parser.add_argument("-l", "--local", action="store_true", help="""
do not pull git repositories""")
args = parser.parse_args()
if args.skip_update:
NEXT_FULL_UPDATE += FORCE_UPDATE_DELTA
if args.local:
SKIP_PULL = True
main()