Skip to content

Commit

Permalink
Merge branch 'tickets/DM-41291'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 7, 2023
2 parents 894bb13 + 4cb58b7 commit cf333ec
Show file tree
Hide file tree
Showing 195 changed files with 4,762 additions and 8,403 deletions.
21 changes: 21 additions & 0 deletions admin/local/docker/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ services:
--db-uri mysql://[email protected]:3306
--db-admin-uri mysql://root:[email protected]:3306
--vnid-config "@/usr/local/lib64/libreplica.so {{db_uri}}/qservw_worker 0 0"
--repl-instance-id qserv_proj
--repl-auth-key replauthkey
--repl-admin-auth-key=repladminauthkey
--repl-registry-host repl-mgr-registry
--repl-registry-port 25082
--results-dirname /qserv/data/results
--cmsd-manager-name manager-xrootd
--cmsd-manager-count 1
Expand Down Expand Up @@ -123,6 +128,11 @@ services:
--db-uri mysql://qsmaster@worker-db-0:3306
--vnid-config "@/usr/local/lib64/libreplica.so mysql://[email protected]:3306/qservw_worker 0 0"
--results-dirname /qserv/data/results
--repl-instance-id qserv_proj
--repl-auth-key replauthkey
--repl-admin-auth-key=repladminauthkey
--repl-registry-host repl-mgr-registry
--repl-registry-port 25082
--cmsd-manager-name manager-xrootd
--cmsd-manager-count 1
network_mode: "service:worker-xrootd-0"
Expand Down Expand Up @@ -186,6 +196,11 @@ services:
--db-uri mysql://[email protected]:3306?socket={{db_socket}}
--db-admin-uri mysql://root:[email protected]:3306?socket={{db_socket}}
--vnid-config "@/usr/local/lib64/libreplica.so mysql://[email protected]:3306/qservw_worker 0 0"
--repl-instance-id qserv_proj
--repl-auth-key replauthkey
--repl-admin-auth-key=repladminauthkey
--repl-registry-host repl-mgr-registry
--repl-registry-port 25082
--results-dirname /qserv/data/results
--cmsd-manager-name manager-xrootd
--cmsd-manager-count 1
Expand Down Expand Up @@ -218,6 +233,12 @@ services:
--db-uri mysql://qsmaster@worker-db-1:3306?socket=/qserv/mariadb/run/mysqld.sock
--vnid-config "@/usr/local/lib64/libreplica.so mysql://[email protected]:3306/qservw_worker 0 0"
--results-dirname /qserv/data/results
--repl-instance-id qserv_proj
--repl-auth-key replauthkey
--repl-admin-auth-key=repladminauthkey
--repl-registry-host repl-mgr-registry
--repl-registry-port 25082
--cmsd-manager-name manager-xrootd
--cmsd-manager-count 1
network_mode: "service:worker-xrootd-1"
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ add_subdirectory(ccontrol)
add_subdirectory(css)
add_subdirectory(czar)
add_subdirectory(global)
add_subdirectory(http)
add_subdirectory(memman)
add_subdirectory(mimic)
add_subdirectory(mysql)
Expand Down
19 changes: 18 additions & 1 deletion src/admin/python/lsst/qserv/admin/cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@
options_file_option,
mysql_monitor_password_option,
reload_option,
repl_admin_auth_key_option,
repl_auth_key_option,
repl_admin_auth_key_option,
repl_connection_option,
repl_instance_id_option,
repl_registry_host_option,
repl_registry_port_option,
repl_http_port_option,
results_dirname_option,
results_protocol_option,
run_option,
Expand Down Expand Up @@ -553,6 +557,13 @@ def xrootd_manager(ctx: click.Context, **kwargs: Any) -> None:
@pass_context
@db_uri_option(help=worker_db_help)
@vnid_config_option(required=True)
@vnid_config_option(required=True)
@repl_instance_id_option(required=True)
@repl_auth_key_option(required=True)
@repl_admin_auth_key_option(required=True)
@repl_registry_host_option(required=True)
@repl_registry_port_option(required=True)
@repl_http_port_option(required=True)
@results_dirname_option()
@results_protocol_option()
@cmsd_manager_name_option()
Expand Down Expand Up @@ -587,6 +598,12 @@ def worker_cmsd(ctx: click.Context, **kwargs: Any) -> None:
@db_uri_option(help=worker_db_help)
@db_admin_uri_option(help=admin_worker_db_help)
@vnid_config_option(required=True)
@repl_instance_id_option(required=True)
@repl_auth_key_option(required=True)
@repl_admin_auth_key_option(required=True)
@repl_registry_host_option(required=True)
@repl_registry_port_option(required=True)
@repl_http_port_option(required=True)
@results_dirname_option()
@results_protocol_option()
@cmsd_manager_name_option()
Expand Down
70 changes: 56 additions & 14 deletions src/admin/python/lsst/qserv/admin/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,62 @@ def __call__(self, f: Callable) -> Callable:
)


