-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.awk
210 lines (175 loc) · 7.04 KB
/
scheduler.awk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# see http://howardhinnant.github.io/date_algorithms.html
# system sleep command
function _sleep_sys(seconds) { if (system(sprintf("trap \"exit 1\" 2; sleep %.3f", seconds)) != 0) exit 1 }
# gawk-native sleep implementation
# A gawk 4 extension is also available that provides fractional sleep seconds support and fractional gettimeofday()
# - see: https://www.gnu.org/software/gawk/manual/html_node/Extension-Sample-Time.html#Extension-Sample-Time
# - @load "time"
# - the_time = gettimeofday()
# - result = sleep(seconds) # seconds can be floating point
# alternatives for dev_block : a stalled pipe "/tmp/mypipe" made with mkfifo, "/inet/udp/0/127.0.0.1/0" (faster than localhost), "/dev/stderr", "/dev/fd/3"
function sleep(seconds, dev_block,dev_null) {
if ( (seconds+0) < 0.001 ) return 0 # this prevents most of bad input (negative, zero, lower-than-millesecond, and string arguments)
dev_block="/inet/udp/0/127.0.0.1/0" # alternative: dev_block="/dev/stderr"
# setting READ_TIMEOUT on this input will set ERRNO - not need to wait for getline
ERRNO = 0; PROCINFO[dev_block, "READ_TIMEOUT"] = (seconds * 1000) # timeout in milliseconds
# If there is an error - use the system sleep command
if (ERRNO || PROCINFO["version"] !~ /^4/) { # in case that fails or not supported use system sleep
_sleep_sys(seconds)
} else {
getline dev_null < dev_block; close(dev_block) # we use the dummy var "dev_null" to avoid setting $0
}
return 1
}
# Helper to sleep until specific timestamp
function sleep_until(target, diff,use_msec) {
#if (target ~ /[0-9]+[.][0-9]+$/) use_msec = 1
diff = target - systime_msec()
if ( diff <= 0 ) return 0 # return fail (0) if target has already passed
while (1) {
if ( diff <= 0 ) return 1
if ( diff > 10 ) { sleep(diff/2) } else { sleep(diff) } # wake up early on long idle periods to make for timer drifts (on high system load)
diff = target - systime_msec() # diff = (use_msec) ? (target - systime_msec()) : (target - systime())
}
return 1
}
# Compute the immediate next timestamp when given an interval and an offset
function compute_next(expr, tstamp, first_time, pos, period, offset, offset_mod, unit, tnext) {
period = expr; offset = ""
if (pos = index(expr, ":")) {
period = substr(expr, 1, pos - 1)
offset = substr(expr, pos + 1)
}
tnext = round_timestamp(period, tstamp)
if (offset) {
# Calculate the module of the offset with the period (if the offset is smaller than the period, then that will equal the offset)
tnext = compute_relative("+" offset, tnext)
offset_mod = sprintf("%d", tnext - round_timestamp(period, tnext)) + 0
# Now calculate it with the right offset (offset_mod)
if (offset_mod && offset_mod > 0)
tnext = compute_relative("+" offset_mod "s", round_timestamp(period, tstamp))
}
if ((tnext + 0) <= (tstamp + 1)) # make sure a string timestamp like "1491234567.000" is converted to numeric or else bad things happen
tnext = compute_relative("+" period, tnext)
return tnext
}
# Task Scheduler
function minicron(Tasks, TasksInfo, method, handler,
TasksQueue, TasksNext, Data, now, now_scheduled, tnext, got_rest, cnt_unrest,
task_id, i, n, expr, now_cnt) {
split("", TasksNext)
# Initialize Queue
split("", TasksQueue)
now = systime_msec()
# Start: Find the next time instance for all definitions and add them to the queue
for (task_id = 1; task_id <= length(Tasks); task_id++) {
expr = Tasks[task_id]
tnext = compute_next(expr, now, "first_time")
PrioInsert(TasksQueue, tnext, task_id)
}
if (method == "debug") {
DumpPrioQueue(TasksQueue)
}
while(1) {
now_cnt = 0
if(n = PopLow(TasksQueue, Data)) {
tnext = Data["val"]
TasksNext[++now_cnt] = Data["data"]
}
split("", Data)
while(n > 1 && TasksQueue[TasksQueue["lowest"], "val"] == tnext) {
n = PopLow(TasksQueue, Data)
TasksNext[++now_cnt] = Data["data"]
}
# Get some sleep and check whether the time has already passed
got_rest = sleep_until(tnext)
if(!got_rest) {
if (++cnt_unrest > 3) {
error("Multiple consecutive past timestamps coming in scheduler. That indicates a bug, an unsupported time utility or invalid input. Forcing some sleep..")
sleep(cnt_unrest - 2)
}
} else {
cnt_unrest = 0
}
# Needed timestamps and some checks
now = systime_msec()
now_scheduled = tnext
# Now take some action
for (i=1; i <= now_cnt; i++) {
task_id = TasksNext[i]
expr = Tasks[task_id]
tnext = compute_next(expr, now)
PrioInsert(TasksQueue, tnext, task_id)
if (method == "callback") {
# @handler(task_id, now_scheduled, TasksInfo) # commented-out for gawk3 support
task_handler(task_id, now_scheduled, TasksInfo)
} else if (method == "program") {
system(sprintf("%s %s %s %s", handler, task_id, now_scheduled, TasksInfo))
} else if (method == "debug") {
message("Running task " task_id " at: " now " - next is: " tnext)
}
}
split("", TasksNext)
split("", Data)
}
}
# The same Scheduler as before, only this case cheating the clock
# This function nevers call the real clock, it only pretends that it did by sleeping just some seconds
# and assuming that the time is equal to that of the first scheduled task
function cheatycron(Tasks, TasksInfo, method, handler, time_start, sleep_cheat,
TasksQueue, TasksNext, Data, now, now_scheduled, tnext, got_rest, cnt_unrest,
task_id, i, n, expr, now_cnt) {
split("", TasksNext)
# Initialize Queue
split("", TasksQueue)
now = (time_start) ? time_start : compute_relative("-25h", round_timestamp("1d", systime_msec()))
# Start: Find the next time instance for all definitions and add them to the queue
for (task_id = 1; task_id <= length(Tasks); task_id++) {
expr = Tasks[task_id]
tnext = compute_next(expr, now)
PrioInsert(TasksQueue, tnext, task_id)
}
if (method == "debug") {
DumpPrioQueue(TasksQueue)
}
while(1) {
now_cnt = 0
if(n = PopLow(TasksQueue, Data)) {
tnext = Data["val"]
TasksNext[++now_cnt] = Data["data"]
}
split("", Data)
while(n > 1 && TasksQueue[TasksQueue["lowest"], "val"] == tnext) {
n = PopLow(TasksQueue, Data)
TasksNext[++now_cnt] = Data["data"]
}
# Needed timestamps and some checks
now = tnext
now_scheduled = tnext
# Exit if you reached the present
if ((now + 0) >= (systime_msec() + 0)) {
message("The queue scheduler finished his job. Reached present. Exiting", 1)
exit
}
# Now take some action
for (i=1; i <= now_cnt; i++) {
task_id = TasksNext[i]
expr = Tasks[task_id]
tnext = compute_next(expr, now)
PrioInsert(TasksQueue, tnext, task_id)
if (method == "callback") {
# @handler(task_id, now_scheduled, TasksInfo) # commented-out for gawk3 support
task_handler(task_id, now_scheduled, TasksInfo)
} else if (method == "program") {
system(sprintf("%s %s %s %s", handler, task_id, now_scheduled, TasksInfo))
} else if (method == "debug") {
message("Running task " task_id " at: " now " - next is: " tnext)
}
}
split("", TasksNext)
split("", Data)
# Get some sleep
if (! (sleep_cheat+0)) sleep_cheat = 4
_sleep_sys(sleep_cheat)
}
}