Skip to content

Commit

Permalink
Migrated: ReplicationRequest DeleteRequest FindRequest FindAllRequest
Browse files Browse the repository at this point in the history
          EchoRequest DirectorIndexRequest Sql*Request
  • Loading branch information
iagaponenko committed Nov 22, 2024
1 parent 6e44e0d commit 4aa6d8e
Show file tree
Hide file tree
Showing 48 changed files with 652 additions and 1,034 deletions.
95 changes: 34 additions & 61 deletions src/replica/apps/ControllerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,31 +525,29 @@ int ControllerApp::runImpl() {
Request::Ptr request;

if ("REPLICATE" == _requestType) {
request = controller->replicate(
_workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](Request::Ptr const& request_) { request_->print(); }, _priority, !_doNotTrackRequest,
_allowDuplicates);

request = ReplicationRequest::create(
controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority,
!_doNotTrackRequest, _allowDuplicates);
} else if ("DELETE" == _requestType) {
request = controller->deleteReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, !_doNotTrackRequest, _allowDuplicates);

request = DeleteRequest::create(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, !_doNotTrackRequest,
_allowDuplicates);
} else if ("FIND" == _requestType) {
request = controller->findReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, _computeCheckSum, !_doNotTrackRequest);

request = FindRequest::create(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, _computeCheckSum,
!_doNotTrackRequest);
} else if ("FIND_ALL" == _requestType) {
request = controller->findAllReplicas(_workerName, _databaseName, !_doNotSaveReplicaInfo,
Request::defaultPrinter, _priority, !_doNotTrackRequest);

request = FindAllRequest::create(controller, _workerName, _databaseName, !_doNotSaveReplicaInfo,
Request::defaultPrinter, _priority, !_doNotTrackRequest);
} else if ("ECHO" == _requestType) {
request = controller->echo(_workerName, _echoData, _echoDelayMilliseconds, Request::defaultPrinter,
_priority, !_doNotTrackRequest);

request = EchoRequest::create(controller, _workerName, _echoData, _echoDelayMilliseconds,
Request::defaultPrinter, _priority, !_doNotTrackRequest);
} else if ("INDEX" == _requestType) {
bool const hasTransactions = _transactionId != numeric_limits<TransactionId>::max();
request = controller->directorIndex(
_workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions, _transactionId,
request = DirectorIndexRequest::create(
controller, _workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions,
_transactionId,
[&](DirectorIndexRequest::Ptr const& request_) {
Request::defaultPrinter(request_);
auto const& responseData = request_->responseData();
Expand All @@ -570,125 +568,100 @@ int ControllerApp::runImpl() {
}
},
_priority, !_doNotTrackRequest);

} else if ("SQL_ALTER_TABLES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlAlterTables(_workerName, _sqlDatabase, tables, _sqlAlterSpec,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

request = SqlAlterTablesRequest::create(controller, _workerName, _sqlDatabase, tables, _sqlAlterSpec,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_QUERY" == _requestType) {
request = controller->sqlQuery(_workerName, _sqlQuery, _sqlUser, _sqlPassword, _sqlMaxRows,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_CREATE_DATABASE" == _requestType) {
request = controller->sqlCreateDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
request = SqlQueryRequest::create(controller, _workerName, _sqlQuery, _sqlUser, _sqlPassword,
_sqlMaxRows, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

} else if ("SQL_CREATE_DATABASE" == _requestType) {
request = SqlCreateDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_DELETE_DATABASE" == _requestType) {
request = controller->sqlDeleteDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlDeleteDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_ENABLE_DATABASE" == _requestType) {
request = controller->sqlEnableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlEnableDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_DISABLE_DATABASE" == _requestType) {
request = controller->sqlDisableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

request = SqlDisableDbRequest::create(controller, _workerName, _sqlDatabase,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_GRANT_ACCESS" == _requestType) {
request = controller->sqlGrantAccess(_workerName, _sqlDatabase, _sqlUser, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

request = SqlGrantAccessRequest::create(controller, _workerName, _sqlDatabase, _sqlUser,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);
} else if ("SQL_CREATE_TABLE" == _requestType) {
request = controller->sqlCreateTable(_workerName, _sqlDatabase, _sqlTable, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_CREATE_TABLES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlCreateTables(_workerName, _sqlDatabase, tables, _sqlEngine,
_sqlPartitionByColumn,
SqlSchemaUtils::readFromTextFile(_sqlSchemaFile),
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_DELETE_TABLE" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlDeleteTable(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

} else if ("SQL_REMOVE_TABLE_PARTITIONS" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlRemoveTablePartitions(_workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

} else if ("SQL_DELETE_TABLE_PARTITION" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlDeleteTablePartition(_workerName, _sqlDatabase, tables, _transactionId,
SqlRequest::extendedPrinter, _priority,
!_doNotTrackRequest);

} else if ("SQL_CREATE_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlCreateTableIndexes(
_workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr),
request = SqlCreateIndexesRequest::create(
controller, _workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr),
_sqlIndexName, _sqlIndexComment,
SqlSchemaUtils::readIndexSpecFromTextFile(_sqlIndexColumnsFile), SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

} else if ("SQL_DROP_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request =
controller->sqlDropTableIndexes(_workerName, _sqlDatabase, tables, _sqlIndexName,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_GET_TABLE_INDEXES" == _requestType) {
vector<string> const tables = {_sqlTable};
request = controller->sqlGetTableIndexes(_workerName, _sqlDatabase, tables,
SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest);

} else if ("SQL_TABLE_ROW_STATS" == _requestType) {
auto const databaseInfo = controller->serviceProvider()->config()->databaseInfo(_sqlDatabase);
bool const isPartitioned = databaseInfo.findTable(_sqlTable).isPartitioned;
vector<string> const tables = {
isPartitioned ? ChunkedTable(_sqlTable, _chunkNumber, _isOverlap).name() : _sqlTable};
request = controller->sqlRowStats(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter,
_priority, !_doNotTrackRequest);

} else if ("STATUS" == _requestType) {
request = _launchStatusRequest(controller);

} else if ("STOP" == _requestType) {
request = _launchStatusRequest(controller);

} else if ("DISPOSE" == _requestType) {
vector<string> const targetIds = {_affectedRequestId};
request = controller->dispose(_workerName, targetIds, Request::defaultPrinter);

} else if ("SERVICE_SUSPEND" == _requestType) {
request =
controller->suspendWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

} else if ("SERVICE_RESUME" == _requestType) {
request = controller->resumeWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

} else if ("SERVICE_STATUS" == _requestType) {
request =
controller->statusOfWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

} else if ("SERVICE_REQUESTS" == _requestType) {
request = controller->requestsOfWorkerService(_workerName,
ServiceManagementRequestBase::extendedPrinter);

} else if ("SERVICE_DRAIN" == _requestType) {
request = controller->drainWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

} else if ("SERVICE_RECONFIG" == _requestType) {
request =
controller->reconfigWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter);

} else {
throw logic_error(context + "unsupported request: " + _affectedRequest);
}
Expand Down
4 changes: 2 additions & 2 deletions src/replica/apps/MessengerTestApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ int MessengerTestApp::runImpl() {
_onNumActiveCv.wait(lock, [&] { return _numActive < _maxActiveRequests; });

// Submit the next request.
auto const request = controller->echo(
_workerName, _data, _proccesingTimeSec,
auto const request = EchoRequest::create(
controller, _workerName, _data, _proccesingTimeSec,
[&](EchoRequest::Ptr request) {
{
unique_lock<mutex> lock(_mtx);
Expand Down
Loading

0 comments on commit 4aa6d8e

Please sign in to comment.