Skip to content

Commit

Permalink
Merge branch 'tickets/DM-40787'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Sep 14, 2023
2 parents db15f28 + 0a51dad commit 6dbcb49
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 72 deletions.
53 changes: 3 additions & 50 deletions src/replica/Registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,16 @@
#include "replica/Configuration.h"
#include "replica/ConfigWorker.h"
#include "replica/HttpClient.h"
#include "util/common.h"

// LSST headers
#include "lsst/log/Log.h"

// Standard headers
#include <netdb.h>
#include <stdexcept>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <thread>

// Third-party headers
#include "boost/asio.hpp"

using namespace std;
using json = nlohmann::json;

Expand All @@ -50,49 +45,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.Registry");

string context(string const& func) { return "REGISTRY " + func + " "; }

/**
* @brief Get the FQDN of the current host.
* @note If there will be more than one canonical name associated with the host
* then the result will contain them all separated by comma, such as in:
* @code
* <fqdn-1>,<fqdn-2>,...
* @endcode
* @return The FQDN (or FQDNs)
* @throws std::runtime_error In case if the information couldn't be retreived.
*/
string get_current_host_fqdn() {
// Get the short name of the current host first.
boost::system::error_code ec;
string const hostname = boost::asio::ip::host_name(ec);
if (ec.value() != 0) {
throw runtime_error("Registry::" + string(__func__) +
" boost::asio::ip::host_name failed: " + ec.category().name() + string(":") +
to_string(ec.value()) + "[" + ec.message() + "]");
}

// Get the host's FQDN(s)
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; /* either IPV4 or IPV6 */
hints.ai_socktype = SOCK_STREAM; /* IP */
hints.ai_flags = AI_CANONNAME; /* canonical name */
struct addrinfo* info;
while (true) {
int const retCode = getaddrinfo(hostname.data(), "http", &hints, &info);
if (retCode == 0) break;
if (retCode == EAI_AGAIN) continue;
throw runtime_error("Registry::" + string(__func__) +
" getaddrinfo failed: " + gai_strerror(retCode));
}
string fqdn;
for (struct addrinfo* p = info; p != NULL; p = p->ai_next) {
if (!fqdn.empty()) fqdn += ",";
fqdn += p->ai_canonname;
}
freeaddrinfo(info);
return fqdn;
}

} // namespace