# Factory for the "--repl-instance-id" click option. Calling the factory
# (optionally with additional click.option keyword arguments, such as
# required=True) yields the option decorator. The value defaults to "".
repl_instance_id_option = partial(
    click.option,
    "--repl-instance-id",
    default="",
    show_default=True,
    help=(
        "The unique identifier of the current Replication System's domain. The identifier is "
        "a part of the security context preventing accidental 'cross-talks' between "
        "unrelated domains."
    ),
)


# Factory for the "--repl-auth-key" click option. Calling the factory
# (optionally with additional click.option keyword arguments, such as
# required=True) yields the option decorator. The value defaults to "".
repl_auth_key_option = partial(
    click.option,
    "--repl-auth-key",
    help="The authorization key. The key is a part of the security context "
    "preventing unauthorized operations within the current Replication System's "
    "domain.",
    default="",
    show_default=True,
)

# Factory for the "--repl-admin-auth-key" click option. Calling the factory
# (optionally with additional click.option keyword arguments, such as
# required=True) yields the option decorator. The value defaults to "".
repl_admin_auth_key_option = partial(
    click.option,
    "--repl-admin-auth-key",
    help="The admin authorization key for the replication-ingest system.",
    default="",
    show_default=True,
)


# Factory for the "--repl-registry-host" click option. No default is set here;
# callers may pass further click.option keyword arguments (e.g. required=True)
# when invoking the factory to obtain the decorator.
repl_registry_host_option = partial(
    click.option,
    "--repl-registry-host",
    help=(
        "The FQDN of a host where the Replication System's Registry service is run."
    ),
)


# Factory for the "--repl-registry-port" click option. Calling the factory
# (optionally with additional click.option keyword arguments) yields the
# option decorator. The port defaults to 8080 and the default is shown in help.
repl_registry_port_option = partial(
    click.option,
    "--repl-registry-port",
    default=8080,
    show_default=True,
    help=(
        "The port number of the Replication System's Registry service."
    ),
)


# Factory for the "--repl-http-port" click option. Calling the factory
# (optionally with additional click.option keyword arguments, such as
# required=True) yields the option decorator. The port defaults to 0.
repl_http_port_option = partial(
    click.option,
    "--repl-http-port",
    help="The port number of the worker control service used by the Replication System "
    "and worker monitoring applications.",
    default=0,
    show_default=True,
)


results_dirname_option = partial(
click.option,
"--results-dirname",
Expand Down Expand Up @@ -307,17 +363,3 @@ def decorators(self) -> List[Callable]:
default="/config-etc/log/log.cnf",
show_default=True,
)


# Factory for the "--repl-auth-key" click option. Calling the factory
# (optionally with additional click.option keyword arguments) yields the
# option decorator. No default value is set by this definition.
repl_auth_key_option = partial(
    click.option,
    "--repl-auth-key",
    help="The authorization key for the replication-ingest system."
)


