Skip to content

Commit

Permalink
Refactor, to make dds notifications have TRoE and HTTP Notifications.…
Browse files Browse the repository at this point in the history
… Implemented, not very much tested
  • Loading branch information
kzangeli committed Nov 25, 2024
1 parent d30e236 commit e21d761
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 64 deletions.
8 changes: 7 additions & 1 deletion src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ char* ddsConfigTopicToAttribute(const char* topic, char** entityIdPP, char** ent

const char* path[4] = { "dds", "ngsild", "topics", NULL };
static KjNode* topicsP = kjNavigate(ddsConfigTree, path, NULL, NULL);
KjNode* topicP = kjLookup(topicsP, topic);

if (topicsP == NULL)
KT_RE(NULL, "the field dds/ngsild/topic not found in DDS config file (looking for topic '%s')", topic);

KjNode* topicP = kjLookup(topicsP, topic);

if (topicP == NULL)
KT_RE(NULL, "topic '%s' not found in DDS config file", topic);

KT_T(StDdsConfig, "Found topic '%s' in config file", topic);

KjNode* attributeP = kjLookup(topicP, "attribute");

if (attributeP == NULL)
Expand Down
15 changes: 12 additions & 3 deletions src/lib/orionld/dds/ddsInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern "C"
#include "kjson/KjNode.h" // KjNode
}

#include "logMsg/logMsg.h" // lmOut

#include "orionld/common/traceLevels.h" // kjTreeLog2
#include "orionld/common/orionldState.h" // configFile
#include "orionld/kjTree/kjNavigate.h" // kjNavigate
Expand Down Expand Up @@ -71,13 +73,20 @@ void ddsTypeNotification(const char* typeName, const char* topicName, const char
//
void ddsLog(const char* fileName, int lineNo, const char* funcName, int category, const char* msg)
{
int level = 0;
char severity = ddsCategoryToKlogSeverity(category, &level);

char* filename = (fileName != NULL)? (char*) fileName : (char*) "no-filename";
char* funcname = (funcName != NULL)? (char*) funcName : (char*) "no-funcname";

#if 1
int level = 0;
char severity = ddsCategoryToKlogSeverity(category, &level);

ktOut(filename, lineNo, funcname, severity, level, msg);
#else
int level = 'W';
char lmType = ddsCategoryToKlogSeverity(category, &level); // Think it will work also for LM
lmOut((char*) msg, lmType, filename, lineNo, funcname, level);
#endif
}


Expand Down
40 changes: 32 additions & 8 deletions src/lib/orionld/dds/ddsNotification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,37 @@ extern "C"
#include "orionld/common/orionldState.h" // orionldState, kjTreeLog
#include "orionld/common/traceLevels.h" // KT_T trace levels
#include "orionld/common/tenantList.h" // tenant0
#include "orionld/types/OrionLdRestService.h" // OrionLdRestService, OrionLdRestServiceVector, OrionldServiceRoutine
#include "orionld/service/orionldServiceInit.h" // orionldRestServiceV
#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute
#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease
#include "orionld/context/orionldContextItemExpand.h" // orionldContextItemExpand
#include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat
#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2
#include "orionld/dds/ddsConfigTopicToAttribute.h" // ddsConfigTopicToAttribute
#include "orionld/dds/ddsNotification.h" // Own interface



// -----------------------------------------------------------------------------
//
// serviceLookupByRoutine -
//
static OrionLdRestService* serviceLookupByRoutine(OrionldServiceRoutine serviceRoutine, Verb verb)
{
OrionLdRestServiceVector* serviceVectorP = &orionldRestServiceV[verb];

for (int ix = 0; ix < serviceVectorP->services; ix++)
{
if (serviceVectorP->serviceV[ix].serviceRoutine == serviceRoutine)
return &serviceVectorP->serviceV[ix];
}

return NULL;
}



// -----------------------------------------------------------------------------
//
// ddsNotification -
Expand Down Expand Up @@ -123,19 +145,21 @@ void ddsNotification(const char* typeName, const char* topicName, const char* js
orionldState.in.pathAttrExpanded = (char*) topicName;
orionldState.ddsSample = true;
orionldState.ddsPublishTime = publishTime;
orionldState.apiVersion = API_VERSION_NGSILD_V1;

kjChildAdd(attrNodeP, participantIdNodeP);

KjNode* publishedAt = kjInteger(orionldState.kjsonP, "publishedAt", publishTime);
kjChildAdd(attrNodeP, publishedAt);

//
// If the entity does not exist, it needs to be created
// Except of course, if it is registered and exists elsewhere
//
orionldState.serviceP = serviceLookupByRoutine(orionldPutAttribute, HTTP_PUT);

orionldPutAttribute();

// Do what's needed from the function requestCompleted
if (orionldState.alterations != NULL)
orionldAlterationsTreat(orionldState.alterations);
//
// Cleanup
//
void* con_cls;
extern void requestCompleted(void* cls, MHD_Connection* connection, void** con_cls, MHD_RequestTerminationCode toe);
requestCompleted(NULL, NULL, &con_cls, MHD_REQUEST_TERMINATED_COMPLETED_OK);
}
48 changes: 0 additions & 48 deletions src/lib/orionld/mhd/mhdConnectionTreat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1369,54 +1369,6 @@ MHD_Result mhdConnectionTreat(void)
//
mhdReply(orionldState.responseTree); // orionldState.responsePayload freed and NULLed by mhdReply()


//
// FIXME: Delay until requestCompleted. The call to orionldStateRelease as well
//
// Call TRoE Routine (if there is one) to save the TRoE data.
// Only if the Service Routine was successful, of course
// AND if there is any request tree to process
//
if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false))
{
if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL))
{
//
// Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL
//
if (orionldState.troeError == true)
LM_E(("Internal Error (something went wrong during TRoE processing)"));
else
{
//
// Special case - Entity creation with no attribute
// As both the entity id and the entity type have been removed from the payload body, the payload body is now empty.
// We still have to record the creation of the entity in the TRoE database!
//
// If the incoming request an empty array/object, then don't call the TRoE routine
// - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities)
//
bool invokeTroe = false;

if (orionldState.verb == HTTP_DELETE) invokeTroe = true;
if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true;
if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true;

if (invokeTroe == true)
{
PERFORMANCE(troeStart);
orionldState.serviceP->troeRoutine();
PERFORMANCE(troeEnd);
}
}
}
}

