Skip to content

Commit

Permalink
Merge pull request #663 from dongahn/reconstruct2
Browse files Browse the repository at this point in the history
Add basic state recovery support
  • Loading branch information
mergify[bot] authored Jun 11, 2020
2 parents cd3c95c + 8ef2d7a commit 577d821
Show file tree
Hide file tree
Showing 31 changed files with 1,153 additions and 45 deletions.
6 changes: 5 additions & 1 deletion qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern "C" {
#include <jansson.h>
}

#include <sstream>

#include "qmanager/policies/base/queue_policy_base.hpp"
#include "qmanager/policies/base/queue_policy_base_impl.hpp"
#include "qmanager/policies/queue_policy_factory_impl.hpp"
Expand Down Expand Up @@ -100,7 +102,9 @@ static int process_config_file (std::shared_ptr<qmanager_ctx_t> &ctx)
optmgr_kv_t<qmanager_opts_t> opts_store;
std::string info_str = "";
json_object_foreach (conf, k, v) {
std::string value = json_string_value (v);
std::string value = json_dumps (v, JSON_ENCODE_ANY|JSON_COMPACT);
if (json_typeof (v) == JSON_STRING)
value = value.substr (1, value.length () - 2);
if ( (rc = opts_store.put (k, value)) < 0) {
flux_log_error (ctx->h, "%s: optmgr_kv_t::put (%s, %s)",
__FUNCTION__, k, value.c_str ());
Expand Down
53 changes: 39 additions & 14 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <jansson.h>
}

#include "qmanager/modules/qmanager_callbacks.hpp"
Expand Down Expand Up @@ -96,30 +97,54 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
return rc;
}

// FIXME: This will be expanded when we implement full scheduler
// resiliency schemes: Issue #470.
// Until PR #625 is merged and Issue 240 of RFC project (adding the
// queue name of the job to R), we insert the job into the default queue.
int qmanager_cb_t::jobmanager_hello_cb (flux_t *h,
flux_jobid_t id, int prio, uint32_t uid,
double ts, const char *R, void *arg)

{
int rc = -1;
qmanager_cb_ctx_t *ctx = nullptr;
ctx = static_cast<qmanager_cb_ctx_t *> (arg);
std::shared_ptr<job_t> running_job
= std::make_shared<job_t> (job_state_kind_t::
RUNNING, id, uid, prio, ts, R);
std::string queue_name = ctx->opts.get_opt ().get_default_queue ();
auto &queue = ctx->queues.at (queue_name);
int rc = 0;
json_t *o = NULL;
json_error_t err;
std::string R_out;
char *qn_attr = NULL;
std::string queue_name;
std::shared_ptr<queue_policy_base_t> queue;
std::shared_ptr<job_t> running_job = nullptr;
qmanager_cb_ctx_t *ctx = static_cast<qmanager_cb_ctx_t *> (arg);

if (queue->reconstruct (running_job) < 0) {
if ( (o = json_loads (R, 0, &err)) == NULL) {
rc = -1;
errno = EPROTO;
flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
__FUNCTION__, static_cast<intmax_t> (id),
err.text, err.source, err.line, err.column);
goto out;
}
if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
"attributes",
"system",
"scheduler",
"queue", &qn_attr)) < 0) {
json_decref (o);
errno = EPROTO;
flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
goto out;
}

queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue ();
json_decref (o);
queue = ctx->queues.at (queue_name);
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
id, uid, prio, ts, R);

if ( (rc = queue->reconstruct (static_cast<void *> (h),
running_job, R_out)) < 0) {
flux_log_error (h, "%s: reconstruct (id=%jd queue=%s)", __FUNCTION__,
static_cast<intmax_t> (id), queue_name.c_str ());
goto out;
}
rc = 0;
flux_log (h, LOG_DEBUG, "requeue success (queue=%s id=%jd)",
queue_name.c_str (), static_cast<intmax_t> (id));

out:
return rc;
Expand Down
41 changes: 37 additions & 4 deletions qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class queue_policy_base_impl_t
const std::shared_ptr<job_t> lookup (flux_jobid_t id);