namespace lsst::qserv::replica {
Expand Down Expand Up @@ -142,7 +94,8 @@ vector<WorkerInfo> Registry::workers() const {
}

void Registry::add(string const& name) const {
string const hostName = ::get_current_host_fqdn();
bool const all = true;
string const hostName = util::get_current_host_fqdn(all);
auto const config = _serviceProvider->config();
json const request =
json::object({{"instance_id", _serviceProvider->instanceId()},
Expand Down
2 changes: 2 additions & 0 deletions src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ target_sources(util PRIVATE
Bug.cc
CmdLineParser.cc
Command.cc
common.cc
ConfigStore.cc
DynamicWorkQueue.cc
Error.cc
Expand All @@ -31,6 +32,7 @@ target_sources(util PRIVATE
)

target_link_libraries(util PUBLIC
boost_system
log
)

Expand Down
76 changes: 76 additions & 0 deletions src/util/common.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
* Copyright 2015 LSST Corporation.
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
// Class header
#include "util/common.h"

// Standard headers
#include <netdb.h>
#include <stdexcept>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>

// Third-party headers
#include "boost/asio.hpp"

using namespace std;

namespace lsst::qserv::util {

string get_current_host_fqdn(bool all) {
// Get the short name of the current host first.
boost::system::error_code ec;
string const hostname = boost::asio::ip::host_name(ec);
if (ec.value() != 0) {
throw runtime_error("Registry::" + string(__func__) +
" boost::asio::ip::host_name failed: " + ec.category().name() + string(":") +
to_string(ec.value()) + "[" + ec.message() + "]");
}

// Get the host's FQDN(s)
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; /* either IPV4 or IPV6 */
hints.ai_socktype = SOCK_STREAM; /* IP */
hints.ai_flags = AI_CANONNAME; /* canonical name */
struct addrinfo* info;
while (true) {
int const retCode = getaddrinfo(hostname.data(), "http", &hints, &info);
if (retCode == 0) break;
if (retCode == EAI_AGAIN) continue;
throw runtime_error("Registry::" + string(__func__) +
" getaddrinfo failed: " + gai_strerror(retCode));
}
string fqdn;
for (struct addrinfo* p = info; p != NULL; p = p->ai_next) {
if (!fqdn.empty()) {
if (!all) break;
fqdn += ",";
}
fqdn += p->ai_canonname;
}
freeaddrinfo(info);
return fqdn;
}

} // namespace lsst::qserv::util
17 changes: 17 additions & 0 deletions src/util/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@

namespace lsst::qserv::util {

/**
* Get the FQDN(s) of the current host.
*
* If there will be more than one canonical name associated with the host
* and if a value of the input parameter is set as "all=true" then the result
* will contain all names separated by comma, such as in:
* @code
* <fqdn-1>,<fqdn-2>,...
* @endcode
* @note In most setups there will be just one name.
* @param all The optional parameter that allows returning all names instead
* of the first one that was found.
* @return The FQDN (or FQDNs)
* @throws std::runtime_error In case if the information couldn't be retreived.
*/
std::string get_current_host_fqdn(bool all = false);

template <class Map>
typename Map::mapped_type const& getFromMap(Map const& m, typename Map::key_type const& key,
typename Map::mapped_type const& defValue) {
Expand Down
12 changes: 9 additions & 3 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.wbase.FileChannelShared");
/**
* Iterate over the result files at the results folder and remove those
* which satisfy the desired criteria.
* @note The folder must exist when this function gets called. Any other
* scenario means a configuration error or a problem with the infrastructure.
* Running into either of these problems should result in the abort of
* the application.
* @param context The calling context (used for logging purposes).
* @param fileCanBeRemoved The optional validator to be called for each candidate file.
* Note that missing validator means "yes" the candidate file can be removed.
* @return The total number of removed files.
* @throws std::runtime_error If the results folder doesn't exist.
*/
size_t cleanUpResultsImpl(string const& context, fs::path const& dirPath,
function<bool(string const&)> fileCanBeRemoved = nullptr) {
Expand All @@ -66,9 +71,10 @@ size_t cleanUpResultsImpl(string const& context, fs::path const& dirPath,
boost::system::error_code ec;
auto itr = fs::directory_iterator(dirPath, ec);
if (ec.value() != 0) {
LOGS(_log, LOG_LVL_WARN,
context << "failed to open the results folder " << dirPath << ", ec: " << ec << ".");
return numFilesRemoved;
string const err = context + "failed to open the results folder '" + dirPath.string() +
"', ec: " + to_string(ec.value()) + ".";
LOGS(_log, LOG_LVL_ERROR, err);
throw runtime_error(err);
}
for (auto&& entry : boost::make_iterator_range(itr, {})) {
auto filePath = entry.path();
Expand Down
24 changes: 6 additions & 18 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

// Third-party headers
#include <boost/algorithm/string/replace.hpp>
#include "boost/asio.hpp"
#include "boost/filesystem.hpp"

// LSST headers
Expand All @@ -51,6 +50,7 @@
#include "proto/TaskMsgDigest.h"
#include "proto/worker.pb.h"
#include "util/Bug.h"
#include "util/common.h"
#include "util/HoldTrack.h"
#include "util/IterableFormatter.h"
#include "util/TimeUtils.h"
Expand All @@ -69,18 +69,6 @@ namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.wbase.Task");

string get_hostname() {
// Get the short name of the current host.
boost::system::error_code ec;
string const hostname = boost::asio::ip::host_name(ec);
if (ec.value() != 0) {
throw runtime_error("Task::" + string(__func__) +
" boost::asio::ip::host_name failed: " + ec.category().name() + string(":") +
to_string(ec.value()) + "[" + ec.message() + "]");
}
return hostname;
}

string buildResultFilePath(shared_ptr<lsst::qserv::proto::TaskMsg> const& taskMsg,
string const& resultsDirname) {
if (resultsDirname.empty()) return resultsDirname;
Expand Down Expand Up @@ -155,14 +143,14 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptr<UserQueryInf
auto const resultDeliveryProtocol = workerConfig->resultDeliveryProtocol();
if (resultDeliveryProtocol != wconfig::WorkerConfig::ResultDeliveryProtocol::SSI) {
_resultFilePath = ::buildResultFilePath(t, workerConfig->resultsDirname());
auto const fqdn = util::get_current_host_fqdn();
if (resultDeliveryProtocol == wconfig::WorkerConfig::ResultDeliveryProtocol::XROOT) {
// NOTE: one extra '/' after the <server>[:<port>] spec is required to make
// NOTE: one extra '/' after the <host>[:<port>] spec is required to make
// a "valid" XROOTD url.
_resultFileXrootUrl = "xroot://" + ::get_hostname() + ":" +
to_string(workerConfig->resultsXrootdPort()) + "/" + _resultFilePath;
_resultFileXrootUrl = "xroot://" + fqdn + ":" + to_string(workerConfig->resultsXrootdPort()) +
"/" + _resultFilePath;
} else if (resultDeliveryProtocol == wconfig::WorkerConfig::ResultDeliveryProtocol::HTTP) {
_resultFileHttpUrl =
"http://" + ::get_hostname() + ":" + to_string(resultsHttpPort) + _resultFilePath;
_resultFileHttpUrl = "http://" + fqdn + ":" + to_string(resultsHttpPort) + _resultFilePath;
} else {
throw std::runtime_error("wbase::Task::Task: unsupported results delivery protocol: " +
wconfig::WorkerConfig::protocol2str(resultDeliveryProtocol));
Expand Down
4 changes: 3 additions & 1 deletion src/xrdsvc/SsiService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ SsiService::SsiService(XrdSsiLogger* log)
// ATTENTION: this is the blocking operation since it needs to be run before accepting
// new queries to ensure that worker had sufficient resources to process those.
if (workerConfig->resultsCleanUpOnStart()) {
wbase::FileChannelShared::cleanUpResultsOnWorkerRestart();
if (workerConfig->resultDeliveryProtocol() != wconfig::WorkerConfig::ResultDeliveryProtocol::SSI) {
wbase::FileChannelShared::cleanUpResultsOnWorkerRestart();
}
}
}

Expand Down

0 comments on commit 6dbcb49

Please sign in to comment.