# Factory for the "--repl-admin-auth-key" click option. Calling the factory
# (optionally with additional click.option keyword arguments) yields the
# option decorator. No default value is set by this definition.
repl_admin_auth_key_option = partial(
    click.option,
    "--repl-admin-auth-key",
    help="The admin authorization key for the replication-ingest system."
)
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 26
self.repl_api_version = 27
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
52 changes: 52 additions & 0 deletions src/admin/templates/xrootd/etc/xrdssi.cf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,55 @@ protocol = {{ results_protocol }}
# Set to any value but 0 if result files (if any) left after the previous run of
# the worker had to be deleted from the corresponding folder.
clean_up_on_start = 1

[replication]

# This section contains parameters related to the worker management service
# that is used by the Replication System to manage Qserv workers or to acquire
# monitoring stats from the workers. Workers are required to self register
# themselves with the Replication System's Registry that can be contacted
# at a location specified via connection parameters 'registry_host' and
# 'registry_port'. Configuration parameters 'instance_id', 'auth_key'
# and 'admin_auth_key' represent the security context of the worker control
# protocol. This context is required to be provided by workers when registering
# themselves with the Registry. The context is also used by worker management
# service when processing worker management requests sent to the service by
# the Replication Controller or relevant applications.
#
# Workers are required to periodically update their registration status at
# the interval specified at 'registry_heartbeat_ival_sec'. Worker status
# includes (at least):
# - the unique identifier of the worker
# - the FQDN of a host (or a pod) where the worker is run
# - the HTTP port of the worker management service (parameter 'http_port')

# The unique identifier of the Replication System's domain.
instance_id = {{ repl_instance_id }}

# The authorization key is required by the worker control protocol.
auth_key = {{ repl_auth_key }}
admin_auth_key = {{ repl_admin_auth_key }}

# The FQDN of a host where the Replication System's Registry service is run. Workers
# will self report own connection parameters to the Registry.
registry_host = {{ repl_registry_host }}

# The port number of the Replication System's Registry service.
registry_port = {{ repl_registry_port }}

# The heartbeat interval for self-registering worker management services at
# the Replication System's Registry. Note that the value must be strictly
# higher than 0.
registry_heartbeat_ival_sec = 1

# The port number of the worker control service used by the Replication
# Controller and the worker monitoring apps. If 0 is set as a value of
# the parameter then the first available port will be assumed. Note that
# the actual (whether explicitly specified or dynamically allocated) port
# number will be reported to the Replication Registry service.
# Note that the value must be strictly higher than 0.
http_port = {{ repl_http_port }}

# The number of the BOOST ASIO threads for serving HTTP requests.
# Note that the value must be strictly higher than 0.
num_http_threads = 2
9 changes: 5 additions & 4 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@
#include "global/clock_defs.h"
#include "global/debugUtil.h"
#include "global/MsgReceiver.h"
#include "http/Client.h"
#include "http/Method.h"
#include "proto/ProtoHeaderWrap.h"
#include "proto/ProtoImporter.h"
#include "proto/WorkerResponse.h"
#include "qdisp/CzarStats.h"
#include "qdisp/JobQuery.h"
#include "replica/HttpClient.h"
#include "rproc/InfileMerger.h"
#include "util/Bug.h"
#include "util/common.h"
Expand All @@ -57,7 +58,7 @@ using lsst::qserv::proto::ProtoHeaderWrap;
using lsst::qserv::proto::ProtoImporter;
using lsst::qserv::proto::Result;
using lsst::qserv::proto::WorkerResponse;
using lsst::qserv::replica::HttpClient;
namespace http = lsst::qserv::http;

using namespace std;