protected:
int reconstruct (std::shared_ptr<job_t> running_job);
int reconstruct_queue (std::shared_ptr<job_t> running_job);
std::shared_ptr<job_t> pending_pop ();
std::shared_ptr<job_t> alloced_pop ();
std::shared_ptr<job_t> rejected_pop ();
Expand Down Expand Up @@ -181,6 +181,28 @@ class queue_policy_base_t : public detail::queue_policy_base_impl_t
*/
virtual int run_sched_loop (void *h, bool use_alloced_queue) = 0;

/*! Resource reconstruct interface that must be implemented by
* derived classes.
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param job shared pointer to a running job whose resource
* state is being requested to be reconstructed.
* job->schedule.R is the requested R.
* \param ret_R Replied R (must be equal to job->schedule.R
* if succeeded).
* \return 0 on success; -1 on error.
* EINVAL: invalid argument.
* ENOMEM: out of memory.
* ERANGE: out of range request.
* EPROTO: job->schedule.R doesn't comply.
* ENOTSUP: job->schedule.R has unsupported feature.
*/
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &ret_R) = 0;

/*! Set queue parameters. Can be called multiple times.
*
* \param params comma-delimited key-value pairs string
Expand Down Expand Up @@ -241,12 +263,23 @@ class queue_policy_base_t : public detail::queue_policy_base_impl_t
/*! Append a job into the internal running-job queue to reconstruct
* the queue state.
*
* \param running_job
* a shared pointer pointing to a job_t object.
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param job shared pointer to a running job whose resource
* state is being requested to be reconstructed.
* job->schedule.R is the requested R.
* \param ret_R replied R (must be equal to job->schedule.R
* if succeeded).
* \return 0 on success; -1 on error.
* EINVAL: invalid argument.
* ENOMEM: out of memory.
* ERANGE: out of range request.
* EPROTO: job->schedule.R doesn't comply.
* ENOTSUP: job->schedule.R has unsupported feature.
*/
int reconstruct (std::shared_ptr<job_t> running_job);
int reconstruct (void *h, std::shared_ptr<job_t> job, std::string &R_out);

