forked from pgadmin-org/pgagent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpgAgent.cpp
244 lines (201 loc) · 6.33 KB
/
pgAgent.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
//////////////////////////////////////////////////////////////////////////
//
// pgAgent - PostgreSQL Tools
//
// Copyright (C) 2002 - 2021, The pgAdmin Development Team
// This software is released under the PostgreSQL Licence
//
// pgAgent.cpp - pgAgent main entry
//
//////////////////////////////////////////////////////////////////////////
#include "pgAgent.h"
#if !BOOST_OS_WINDOWS
#include <unistd.h>
#endif
std::string connectString;
std::string backendPid;
long longWait = 30;
long shortWait = 5;
long minLogLevel = LOG_ERROR;
using namespace std;
#define MAXATTEMPTS 10
#if !BOOST_OS_WINDOWS
bool runInForeground = false;
std::string logFile;
#else
// pgAgent Initialized
void Initialized();
#endif
int MainRestartLoop(DBconn *serviceConn)
{
// clean up old jobs
int rc;
LogMessage("Clearing zombies", LOG_DEBUG);
rc = serviceConn->ExecuteVoid("CREATE TEMP TABLE pga_tmp_zombies(jagpid int4)");
if (serviceConn->BackendMinimumVersion(9, 2))
{
rc = serviceConn->ExecuteVoid(
"INSERT INTO pga_tmp_zombies (jagpid) "
"SELECT jagpid "
" FROM pgagent.pga_jobagent AG "
" LEFT JOIN pg_stat_activity PA ON jagpid=pid "
" WHERE pid IS NULL"
);
}
else
{
rc = serviceConn->ExecuteVoid(
"INSERT INTO pga_tmp_zombies (jagpid) "
"SELECT jagpid "
" FROM pgagent.pga_jobagent AG "
" LEFT JOIN pg_stat_activity PA ON jagpid=procpid "
" WHERE procpid IS NULL"
);
}
if (rc > 0)
{
// There are orphaned agent entries
// mark the jobs as aborted
rc = serviceConn->ExecuteVoid(
"UPDATE pgagent.pga_joblog SET jlgstatus='d' WHERE jlgid IN ("
"SELECT jlgid "
"FROM pga_tmp_zombies z, pgagent.pga_job j, pgagent.pga_joblog l "
"WHERE z.jagpid=j.jobagentid AND j.jobid = l.jlgjobid AND l.jlgstatus='r');\n"
"UPDATE pgagent.pga_jobsteplog SET jslstatus='d' WHERE jslid IN ( "
"SELECT jslid "
"FROM pga_tmp_zombies z, pgagent.pga_job j, pgagent.pga_joblog l, pgagent.pga_jobsteplog s "
"WHERE z.jagpid=j.jobagentid AND j.jobid = l.jlgjobid AND l.jlgid = s.jsljlgid AND s.jslstatus='r');\n"
"UPDATE pgagent.pga_job SET jobagentid=NULL, jobnextrun=NULL "
" WHERE jobagentid IN (SELECT jagpid FROM pga_tmp_zombies);\n"
"DELETE FROM pgagent.pga_jobagent "
" WHERE jagpid IN (SELECT jagpid FROM pga_tmp_zombies);\n"
);
}
rc = serviceConn->ExecuteVoid("DROP TABLE pga_tmp_zombies");
std::string host_name = boost::asio::ip::host_name();
rc = serviceConn->ExecuteVoid(
"INSERT INTO pgagent.pga_jobagent (jagpid, jagstation) SELECT pg_backend_pid(), '" +
host_name + "'"
);
if (rc < 0)
return rc;
while (1)
{
bool foundJobToExecute = false;
LogMessage("Checking for jobs to run", LOG_DEBUG);
DBresultPtr res = serviceConn->Execute(
"SELECT J.jobid "
" FROM pgagent.pga_job J "
" WHERE jobenabled "
" AND jobagentid IS NULL "
" AND jobnextrun <= now() "
" AND (jobhostagent = '' OR jobhostagent = '" + host_name + "')"
" ORDER BY jobnextrun"
);
if (res)
{
while (res->HasData())
{
std::string jobid = res->GetString("jobid");
boost::thread job_thread = boost::thread(JobThread(jobid));
job_thread.detach();
foundJobToExecute = true;
res->MoveNext();
}
res = NULL;
LogMessage("Sleeping...", LOG_DEBUG);
WaitAWhile();
}
else
LogMessage("Failed to query jobs table!", LOG_ERROR);
if (!foundJobToExecute)
DBconn::ClearConnections();
}
return 0;
}
void MainLoop()
{
int attemptCount = 1;
// OK, let's get down to business
do
{
LogMessage("Creating primary connection", LOG_DEBUG);
DBconn *serviceConn = DBconn::InitConnection(connectString);
if (serviceConn)
{
// Basic sanity check, and a chance to get the serviceConn's PID
LogMessage("Database sanity check", LOG_DEBUG);
DBresultPtr res = serviceConn->Execute(
"SELECT count(*) As count, pg_backend_pid() AS pid FROM pg_class cl JOIN pg_namespace ns ON ns.oid=relnamespace WHERE relname='pga_job' AND nspname='pgagent'"
);
if (res)
{
std::string val = res->GetString("count");
if (val == "0")
LogMessage(
"Could not find the table 'pgagent.pga_job'. Have you run pgagent.sql on this database?",
LOG_ERROR
);
backendPid = res->GetString("pid");
res = NULL;
}
// Check for particular version
bool hasSchemaVerFunc = false;
std::string sqlCheckSchemaVersion =
"SELECT COUNT(*) " \
"FROM pg_proc " \
"WHERE proname = 'pgagent_schema_version' AND " \
" pronamespace = (SELECT oid " \
" FROM pg_namespace " \
" WHERE nspname = 'pgagent') AND " \
" prorettype = (SELECT oid " \
" FROM pg_type " \
" WHERE typname = 'int2') AND " \
" proargtypes = '' ";
res = serviceConn->Execute(sqlCheckSchemaVersion);
if (res)
{
if (res->IsValid() && res->GetString(0) == "1")
hasSchemaVerFunc = true;
res = NULL;
}
if (!hasSchemaVerFunc)
{
LogMessage(
"Couldn't find the function 'pgagent_schema_version' - please run ALTER EXTENSION \"pgagent\" UPDATE;.",
LOG_ERROR
);
}
std::string strPgAgentSchemaVer = serviceConn->ExecuteScalar(
"SELECT pgagent.pgagent_schema_version()"
);
std::string currentPgAgentVersion = (boost::format("%d") % PGAGENT_VERSION_MAJOR).str();
if (strPgAgentSchemaVer != currentPgAgentVersion)
{
LogMessage(
"Unsupported schema version: " + strPgAgentSchemaVer +
". Version " + currentPgAgentVersion +
" is required - please run ALTER EXTENSION \"pgagent\" UPDATE;.",
LOG_ERROR
);
}
#ifdef WIN32
Initialized();
#endif
MainRestartLoop(serviceConn);
}
LogMessage((boost::format(
"Couldn't create the primary connection [Attempt #%d]") % attemptCount
).str(), LOG_STARTUP);
DBconn::ClearConnections(true);
// Try establishing primary connection upto MAXATTEMPTS times
if (attemptCount++ >= (int)MAXATTEMPTS)
{
LogMessage(
"Stopping pgAgent: Couldn't establish the primary connection with the database server.",
LOG_ERROR
);
}
WaitAWhile();
} while (1);
}