-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcron.go
164 lines (134 loc) · 4.2 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package main
import (
"fmt"
"io"
"strings"
"time"
"github.com/0xERR0R/crony/healthchecks"
"github.com/armon/circbuf"
"github.com/docker/docker/pkg/stdcopy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
)
const (
// maxLogSize is the capacity in bytes (1 MiB) handed to circbuf.NewBuffer for
// each of the stdout and stderr buffers created in ContainerJob.Run.
maxLogSize = 1 * 1024 * 1024
)
// Prometheus metrics updated by ContainerJob.Run. Every metric carries the
// labels "container_name" and "success" ("true" when the return code was 0).
var (
// executed counts finished job executions.
executed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "crony_executed_count",
Help: "Number of job executions",
}, []string{"container_name", "success"})
// durationGauge holds the duration of the most recent execution in seconds.
durationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "crony_last_duration_sec",
Help: "last job duration in sec",
}, []string{"container_name", "success"})
// lastExecutionGauge holds the start time of the most recent execution as a
// Unix timestamp in seconds.
lastExecutionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "crony_last_execution_ts",
Help: "last job execution timestamp",
}, []string{"container_name", "success"})
)
// ContainerJob describes one schedulable container execution. Its Run method
// starts the named container, waits for it to exit and reports the result.
type ContainerJob struct {
// docker is the client used to start the container, wait for it and read its logs.
docker *DockerClient
// containerName identifies the container to run; it is also used as the
// "container_name" metric label and in log messages.
containerName string
// mailConfig is read by Run to decide whether a result mail is sent.
// NOTE(review): Run dereferences it without a nil check — confirm callers always set it.
mailConfig *MailConfig
// hc is the optional healthchecks check; when nil, no start/end pings are sent.
hc *healthchecks.Check
}
// Run executes the job once: it starts the container, waits for it to exit,
// collects its output and reports the result through the Prometheus metrics,
// the optional healthchecks ping and, depending on the mail policy, a mail.
//
// Run has no return value; every failure is logged. If the container cannot
// be started or waited for, Run returns early without updating the metrics
// and without sending the "end" ping or a mail.
func (cj *ContainerJob) Run() {
	log.Debugf("starting execution of container '%s'", cj.containerName)

	startTime := time.Now()

	// TODO check container state
	if err := cj.docker.ContainerStart(cj.containerName); err != nil {
		log.Errorf("can't start container '%s': %v", cj.containerName, err)
		return
	}

	cj.jobStarted()

	statusCh, errCh := cj.docker.ContainerWait(cj.containerName)

	// returnCode stays 0 unless a status is received from statusCh.
	var returnCode int64

	select {
	case err := <-errCh:
		if err != nil {
			log.Errorf("can't wait for the end of the execution of container '%s': %v", cj.containerName, err)
			return
		}
	case status := <-statusCh:
		returnCode = status.StatusCode
	}

	labels := prometheus.Labels{
		"container_name": cj.containerName,
		"success":        fmt.Sprintf("%t", returnCode == 0),
	}

	jobDuration := time.Since(startTime)

	// The metric updates are deferred so they are applied when Run returns,
	// i.e. after the output has been collected and the notifications were sent.
	defer executed.With(labels).Inc()
	defer lastExecutionGauge.With(labels).Set(float64(startTime.Unix()))
	defer durationGauge.With(labels).Set(jobDuration.Seconds())

	log.StandardLogger().Logf(logLevelForReturnCode(returnCode), "Execution of container '%s' finished with return code %d", cj.containerName, returnCode)

	out, err := cj.docker.ContainerLogs(cj.containerName, startTime)
	if err != nil {
		log.Errorf("can't retrieve logs for container '%s': %v", cj.containerName, err)
		// Fall back to a placeholder text so the notifications still carry a message.
		out = io.NopCloser(strings.NewReader(fmt.Sprintf("can't retrieve logs for container '%s'", cj.containerName)))
	}
	// Release the log stream once Run is done with it; it was never closed before.
	defer func() {
		if closeErr := out.Close(); closeErr != nil {
			log.Debugf("can't close log stream of container '%s': %v", cj.containerName, closeErr)
		}
	}()

	log.Debug("using mail config: ", cj.mailConfig)

	// The errors are ignored on purpose: maxLogSize is a positive constant.
	// NOTE(review): assumes circbuf.NewBuffer only rejects non-positive sizes — confirm.
	stdOutBuf, _ := circbuf.NewBuffer(maxLogSize)
	stdErrBuf, _ := circbuf.NewBuffer(maxLogSize)

	if _, err = stdcopy.StdCopy(stdOutBuf, stdErrBuf, out); err != nil {
		log.Error("can't retrieve output streams: ", err)
	}

	stdout := stdOutBuf.String()
	stderr := stdErrBuf.String()

	cj.jobFinished(returnCode, fmt.Sprintf("%s\n%s", stdout, stderr))

	if cj.mailConfig.MailPolicy == Always || (cj.mailConfig.MailPolicy == OnError && returnCode != 0) {
		err = SendMail(cj.mailConfig, MailParams{
			ContainerName: cj.containerName,
			ReturnCode:    returnCode,
			Duration:      jobDuration,
			StdOut:        stdout,
			StdErr:        stderr,
		})
		if err != nil {
			log.Error("can't send mail: ", err)
		}
	}
}
// jobFinished sends the end-of-execution ping with the given return code and
// message to the healthchecks check. It does nothing when no check is
// configured; a failed ping is only logged.
func (cj *ContainerJob) jobFinished(returnCode int64, message string) {
	if cj.hc == nil {
		return
	}

	if pingErr := cj.hc.Ping(returnCode, message); pingErr != nil {
		log.Error("can't ping 'end' to hc.io: ", pingErr)
	}
}
// jobStarted sends the start ping to the healthchecks check. It does nothing
// when no check is configured; a failed ping is only logged.
func (cj *ContainerJob) jobStarted() {
	if cj.hc == nil {
		return
	}

	if startErr := cj.hc.Start(); startErr != nil {
		log.Error("can't ping 'start' to hc.io: ", startErr)
	}
}
// logLevelForReturnCode maps a container return code to the log level used
// for the "finished" message: debug for 0, warning for anything else.
func logLevelForReturnCode(returnCode int64) log.Level {
	switch returnCode {
	case 0:
		return log.DebugLevel
	default:
		return log.WarnLevel
	}
}
// SkipLogger is a logger whose Info method reports that the execution of a
// container was skipped because it is still running.
// NOTE(review): the Info/Error method set looks like the robfig/cron Logger
// interface — confirm against the place where it is wired up.
type SkipLogger struct {
// containerName is the container named in the skip message.
containerName string
}
// Info logs that the execution of the container was skipped because it is
// still running. The message and key/value arguments are ignored, so every
// call produces the same skip message.
func (l *SkipLogger) Info(_ string, _ ...interface{}) {
	std := log.StandardLogger()
	std.Infof("skipping execution of container '%s', is still running", l.containerName)
}
// Error logs msg at error level with err attached as the logrus error field.
// keysAndValues is read as alternating key/value pairs and attached as
// structured fields; a trailing key without a value is kept with a nil value.
//
// The previous implementation handed err, msg and the whole keysAndValues
// slice to Error, which joined the error text and msg without a separator
// and printed the pairs as one nested slice.
func (l *SkipLogger) Error(err error, msg string, keysAndValues ...interface{}) {
	fields := make(log.Fields, (len(keysAndValues)+1)/2)

	for i := 0; i < len(keysAndValues); i += 2 {
		key := fmt.Sprint(keysAndValues[i])
		if i+1 < len(keysAndValues) {
			fields[key] = keysAndValues[i+1]
		} else {
			// Unpaired trailing key: keep it visible instead of dropping it.
			fields[key] = nil
		}
	}

	log.StandardLogger().WithError(err).WithFields(fields).Error(msg)
}
// createAndStartCron registers the job metrics with the default Prometheus
// registry, then creates a cron scheduler, starts it and returns it.
func createAndStartCron() *cron.Cron {
	collectors := []prometheus.Collector{executed, lastExecutionGauge, durationGauge}
	for _, collector := range collectors {
		// The registration error is deliberately discarded.
		// NOTE(review): the metrics are created via promauto, which presumably
		// registers them already, so these calls may be redundant — confirm.
		_ = prometheus.Register(collector)
	}

	scheduler := cron.New()
	scheduler.Start()

	return scheduler
}