Expand Down Expand Up @@ -265,7 +266,7 @@ bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
uint32_t msgSizeBytes = 0;
bool success = true;
try {
HttpClient reader("GET", httpUrl);
http::Client reader(http::Method::GET, httpUrl);
reader.read([&](char const* inBuf, size_t inBufSize) {
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
Expand Down Expand Up @@ -352,7 +353,7 @@ bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
// Remove the file from the worker if it still exists. Report and ignore errors.
// The files will be garbage-collected by workers.
try {
HttpClient remover("DELETE", httpUrl);
http::Client remover(http::Method::DELETE, httpUrl);
remover.read([](char const* inBuf, size_t inBufSize) {});
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
Expand Down
1 change: 1 addition & 0 deletions src/global/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ target_sources(global PRIVATE
debugUtil.cc
ResourceUnit.cc
sqltoken.cc
stringUtil.cc
)

target_link_libraries(global PUBLIC
Expand Down
20 changes: 0 additions & 20 deletions src/global/ResourceUnit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ std::string ResourceUnit::path() const {
case UNKNOWN:
ss << _pathSep << "UNKNOWN_RESOURCE_UNIT";
break;
case WORKER:
ss << _workerId;
break;
default:
::abort();
break;
Expand All @@ -107,8 +104,6 @@ std::string ResourceUnit::prefix(UnitType const& r) {
return "chk";
case UNKNOWN:
return "UNKNOWN";
case WORKER:
return "worker";
case QUERY:
return "query";
case GARBAGE:
Expand All @@ -121,10 +116,6 @@ std::string ResourceUnit::makePath(int chunk, std::string const& db) {
return _pathSep + prefix(UnitType::DBCHUNK) + _pathSep + db + _pathSep + std::to_string(chunk);
}

std::string ResourceUnit::makeWorkerPath(std::string const& id) {
return _pathSep + prefix(UnitType::WORKER) + _pathSep + id;
}

void ResourceUnit::setAsDbChunk(std::string const& db, int chunk) {
_unitType = DBCHUNK;
_db = db;
Expand Down Expand Up @@ -173,17 +164,6 @@ void ResourceUnit::_setFromPath(std::string const& path) {
}
_chunk = t.tokenAsInt();
_ingestLeafAndKeys(t.token());
} else if (rTypeString == prefix(WORKER)) {
_unitType = WORKER;
if (_markGarbageIfDone(t)) {
return;
}
t.next();
_workerId = t.token();
if (_workerId.empty()) {
_unitType = GARBAGE;
return;
}
} else if (rTypeString == prefix(QUERY)) {
_unitType = QUERY;
if (!t.done()) {
Expand Down
7 changes: 1 addition & 6 deletions src/global/ResourceUnit.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace lsst::qserv {
class ResourceUnit {
public:
class Checker;
enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, WORKER, QUERY };
enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, QUERY };

ResourceUnit() = default;
explicit ResourceUnit(std::string const& path);
Expand All @@ -61,7 +61,6 @@ class ResourceUnit {
UnitType unitType() const { return _unitType; }
std::string const& db() const { return _db; }
int chunk() const { return _chunk; }
std::string const& workerId() const { return _workerId; }

/// Lookup extended path variables (?k=val syntax)
std::string var(std::string const& key) const;
Expand All @@ -72,9 +71,6 @@ class ResourceUnit {
/// @return the path of the database/chunk resource
static std::string makePath(int chunk, std::string const& db);

/// @return the path of the worker-specific resource
static std::string makeWorkerPath(std::string const& id);

// Setup a path of a certain type.
void setAsDbChunk(std::string const& db, int chunk = DUMMY_CHUNK);

Expand All @@ -88,7 +84,6 @@ class ResourceUnit {
UnitType _unitType = UnitType::GARBAGE; //< Type of unit
std::string _db; //< for DBCHUNK type
int _chunk = -1; //< for DBCHUNK type
std::string _workerId; //< for WORKER type

typedef std::map<std::string, std::string> VarMap;
VarMap _vars; //< Key-value specifiers
Expand Down
Loading

0 comments on commit cf333ec

Please sign in to comment.