//
// Cleanup
//
orionldStateRelease();

PERFORMANCE(requestPartEnd);

return MHD_YES;
Expand Down
55 changes: 51 additions & 4 deletions src/lib/rest/rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,23 @@ extern "C"
#include "orionld/types/OrionldHeader.h" // orionldHeaderAdd
#include "orionld/types/OrionldMimeType.h" // mimeTypeFromString
#include "orionld/types/ApiVersion.h" // ApiVersion
#include "orionld/types/OrionLdRestService.h" // ORIONLD_URIPARAM_LIMIT, ...
#include "orionld/common/orionldState.h" // orionldState, multitenancy, ...
#include "orionld/common/performance.h" // REQUEST_PERFORMANCE
#include "orionld/common/orionldError.h" // orionldError
#include "orionld/common/orionldTenantGet.h" // orionldTenantGet
#include "orionld/common/tenantList.h" // tenant0
#include "orionld/common/stringStrip.h" // stringStrip
#include "orionld/http/verbGet.h" // verbGet
#include "orionld/mongoc/mongocConnectionRelease.h" // Own interface
#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease
#include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat
#include "orionld/mhd/mhdConnectionInit.h" // mhdConnectionInit
#include "orionld/mhd/mhdConnectionPayloadRead.h" // mhdConnectionPayloadRead
#include "orionld/mhd/mhdConnectionTreat.h" // mhdConnectionTreat
#include "orionld/distOp/distOpListRelease.h" // distOpListRelease
#include "orionld/service/orionldServiceNotFound.h" // orionldServiceNotFound
#include "orionld/payloadCheck/pCheckUri.h" // pCheckUri
#include "orionld/serviceRoutines/orionldPostEntities.h" // orionldPostEntities

#include "rest/HttpHeaders.h" // HTTP_* defines
#include "rest/Verb.h"
Expand Down Expand Up @@ -363,7 +365,7 @@ static MHD_Result httpHeaderGet(void* cbDataP, MHD_ValueKind kind, const char* k
*
* requestCompleted -
*/
static void requestCompleted
void requestCompleted
(
void* cls,
MHD_Connection* connection,
Expand All @@ -374,7 +376,6 @@ static void requestCompleted
PERFORMANCE(requestCompletedStart);

ConnectionInfo* ciP = (ConnectionInfo*) *con_cls;
const char* spath = ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP->servicePathV.size() > 0))? ciP->servicePathV[0].c_str() : "";
struct timespec reqEndTime;

//
Expand All @@ -387,6 +388,46 @@ static void requestCompleted
PERFORMANCE(notifEnd);
}

//
// Call TRoE Routine (if there is one) to save the TRoE data.
// Only if the Service Routine was successful, of course
// AND if there is any request tree to process
//
if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false))
{
if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL))
{
//
// Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL
//
if (orionldState.troeError == true)
LM_E(("Internal Error (something went wrong during TRoE processing)"));
else
{
//
// Special case - Entity creation with no attribute
// As both the entity id and the entity type have been removed from the payload body, the payload body is now empty.
// We still have to record the creation of the entity in the TRoE database!
//
// If the incoming request an empty array/object, then don't call the TRoE routine
// - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities)
//
bool invokeTroe = false;

if (orionldState.verb == HTTP_DELETE) invokeTroe = true;
if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true;
if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true;

if (invokeTroe == true)
{
PERFORMANCE(troeStart);
orionldState.serviceP->troeRoutine();
PERFORMANCE(troeEnd);
}
}
}
}

if ((orionldState.in.payload != NULL) && (orionldState.in.payload != orionldState.preallocReqBuf))
{
free(orionldState.in.payload);
Expand Down Expand Up @@ -503,6 +544,7 @@ static void requestCompleted
//
if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (metricsMgr.isOn()))
{
const char* spath = (ciP->servicePathV.size() > 0)? ciP->servicePathV[0].c_str() : "";
metricsMgr.add(orionldState.tenantP->tenant, spath, METRIC_TRANS_IN, 1);

if (orionldState.httpStatusCode >= 400)
Expand Down Expand Up @@ -530,7 +572,7 @@ static void requestCompleted
extern void delayedReleaseExecute(void);
delayedReleaseExecute();

if (orionldState.apiVersion != API_VERSION_NGSILD_V1)
if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP != NULL))
delete(ciP);

kaBufferReset(&orionldState.kalloc, false); // 'false': it's reused, but in a different thread ...
Expand Down Expand Up @@ -582,6 +624,11 @@ static void requestCompleted
LM_T(LmtPerformance, ("TPUT: mongoConnect Accumulated: %f (%d calls)", performanceTimestamps.mongoConnectAccumulated, performanceTimestamps.getMongoConnectionCalls));
}
#endif

//
// Cleanup
//
orionldStateRelease();
}


Expand Down

0 comments on commit e21d761

Please sign in to comment.