/*! Pop the first job from the pending job queue. The popped
* job is completely graduated from the queue policy layer.
Expand Down
35 changes: 28 additions & 7 deletions qmanager/policies/base/queue_policy_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,13 @@ const std::shared_ptr<job_t> queue_policy_base_t::lookup (flux_jobid_t id)
return detail::queue_policy_base_impl_t::lookup (id);
}

int queue_policy_base_t::reconstruct (std::shared_ptr<job_t> running_job)
int queue_policy_base_t::reconstruct (void *h, std::shared_ptr<job_t> job,
std::string &R_out)
{
return detail::queue_policy_base_impl_t::reconstruct (running_job);
int rc = 0;
if ( (rc = reconstruct_resource (h, job, R_out)) < 0)
return rc;
return detail::queue_policy_base_impl_t::reconstruct_queue (job);
}

std::shared_ptr<job_t> queue_policy_base_t::pending_pop ()
Expand Down Expand Up @@ -267,18 +271,35 @@ const std::shared_ptr<job_t> queue_policy_base_impl_t::lookup (flux_jobid_t id)
return m_jobs[id];
}

int queue_policy_base_impl_t::reconstruct (std::shared_ptr<job_t> job)
int queue_policy_base_impl_t::reconstruct_queue (std::shared_ptr<job_t> job)
{
int rc = -1;
std::pair<std::map<uint64_t, flux_jobid_t>::iterator, bool> ret;
std::pair<std::map<flux_jobid_t,
std::shared_ptr<job_t>>::iterator, bool> ret2;

if (job == nullptr || m_jobs.find (job->id) != m_jobs.end ()) {
errno = EINVAL;
goto out;
}
job->t_stamps.running_ts = m_rq_cnt++;
m_running.insert (std::pair<uint64_t, flux_jobid_t>(job->t_stamps.running_ts,
job->id));
m_jobs.insert (std::pair<flux_jobid_t, std::shared_ptr<job_t>> (job->id,
job));

ret = m_running.insert (std::pair<uint64_t, flux_jobid_t>(
job->t_stamps.running_ts, job->id));
if (ret.second == false) {
rc = -1;
errno = ENOMEM;
goto out;
}
ret2 = m_jobs.insert (std::pair<flux_jobid_t, std::shared_ptr<job_t>> (
job->id, job));
if (ret2.second == false) {
m_running.erase (ret.first);
rc = -1;
errno = ENOMEM;
goto out;
}

rc = 0;
out:
return rc;
Expand Down
2 changes: 2 additions & 0 deletions qmanager/policies/queue_policy_bf_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class queue_policy_bf_base_t : public queue_policy_base_t
public:
virtual ~queue_policy_bf_base_t ();
virtual int run_sched_loop (void *h, bool use_alloced_queue);
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &R_out);
virtual int apply_params ();

protected:
Expand Down
13 changes: 11 additions & 2 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ int queue_policy_bf_base_t<reapi_type>::cancel_completed_jobs (void *h)
// Pop newly completed jobs (e.g., per a free request from job-manager
// as received by qmanager) to remove them from the resource infrastructure.
while ((job = complete_pop ()) != nullptr)
rc += reapi_type::cancel (h, job->id);
rc += reapi_type::cancel (h, job->id, true);
return rc;
}

Expand All @@ -56,7 +56,7 @@ int queue_policy_bf_base_t<reapi_type>::cancel_reserved_jobs (void *h)
int rc = 0;
std::map<uint64_t, flux_jobid_t>::const_iterator citer;
for (citer = m_reserved.begin (); citer != m_reserved.end (); citer++)
rc += reapi_type::cancel (h, citer->second);
rc += reapi_type::cancel (h, citer->second, false);
m_reserved.clear ();
return rc;
}
Expand Down Expand Up @@ -183,6 +183,15 @@ int queue_policy_bf_base_t<reapi_type>::run_sched_loop (void *h,
return rc;
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::reconstruct_resource (
    void *h, std::shared_ptr<job_t> job, std::string &R_out)
{
    // Hand the job's recorded R to the resource API so it can update its
    // state for this job id. The "at" and "ov" fields of the job's schedule
    // record are passed by reference and may be filled in by the callee;
    // R_out receives the R string produced by the callee.
    auto &sched = job->schedule;
    const int rc = reapi_type::update_allocate (h, job->id, sched.R,
                                                sched.at, sched.ov, R_out);
    return rc;
}

} // namespace Flux::queue_manager::detail
} // namespace Flux::queue_manager
} // namespace Flux
Expand Down
2 changes: 2 additions & 0 deletions qmanager/policies/queue_policy_fcfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class queue_policy_fcfs_t : public queue_policy_base_t
public:
virtual ~queue_policy_fcfs_t ();
virtual int run_sched_loop (void *h, bool use_alloced_queue);
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &R_out);
virtual int apply_params ();

private:
Expand Down
12 changes: 11 additions & 1 deletion qmanager/policies/queue_policy_fcfs_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int queue_policy_fcfs_t<reapi_type>::cancel_completed_jobs (void *h)
// Pop newly completed jobs (e.g., per a free request from job-manager
// as received by qmanager) to remove them from the resource infrastructure.
while ((job = complete_pop ()) != nullptr)
rc += reapi_type::cancel (h, job->id);
rc += reapi_type::cancel (h, job->id, true);
return rc;
}

Expand Down Expand Up @@ -125,6 +125,16 @@ int queue_policy_fcfs_t<reapi_type>::run_sched_loop (void *h,
return rc;
}

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::reconstruct_resource (
    void *h, std::shared_ptr<job_t> job, std::string &R_out)
{
    // Forward the running job's id and its schedule record (R, at, ov)
    // to the resource API; the callee's return code is passed straight
    // back to the caller and R_out carries the R string it produced.
    auto &record = job->schedule;
    return reapi_type::update_allocate (h,
                                        job->id,
                                        record.R,
                                        record.at,
                                        record.ov,
                                        R_out);
}


} // namespace Flux::queue_manager::detail
} // namespace Flux::queue_manager
} // namespace Flux
Expand Down
25 changes: 24 additions & 1 deletion resource/hlapi/bindings/c++/reapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,39 @@ class reapi_t {
return -1;
}

/*! Update the resource state with R.
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module, it is expected to be a pointer
* to a flux_t object.
* \param jobid jobid of the uint64_t type.
* \param R R string of the std::string type.
* \param at return the scheduled time.
* \param ov return the performance overhead
* in terms of elapsed time needed to complete
* the update operation.
* \param R_out return the updated R string.
* \return 0 on success; -1 on error.
*/
static int update_allocate (void *h,
                            const uint64_t jobid,
                            const std::string &R,
                            int64_t &at,
                            double &ov,
                            std::string &R_out)
{
    // Placeholder body: no argument is read or written here and the
    // call always reports failure (-1).
    const int rc = -1;
    return rc;
}

