Skip to content

Commit

Permalink
Temporary commit: Switched the control plane protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 23, 2023
1 parent 4571647 commit a3a3954
Show file tree
Hide file tree
Showing 39 changed files with 735 additions and 3,831 deletions.
75 changes: 0 additions & 75 deletions src/replica/AddReplicaQservHttpMgtRequest.cc

This file was deleted.

110 changes: 0 additions & 110 deletions src/replica/AddReplicaQservHttpMgtRequest.h

This file was deleted.

67 changes: 10 additions & 57 deletions src/replica/AddReplicaQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,10 @@
// Class header
#include "replica/AddReplicaQservMgtRequest.h"

// Third party headers
#include "XrdSsi/XrdSsiProvider.hh"
#include "XrdSsi/XrdSsiService.hh"

// Qserv headers
#include "global/ResourceUnit.h"
#include "proto/worker.pb.h"
#include "replica/Configuration.h"
#include "replica/ServiceProvider.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace nlohmann;
using namespace std;

namespace {
Expand All @@ -46,21 +37,20 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.AddReplicaQservMgtRequest");
namespace lsst::qserv::replica {

AddReplicaQservMgtRequest::Ptr AddReplicaQservMgtRequest::create(
ServiceProvider::Ptr const& serviceProvider, string const& worker, unsigned int chunk,
shared_ptr<ServiceProvider> const& serviceProvider, string const& worker, unsigned int chunk,
vector<string> const& databases, AddReplicaQservMgtRequest::CallbackType const& onFinish) {
return AddReplicaQservMgtRequest::Ptr(
new AddReplicaQservMgtRequest(serviceProvider, worker, chunk, databases, onFinish));
}

AddReplicaQservMgtRequest::AddReplicaQservMgtRequest(ServiceProvider::Ptr const& serviceProvider,
AddReplicaQservMgtRequest::AddReplicaQservMgtRequest(shared_ptr<ServiceProvider> const& serviceProvider,
string const& worker, unsigned int chunk,
vector<string> const& databases,
AddReplicaQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_ADD_REPLICA", worker),
_chunk(chunk),
_databases(databases),
_onFinish(onFinish),
_qservRequest(nullptr) {}
_onFinish(onFinish) {}

list<pair<string, string>> AddReplicaQservMgtRequest::extendedPersistentState() const {
list<pair<string, string>> result;
Expand All @@ -71,52 +61,15 @@ list<pair<string, string>> AddReplicaQservMgtRequest::extendedPersistentState()
return result;
}

void AddReplicaQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<AddReplicaQservMgtRequest>();

_qservRequest = xrdreq::AddChunkGroupQservRequest::create(
chunk(), databases(), [request](proto::WorkerCommandStatus::Code code, string const& error) {
if (request->state() == State::FINISHED) return;
replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");
if (request->state() == State::FINISHED) return;

switch (code) {
case proto::WorkerCommandStatus::SUCCESS:
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;
case proto::WorkerCommandStatus::INVALID:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error);
break;
case proto::WorkerCommandStatus::IN_USE:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error);
break;
case proto::WorkerCommandStatus::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;
default:
throw logic_error("AddReplicaQservMgtRequest::" + string(__func__) +
" unhandled request completion code: " +
proto::WorkerCommandStatus_Code_Name(code));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
service()->ProcessRequest(*_qservRequest, resource);
}

void AddReplicaQservMgtRequest::finishImpl(replica::Lock const& lock) {
switch (extendedState()) {
case ExtendedState::CANCELLED:
case ExtendedState::TIMEOUT_EXPIRED:
if (_qservRequest) _qservRequest->cancel();
break;
default:
break;
}
void AddReplicaQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const method = "POST";
string const target = "/replica";
json const data = json::object({{"chunk", _chunk}, {"databases", _databases}});
createHttpReq(lock, method, target, data);
}

void AddReplicaQservMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_DEBUG, context() << __func__);

LOGS(_log, LOG_LVL_TRACE, context() << __func__);
notifyDefaultImpl<AddReplicaQservMgtRequest>(lock, _onFinish);
}

Expand Down
37 changes: 19 additions & 18 deletions src/replica/AddReplicaQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@
#define LSST_QSERV_REPLICA_ADDREPLICAQSERVMGTREQUEST_H

// System headers
#include <list>
#include <memory>
#include <string>
#include <vector>
#include <utility>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/ServiceProvider.h"
#include "xrdreq/ChunkGroupQservRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
} // namespace lsst::qserv::replica

// This header declarations

Expand All @@ -50,7 +57,7 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
AddReplicaQservMgtRequest(AddReplicaQservMgtRequest const&) = delete;
AddReplicaQservMgtRequest& operator=(AddReplicaQservMgtRequest const&) = delete;

~AddReplicaQservMgtRequest() final = default;
virtual ~AddReplicaQservMgtRequest() final = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand All @@ -65,7 +72,7 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
* @param onFinish (optional) callback function to be called upon request completion.
* @return A pointer to the created object.
*/
static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
static Ptr create(std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& worker,
unsigned int chunk, std::vector<std::string> const& databases,
CallbackType const& onFinish = nullptr);

Expand All @@ -76,32 +83,26 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
std::vector<std::string> const& databases() const { return _databases; }

/// @see QservMgtRequest::extendedPersistentState()
std::list<std::pair<std::string, std::string>> extendedPersistentState() const final;
virtual std::list<std::pair<std::string, std::string>> extendedPersistentState() const final;

protected:
/// @see QservMgtRequest::startImpl
void startImpl(replica::Lock const& lock) final;

/// @see QservMgtRequest::finishImpl
void finishImpl(replica::Lock const& lock) final;
/// @see QservMgtRequest::createHttpReqImpl
virtual void createHttpReqImpl(replica::Lock const& lock) final;

/// @see QservMgtRequest::notify
void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) final;

private:
/// @see AddReplicaQservMgtRequest::create()
AddReplicaQservMgtRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
unsigned int chunk, std::vector<std::string> const& databases,
CallbackType const& onFinish);
AddReplicaQservMgtRequest(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& worker, unsigned int chunk,
std::vector<std::string> const& databases, CallbackType const& onFinish);

// Input parameters

unsigned int const _chunk;
std::vector<std::string> const _databases;
CallbackType _onFinish; /// @note is reset when the request finishes

/// A request to the remote services
xrdreq::AddChunkGroupQservRequest::Ptr _qservRequest;
CallbackType _onFinish; ///< The callback is reset when the request finishes.
};

} // namespace lsst::qserv::replica
Expand Down
Loading

0 comments on commit a3a3954

Please sign in to comment.