From e21d76104eb766d106f53f5b768606806136d6a5 Mon Sep 17 00:00:00 2001 From: Ken Zangelin Date: Mon, 25 Nov 2024 20:54:15 +0100 Subject: [PATCH] Refactor, to make dds notifications have TRoE and HTTP Notifications. Implemented, not very much tested --- .../orionld/dds/ddsConfigTopicToAttribute.cpp | 8 ++- src/lib/orionld/dds/ddsInit.cpp | 15 ++++- src/lib/orionld/dds/ddsNotification.cpp | 40 +++++++++++--- src/lib/orionld/mhd/mhdConnectionTreat.cpp | 48 ---------------- src/lib/rest/rest.cpp | 55 +++++++++++++++++-- 5 files changed, 102 insertions(+), 64 deletions(-) diff --git a/src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp b/src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp index c8647f541b..043224a584 100644 --- a/src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp +++ b/src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp @@ -58,11 +58,17 @@ char* ddsConfigTopicToAttribute(const char* topic, char** entityIdPP, char** ent const char* path[4] = { "dds", "ngsild", "topics", NULL }; static KjNode* topicsP = kjNavigate(ddsConfigTree, path, NULL, NULL); - KjNode* topicP = kjLookup(topicsP, topic); + + if (topicsP == NULL) + KT_RE(NULL, "the field dds/ngsild/topic not found in DDS config file (looking for topic '%s')", topic); + + KjNode* topicP = kjLookup(topicsP, topic); if (topicP == NULL) KT_RE(NULL, "topic '%s' not found in DDS config file", topic); + KT_T(StDdsConfig, "Found topic '%s' in config file", topic); + KjNode* attributeP = kjLookup(topicP, "attribute"); if (attributeP == NULL) diff --git a/src/lib/orionld/dds/ddsInit.cpp b/src/lib/orionld/dds/ddsInit.cpp index d800879064..ce3f2a9ed7 100644 --- a/src/lib/orionld/dds/ddsInit.cpp +++ b/src/lib/orionld/dds/ddsInit.cpp @@ -34,6 +34,8 @@ extern "C" #include "kjson/KjNode.h" // KjNode } +#include "logMsg/logMsg.h" // lmOut + #include "orionld/common/traceLevels.h" // kjTreeLog2 #include "orionld/common/orionldState.h" // configFile #include "orionld/kjTree/kjNavigate.h" // kjNavigate @@ -71,13 +73,20 @@ void ddsTypeNotification(const char* typeName, const char* topicName, const char // void ddsLog(const char* fileName, int lineNo, const char* funcName, int category, const char* msg) { - int level = 0; - char severity = ddsCategoryToKlogSeverity(category, &level); - char* filename = (fileName != NULL)? (char*) fileName : (char*) "no-filename"; char* funcname = (funcName != NULL)? (char*) funcName : (char*) "no-funcname"; +#if 1 + int level = 0; + char severity = ddsCategoryToKlogSeverity(category, &level); + ktOut(filename, lineNo, funcname, severity, level, msg); +#else + int level = 'W'; + char lmType = ddsCategoryToKlogSeverity(category, &level); // Think it will work also for LM + + lmOut((char*) msg, lmType, filename, lineNo, funcname, level); +#endif } diff --git a/src/lib/orionld/dds/ddsNotification.cpp b/src/lib/orionld/dds/ddsNotification.cpp index ba65e39f90..cefc9f39a1 100644 --- a/src/lib/orionld/dds/ddsNotification.cpp +++ b/src/lib/orionld/dds/ddsNotification.cpp @@ -34,15 +34,37 @@ extern "C" #include "orionld/common/orionldState.h" // orionldState, kjTreeLog #include "orionld/common/traceLevels.h" // KT_T trace levels #include "orionld/common/tenantList.h" // tenant0 +#include "orionld/types/OrionLdRestService.h" // OrionLdRestService, OrionLdRestServiceVector, OrionldServiceRoutine +#include "orionld/service/orionldServiceInit.h" // orionldRestServiceV +#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute +#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease #include "orionld/context/orionldContextItemExpand.h" // orionldContextItemExpand #include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat -#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute #include "orionld/dds/kjTreeLog.h" // kjTreeLog2 #include "orionld/dds/ddsConfigTopicToAttribute.h" // ddsConfigTopicToAttribute #include "orionld/dds/ddsNotification.h" // Own interface +// ----------------------------------------------------------------------------- +// +// serviceLookupByRoutine - +// +static OrionLdRestService* serviceLookupByRoutine(OrionldServiceRoutine serviceRoutine, Verb verb) +{ + OrionLdRestServiceVector* serviceVectorP = &orionldRestServiceV[verb]; + + for (int ix = 0; ix < serviceVectorP->services; ix++) + { + if (serviceVectorP->serviceV[ix].serviceRoutine == serviceRoutine) + return &serviceVectorP->serviceV[ix]; + } + + return NULL; +} + + + // ----------------------------------------------------------------------------- // // ddsNotification - @@ -123,19 +145,21 @@ void ddsNotification(const char* typeName, const char* topicName, const char* js orionldState.in.pathAttrExpanded = (char*) topicName; orionldState.ddsSample = true; orionldState.ddsPublishTime = publishTime; + orionldState.apiVersion = API_VERSION_NGSILD_V1; kjChildAdd(attrNodeP, participantIdNodeP); KjNode* publishedAt = kjInteger(orionldState.kjsonP, "publishedAt", publishTime); kjChildAdd(attrNodeP, publishedAt); - // - // If the entity does not exist, it needs to be created - // Except of course, if it is registered and exists elsewhere - // + orionldState.serviceP = serviceLookupByRoutine(orionldPutAttribute, HTTP_PUT); + orionldPutAttribute(); - // Do what's needed from the function requestCompleted - if (orionldState.alterations != NULL) - orionldAlterationsTreat(orionldState.alterations); + // + // Cleanup + // + void* con_cls; + extern void requestCompleted(void* cls, MHD_Connection* connection, void** con_cls, MHD_RequestTerminationCode toe); + requestCompleted(NULL, NULL, &con_cls, MHD_REQUEST_TERMINATED_COMPLETED_OK); } diff --git a/src/lib/orionld/mhd/mhdConnectionTreat.cpp b/src/lib/orionld/mhd/mhdConnectionTreat.cpp index 1befef8fd9..019225b384 100644 --- a/src/lib/orionld/mhd/mhdConnectionTreat.cpp +++ b/src/lib/orionld/mhd/mhdConnectionTreat.cpp @@ -1369,54 +1369,6 @@ MHD_Result mhdConnectionTreat(void) // mhdReply(orionldState.responseTree); // orionldState.responsePayload freed and NULLed by mhdReply() - - // - // FIXME: Delay until requestCompleted. The call to orionldStateRelease as well - // - // Call TRoE Routine (if there is one) to save the TRoE data. - // Only if the Service Routine was successful, of course - // AND if there is any request tree to process - // - if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false)) - { - if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL)) - { - // - // Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL - // - if (orionldState.troeError == true) - LM_E(("Internal Error (something went wrong during TRoE processing)")); - else - { - // - // Special case - Entity creation with no attribute - // As both the entity id and the entity type have been removed from the payload body, the payload body is now empty. - // We still have to record the creation of the entity in the TRoE database! - // - // If the incoming request an empty array/object, then don't call the TRoE routine - // - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities) - // - bool invokeTroe = false; - - if (orionldState.verb == HTTP_DELETE) invokeTroe = true; - if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true; - if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true; - - if (invokeTroe == true) - { - PERFORMANCE(troeStart); - orionldState.serviceP->troeRoutine(); - PERFORMANCE(troeEnd); - } - } - } - } - - // - // Cleanup - // - orionldStateRelease(); - PERFORMANCE(requestPartEnd); return MHD_YES; diff --git a/src/lib/rest/rest.cpp b/src/lib/rest/rest.cpp index c223063667..5e25f1f66b 100644 --- a/src/lib/rest/rest.cpp +++ b/src/lib/rest/rest.cpp @@ -61,6 +61,7 @@ extern "C" #include "orionld/types/OrionldHeader.h" // orionldHeaderAdd #include "orionld/types/OrionldMimeType.h" // mimeTypeFromString #include "orionld/types/ApiVersion.h" // ApiVersion +#include "orionld/types/OrionLdRestService.h" // ORIONLD_URIPARAM_LIMIT, ... #include "orionld/common/orionldState.h" // orionldState, multitenancy, ... #include "orionld/common/performance.h" // REQUEST_PERFORMANCE #include "orionld/common/orionldError.h" // orionldError @@ -68,7 +69,7 @@ extern "C" #include "orionld/common/tenantList.h" // tenant0 #include "orionld/common/stringStrip.h" // stringStrip #include "orionld/http/verbGet.h" // verbGet -#include "orionld/mongoc/mongocConnectionRelease.h" // Own interface +#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease #include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat #include "orionld/mhd/mhdConnectionInit.h" // mhdConnectionInit #include "orionld/mhd/mhdConnectionPayloadRead.h" // mhdConnectionPayloadRead @@ -76,6 +77,7 @@ extern "C" #include "orionld/distOp/distOpListRelease.h" // distOpListRelease #include "orionld/service/orionldServiceNotFound.h" // orionldServiceNotFound #include "orionld/payloadCheck/pCheckUri.h" // pCheckUri +#include "orionld/serviceRoutines/orionldPostEntities.h" // orionldPostEntities #include "rest/HttpHeaders.h" // HTTP_* defines #include "rest/Verb.h" @@ -363,7 +365,7 @@ static MHD_Result httpHeaderGet(void* cbDataP, MHD_ValueKind kind, const char* k * * requestCompleted - */ -static void requestCompleted +void requestCompleted ( void* cls, MHD_Connection* connection, @@ -374,7 +376,6 @@ static void requestCompleted PERFORMANCE(requestCompletedStart); ConnectionInfo* ciP = (ConnectionInfo*) *con_cls; - const char* spath = ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP->servicePathV.size() > 0))? ciP->servicePathV[0].c_str() : ""; struct timespec reqEndTime; // @@ -387,6 +388,46 @@ static void requestCompleted PERFORMANCE(notifEnd); } + // + // Call TRoE Routine (if there is one) to save the TRoE data. + // Only if the Service Routine was successful, of course + // AND if there is any request tree to process + // + if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false)) + { + if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL)) + { + // + // Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL + // + if (orionldState.troeError == true) + LM_E(("Internal Error (something went wrong during TRoE processing)")); + else + { + // + // Special case - Entity creation with no attribute + // As both the entity id and the entity type have been removed from the payload body, the payload body is now empty. + // We still have to record the creation of the entity in the TRoE database! + // + // If the incoming request an empty array/object, then don't call the TRoE routine + // - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities) + // + bool invokeTroe = false; + + if (orionldState.verb == HTTP_DELETE) invokeTroe = true; + if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true; + if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true; + + if (invokeTroe == true) + { + PERFORMANCE(troeStart); + orionldState.serviceP->troeRoutine(); + PERFORMANCE(troeEnd); + } + } + } + } + if ((orionldState.in.payload != NULL) && (orionldState.in.payload != orionldState.preallocReqBuf)) { free(orionldState.in.payload); @@ -503,6 +544,7 @@ static void requestCompleted // if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (metricsMgr.isOn())) { + const char* spath = (ciP->servicePathV.size() > 0)? ciP->servicePathV[0].c_str() : ""; metricsMgr.add(orionldState.tenantP->tenant, spath, METRIC_TRANS_IN, 1); if (orionldState.httpStatusCode >= 400) @@ -530,7 +572,7 @@ static void requestCompleted extern void delayedReleaseExecute(void); delayedReleaseExecute(); - if (orionldState.apiVersion != API_VERSION_NGSILD_V1) + if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP != NULL)) delete(ciP); kaBufferReset(&orionldState.kalloc, false); // 'false': it's reused, but in a different thread ... @@ -582,6 +624,11 @@ static void requestCompleted LM_T(LmtPerformance, ("TPUT: mongoConnect Accumulated: %f (%d calls)", performanceTimestamps.mongoConnectAccumulated, performanceTimestamps.getMongoConnectionCalls)); } #endif + + // + // Cleanup + // + orionldStateRelease(); }