Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45929: Worker-centric view of the ingest contributions on the Qserv Web Dashboard #866

Merged
merged 6 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 36
self.repl_api_version = 37
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/http/ChttpMetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ string const adminAuthKey;

namespace lsst::qserv::http {

unsigned int const ChttpMetaModule::version = 36;
unsigned int const ChttpMetaModule::version = 37;

void ChttpMetaModule::process(string const& context, nlohmann::json const& info, httplib::Request const& req,
httplib::Response& resp, string const& subModuleName) {
Expand Down
2 changes: 1 addition & 1 deletion src/http/MetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ string const adminAuthKey;

namespace lsst::qserv::http {

unsigned int const MetaModule::version = 36;
unsigned int const MetaModule::version = 37;

void MetaModule::process(string const& context, nlohmann::json const& info,
shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp,
Expand Down
153 changes: 125 additions & 28 deletions src/replica/contr/HttpIngestTransModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
#include "replica/jobs/AbortTransactionJob.h"
#include "replica/jobs/DirectorIndexJob.h"
#include "replica/mysql/DatabaseMySQL.h"
#include "replica/services/DatabaseServices.h"
#include "replica/services/ServiceProvider.h"
#include "replica/util/Mutex.h"
#include "replica/util/NamedMutexRegistry.h"
#include "util/String.h"

using namespace std;
using json = nlohmann::json;
Expand Down Expand Up @@ -93,7 +93,7 @@ json HttpIngestTransModule::executeImpl(string const& subModuleName) {

json HttpIngestTransModule::_getTransactions() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
Expand All @@ -106,6 +106,7 @@ json HttpIngestTransModule::_getTransactions() {
auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0;
bool const includeContext = query().optionalUInt64("include_context", 0) != 0;
bool const includeLog = query().optionalUInt64("include_log", 0) != 0;
bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;

Expand All @@ -117,17 +118,24 @@ json HttpIngestTransModule::_getTransactions() {
debug(__func__, "contrib_long=" + bool2str(longContribFormat));
debug(__func__, "include_context=" + bool2str(includeContext));
debug(__func__, "include_log=" + bool2str(includeLog));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));

auto const transStateSelector = _parseTransStateSelector("trans_state");
auto const contribStatusSelector = _parseContribStatusSelector("contrib_status");

vector<string> databases;
if (databaseName.empty()) {
databases = config->databases(family, allDatabases, isPublished);
} else {
databases.push_back(databaseName);
}

string const anyTableSelector;
string const anyWorkerSelector;
bool const allWorkers = true;
int const anyChunkSelector = -1;
json result;
result["databases"] = json::object();
for (auto&& databaseName : databases) {
Expand All @@ -138,11 +146,14 @@ json HttpIngestTransModule::_getTransactions() {
result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][database.name]["num_chunks"] = chunks.size();
result["databases"][database.name]["transactions"] = json::array();
for (auto&& transaction : databaseServices->transactions(database.name, includeContext, includeLog)) {
for (auto&& transaction :
databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector)) {
json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] = _getTransactionContributions(transaction, longContribFormat,
includeWarnings, includeRetries);
transJson["contrib"] = _getTransactionContributions(
transaction, anyTableSelector, anyWorkerSelector, contribStatusSelector,
anyChunkSelector, longContribFormat, includeExtensions, includeWarnings,
includeRetries);
}
result["databases"][database.name]["transactions"].push_back(transJson);
}
Expand All @@ -152,41 +163,84 @@ json HttpIngestTransModule::_getTransactions() {

json HttpIngestTransModule::_getTransaction() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
auto const id = stoul(params().at("id"));
TransactionId const transactionId = stoul(params().at("id"));
auto const databaseName = query().optionalString("database");
auto const tableName = query().optionalString("table");
auto const workerName = query().optionalString("worker");
int const chunkSelector = query().optionalInt("chunk", -1);
auto const includeContributions = query().optionalUInt64("contrib", 0) != 0;
auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0;
bool const includeContext = query().optionalUInt64("include_context", 0) != 0;
bool const includeLog = query().optionalUInt64("include_log", 0) != 0;
bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;
size_t const minRetries = query().optionalUInt64("min_retries", 0);
size_t const minWarnings = query().optionalUInt64("min_warnings", 0);
size_t const maxEntries = query().optionalUInt64("max_entries", 0);

debug(__func__, "id=" + to_string(id));
debug(__func__, "id=" + to_string(transactionId));
debug(__func__, "database=" + databaseName);
debug(__func__, "table=" + tableName);
debug(__func__, "worker=" + workerName);
debug(__func__, "chunk=" + to_string(chunkSelector));
debug(__func__, "contrib=" + bool2str(includeContributions));
debug(__func__, "contrib_long=" + bool2str(longContribFormat));
debug(__func__, "include_context=" + bool2str(includeContext));
debug(__func__, "include_log=" + bool2str(includeLog));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));
debug(__func__, "min_retries=" + to_string(minRetries));
debug(__func__, "min_warnings=" + to_string(minWarnings));
debug(__func__, "max_entries=" + to_string(maxEntries));

auto const transStateSelector = _parseTransStateSelector("trans_state");
auto const contribStatusSelector = _parseContribStatusSelector("contrib_status");

if (databaseName.empty() && (transactionId == 0)) {
throw http::Error(__func__, "either 'id' or 'database' query parameter must be specified");
}

DatabaseInfo database;
vector<TransactionInfo> transactions;
if (transactionId != 0) {
auto transaction = databaseServices->transaction(transactionId, includeContext, includeLog);
database = config->databaseInfo(transaction.database);
if (!databaseName.empty() && (databaseName != database.name)) {
throw http::Error(__func__, "transaction id=" + to_string(transactionId) +
" is associated with database '" + database.name +
"' which is different from the requested database '" +
databaseName + "'");
}
transactions.push_back(move(transaction));
} else {
database = config->databaseInfo(databaseName);
transactions =
databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector);
}

auto const transaction = databaseServices->transaction(id, includeContext, includeLog);
auto const database = config->databaseInfo(transaction.database);
bool const allWorkers = true;
vector<unsigned int> chunks;
databaseServices->findDatabaseChunks(chunks, transaction.database, allWorkers);
databaseServices->findDatabaseChunks(chunks, database.name, allWorkers);

json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] =
_getTransactionContributions(transaction, longContribFormat, includeWarnings, includeRetries);
}
json result;
result["databases"][transaction.database]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][transaction.database]["num_chunks"] = chunks.size();
result["databases"][transaction.database]["transactions"].push_back(transJson);
result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][database.name]["num_chunks"] = chunks.size();
for (auto&& transaction : transactions) {
json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] = _getTransactionContributions(
transaction, tableName, workerName, contribStatusSelector, chunkSelector,
longContribFormat, includeExtensions, includeWarnings, includeRetries, minRetries,
minWarnings, maxEntries);
}
result["databases"][database.name]["transactions"].push_back(transJson);
}
return result;
}