/*! Cancel the allocation or reservation corresponding to jobid.
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module, it is expected to be a pointer
* to a flux_t object.
* \param jobid jobid of the uint64_t type.
* \param noent_ok don't return an error with nonexistent jobid
* \return 0 on success; -1 on error.
*/
static int cancel (void *h, const uint64_t jobid)
static int cancel (void *h, const uint64_t jobid, bool noent_ok)
{
return -1;
}
Expand Down
5 changes: 4 additions & 1 deletion resource/hlapi/bindings/c++/reapi_cli.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class reapi_cli_t : public reapi_t {
const std::string &jobspec,
const uint64_t jobid, bool &reserved,
std::string &R, int64_t &at, double &ov);
static int cancel (void *h, const int64_t jobid);
static int update_allocate (void *h, const uint64_t jobid,
const std::string &R, int64_t &at, double &ov,
std::string &R_out);
static int cancel (void *h, const int64_t jobid, bool noent_ok);
static int info (void *h, const int64_t jobid,
bool &reserved, int64_t &at, double &ov);
static int stat (void *h, int64_t &V, int64_t &E,int64_t &J,
Expand Down
9 changes: 8 additions & 1 deletion resource/hlapi/bindings/c++/reapi_cli_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ int reapi_cli_t::match_allocate (void *h, bool orelse_reserve,
return NOT_YET_IMPLEMENTED;
}

int reapi_cli_t::cancel (void *h, const int64_t jobid)
int reapi_cli_t::update_allocate (void *h,
                                  const uint64_t jobid,
                                  const std::string &R,
                                  int64_t &at,
                                  double &ov,
                                  std::string &R_out)
{
    // No argument is touched here; the function only returns the
    // NOT_YET_IMPLEMENTED code.
    const int rc = NOT_YET_IMPLEMENTED;
    return rc;
}

int reapi_cli_t::cancel (void *h,
                         const int64_t jobid,
                         bool noent_ok)
{
    // All arguments, including noent_ok, are ignored here; the function
    // only returns the NOT_YET_IMPLEMENTED code.
    const int rc = NOT_YET_IMPLEMENTED;
    return rc;
}
Expand Down
5 changes: 4 additions & 1 deletion resource/hlapi/bindings/c++/reapi_module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class reapi_module_t : public reapi_t {
const std::string &jobspec,
const uint64_t jobid, bool &reserved,
std::string &R, int64_t &at, double &ov);
static int cancel (void *h, const uint64_t jobid);
static int update_allocate (void *h, const uint64_t jobid,
const std::string &R, int64_t &at, double &ov,
std::string &R_out);
static int cancel (void *h, const uint64_t jobid, bool noent_ok);
static int info (void *h, const uint64_t jobid,
bool &reserved, int64_t &at, double &ov);
static int stat (void *h, int64_t &V, int64_t &E,int64_t &J,
Expand Down
Loading

0 comments on commit 577d821

Please sign in to comment.