Expand Down Expand Up @@ -452,18 +506,21 @@ json HttpIngestTransModule::_endTransaction() {

json HttpIngestTransModule::_getContribution() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

unsigned int const id = stoul(params().at("id"));
bool const includeExtensions = query().optionalUInt64("include_extensions", 1) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;

debug(__func__, "id=" + to_string(id));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));

auto const databaseServices = controller()->serviceProvider()->databaseServices();
auto const contrib = databaseServices->transactionContrib(id, includeWarnings, includeRetries);
auto const contrib =
databaseServices->transactionContrib(id, includeExtensions, includeWarnings, includeRetries);

json result;
result["contribution"] = contrib.toJson();
Expand Down Expand Up @@ -529,9 +586,11 @@ void HttpIngestTransModule::_removePartitionFromDirectorIndex(DatabaseInfo const
}
}

json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const& transaction,
bool longContribFormat, bool includeWarnings,
bool includeRetries) const {
json HttpIngestTransModule::_getTransactionContributions(
TransactionInfo const& transaction, string const& tableName, string const& workerName,
set<TransactionContribInfo::Status> const& contribStatusSelector, int chunkSelector,
bool longContribFormat, bool includeExtensions, bool includeWarnings, bool includeRetries,
size_t minRetries, size_t minWarnings, size_t maxEntries) const {
auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
DatabaseInfo const database = config->databaseInfo(transaction.database);
Expand Down Expand Up @@ -559,14 +618,13 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const&

// Default selectors for contributions imply pulling all contributions
// attempted in a scope of the transaction.
string const anyTableSelector;
string const anyWorkerSelector;
TransactionContribInfo::TypeSelector const anyTypeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC;

vector<TransactionContribInfo> const contribs = databaseServices->transactionContribs(
transaction.id, anyTableSelector, anyWorkerSelector, anyTypeSelector,
longContribFormat && includeWarnings, longContribFormat && includeRetries);
transaction.id, tableName, workerName, contribStatusSelector, anyTypeSelector, chunkSelector,
longContribFormat && includeExtensions, longContribFormat && includeWarnings,
longContribFormat && includeRetries, minRetries, minWarnings, maxEntries);

for (auto&& contrib : contribs) {
if (longContribFormat) {
Expand Down Expand Up @@ -701,4 +759,43 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const&
return resultJson;
}

set<TransactionInfo::State> HttpIngestTransModule::_parseTransStateSelector(string const& param) const {
set<TransactionInfo::State> result;
auto const stateStr = query().optionalString(param);
debug(__func__, param + "=" + stateStr);
if (stateStr == "!STARTED") {
result = TransactionInfo::allStates;
result.erase(TransactionInfo::State::STARTED);
} else if (stateStr == "!FINISHED") {
result = TransactionInfo::allStates;
result.erase(TransactionInfo::State::FINISHED);
} else {
bool const skipEmpty = true;
for (auto const& str : util::String::split(stateStr, ",", skipEmpty)) {
result.insert(TransactionInfo::string2state(str));
}
}
return result;
}

set<TransactionContribInfo::Status> HttpIngestTransModule::_parseContribStatusSelector(
string const& param) const {
set<TransactionContribInfo::Status> result;
auto const statusStr = query().optionalString(param);
debug(__func__, param + "=" + statusStr);
if (statusStr == "!IN_PROGRESS") {
result = TransactionContribInfo::allStatuses;
result.erase(TransactionContribInfo::Status::IN_PROGRESS);
} else if (statusStr == "!FINISHED") {
result = TransactionContribInfo::allStatuses;
result.erase(TransactionContribInfo::Status::FINISHED);
} else {
bool const skipEmpty = true;
for (auto const& str : util::String::split(statusStr, ",", skipEmpty)) {
result.insert(TransactionContribInfo::str2status(str));
}
}
return result;
}

} // namespace lsst::qserv::replica
41 changes: 39 additions & 2 deletions src/replica/contr/HttpIngestTransModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

// Qserv headers
#include "replica/contr/HttpModule.h"
#include "replica/ingest/TransactionContrib.h"
#include "replica/services/DatabaseServices.h"
#include "replica/util/Common.h"

// Forward declarations
Expand Down Expand Up @@ -130,18 +132,53 @@ class HttpIngestTransModule : public HttpModule {
/**
* Extract contributions into a transaction.
* @param transaction A transaction defining a scope of the request.
* @param tableName The name of a table to pull contributions from (if empty then all tables will be
* assumed).
* @param workerName The name of a worker to pull contributions from (if empty then all workers will be
* assumed).
* @param contribStatus A set of the contribution statuses to filter the contributions by (all
* contributions if empty).
* @param chunkSelector The chunk selector to filter the contributions by (all contributions if -1).
* @param longContribFormat If 'true' then the method will also return info on
* the individual file contributions rather than just the summary info.
* @param includeExtensions If 'true' then include info on the contributions extensions. Note that
* this option is ignored if longContribFormat == false.
* @param includeWarnings If 'true' then include info on the MySQL warnings
* if any were captured after LOAD DATA INFILE. Note that this option is
* ignored if longContribFormat == false.
* @param includeRetries If 'true' then include info on the failed retries
* if any were made when reading the input data of the contributions. Note that
* this option is ignored if longContribFormat == false.
* @param minRetries The minimum number of retries for a contribution to be included in
* the response (0 for all).
* @param minWarnings The minimum number of warnings for a contribution to include in
* the response (0 for all).
* @param maxEntries The maximum number of contributions to return (0 for all).
* @return A JSON object.
*/
nlohmann::json _getTransactionContributions(TransactionInfo const& transaction, bool longContribFormat,
bool includeWarnings, bool includeRetries) const;
nlohmann::json _getTransactionContributions(TransactionInfo const& transaction,
std::string const& tableName, std::string const& workerName,
std::set<TransactionContribInfo::Status> const& contribStatus,
int chunkSelector, bool longContribFormat,
bool includeExtensions, bool includeWarnings,
bool includeRetries, size_t minRetries = 0,
size_t minWarnings = 0, size_t maxEntries = 0) const;

/**
* Parse a string representation of the transaction state.
* @param param The name of the query parameter to parse.
* @return A set of the transaction states (empty if the parameter is not set).
* @throws std::invalid_argument If the string didn't match any known code.
*/
std::set<TransactionInfo::State> _parseTransStateSelector(std::string const& param) const;

/**
* Parse a string representation of the contribution status.
* @param param The name of the query parameter to parse.
* @return A set of the contribution statuses (empty if the parameter is not set).
* @throws std::invalid_argument If the string didn't match any known code.
*/
std::set<TransactionContribInfo::Status> _parseContribStatusSelector(std::string const& param) const;

/// Named mutexes are used for acquiring exclusive transient locks on the transaction
/// management operations performed by the module.
Expand Down
Loading
Loading