Skip to content

Commit

Permalink
execution of performance tests, correction of minor bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
joao-alegria committed Jun 22, 2021
1 parent a41eb8e commit c2ec6cd
Show file tree
Hide file tree
Showing 125 changed files with 1,617 additions and 634 deletions.
3 changes: 2 additions & 1 deletion coordinator/api/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def removeVs(vsiId):
"""

try:
message=vsService.removeVSI(current_user.name, vsiId)
force=request.args.get("force")
message=vsService.removeVSI(current_user.name, vsiId, force=force)
return jsonify({"message":message}),200
except Exception as e:
return jsonify({"message":"Error: "+str(e)}),500
8 changes: 4 additions & 4 deletions coordinator/db/persistance.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from sqlalchemy import Table, Column, Integer, Float, String, Boolean, ForeignKey, create_engine, inspect
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.orm import relationship, sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
import json
import config
Expand All @@ -22,7 +22,7 @@ def createDB(self):
if config.ENVIRONMENT=="testing":
self.engine = create_engine('sqlite:///:memory:')
else:
self.engine = create_engine('postgresql://'+str(config.POSTGRES_USER)+':'+str(config.POSTGRES_PASS)+'@'+str(config.POSTGRES_IP)+':'+str(config.POSTGRES_PORT)+'/'+str(config.POSTGRES_DB))
self.engine = create_engine('postgresql://'+str(config.POSTGRES_USER)+':'+str(config.POSTGRES_PASS)+'@'+str(config.POSTGRES_IP)+':'+str(config.POSTGRES_PORT)+'/'+str(config.POSTGRES_DB), pool_size=50)
try:
self.Base.metadata.create_all(self.engine)
except Exception as e:
Expand All @@ -34,8 +34,8 @@ def createDB(self):
conn.close()
Base.metadata.create_all(self.engine)

Session = sessionmaker(bind=self.engine)
self.session=Session()
# Session =scoped_session(sessionmaker(bind=self.engine))
self.session=scoped_session(sessionmaker(bind=self.engine))


def removeDB(self):
Expand Down
2 changes: 1 addition & 1 deletion coordinator/rabbitmq/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def callback(self, ch, method, properties, body):
if data["msgType"]=="statusUpdate":
service.changeStatusVSI(data)
except Exception as e:
logging.error("Error while processing message: {}".format(body))
logging.error("Error while processing message: {}".format(body)+" | "+str(e))

def stop(self):
try:
Expand Down
23 changes: 13 additions & 10 deletions coordinator/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,25 @@ def modifyVSI(tenantName, vsiId, request):
# messaging.publish2Exchange('vsLCM_'+str(vsiId),json.dumps(message))
messaging.publish2Exchange('vsLCM_Management',json.dumps(message))

def removeVSI(tenantName, vsiId):
def removeVSI(tenantName, vsiId, force=False):
messaging=Messaging()
vsi=DB.session.query(VerticalServiceInstance).filter(VerticalServiceInstance.tenantId==tenantName,VerticalServiceInstance.vsiId==vsiId).first()
if vsi!=None:
# vsi=DB.session.query(VerticalServiceInstance).filter(VerticalServiceInstance.tenantId==tenantName,VerticalServiceInstance.vsiId==vsiId).first()
# if vsi!=None:
#send message to manager
message={"msgType":"removeVSI", "vsiId":vsiId, "tenantId":tenantName}
messaging.publish2Exchange('vsLCM_Management',json.dumps(message))
DB.delete(vsi)
return "Success"
return "VSI "+ str(vsiId)+ " not found"
message={"msgType":"removeVSI", "vsiId":vsiId, "tenantId":tenantName, "force":force}
messaging.publish2Exchange('vsLCM_Management',json.dumps(message))
# DB.delete(vsi)
return "Success"
# return "VSI "+ str(vsiId)+ " not found"

def changeStatusVSI(data):
vsiId=data["data"]["vsiId"]
vsi=DB.session.query(VerticalServiceInstance).filter(VerticalServiceInstance.vsiId==vsiId).first()
if "fail" not in vsi.status.lower():
if "status" in data["data"]:
vsi.status=data["data"]["status"]
vsi.statusMessage=data["data"]["message"]
DB.persist(vsi)
vsi.statusMessage=data["data"]["message"]
DB.persist(vsi)
if "status" in data["data"] and "terminated" in data["data"]["status"]:
vsi=DB.session.query(VerticalServiceInstance).filter(VerticalServiceInstance.vsiId==vsiId).first()
DB.delete(vsi)
1 change: 1 addition & 0 deletions domain/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ COPY . .
RUN apt-get update
RUN apt-get install -y python3.8
RUN apt-get install -y python3-pip
RUN apt-get install -y libpq-dev
RUN pip3 install -r requirements.txt
RUN chmod +x installDrivers.sh
RUN ./installDrivers.sh
Expand Down
17 changes: 6 additions & 11 deletions domain/driver/osm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def getNS(domainIp, nsId):
def instantiateNS(domainIp, nsName, nsdName, account, additionalConf=None):
tmpClient = client.Client(host=domainIp)
#valueExtraction
# requests.post("http://192.168.0.100:9999/stopTimer/"+nsName, data={"timestamp":str(round(time.time()*1000))})
# requests.post("http://192.168.0.100:9999/stopTimer/1", data={"timestamp":str(round(time.time()*1000))})
return tmpClient.ns.create(nsd_name=nsdName, nsr_name=nsName, account=account, config=additionalConf)

def sendActionNS(domainIp, nsId, additionalConf=None):
Expand All @@ -25,6 +25,8 @@ def modifyNS(domainIp):

def terminateNS(domainIp, nsName):
tmpClient = client.Client(host=domainIp)
#valueExtraction
# requests.post("http://192.168.0.100:9999/stopTimer/2", data={"timestamp":str(round(time.time()*1000))})
return tmpClient.ns.delete(nsName)

def getNSI(domainIp, nsiId):
Expand All @@ -34,7 +36,7 @@ def getNSI(domainIp, nsiId):
def instantiateNSI(domainIp,nsiName,nstName,account, additionalConf=None):
tmpClient = client.Client(host=domainIp)
#valueExtraction
# requests.post("http://192.168.0.100:9999/stopTimer/"+nsiName, data={"timestamp":str(round(time.time()*1000))})
# requests.post("http://192.168.0.100:9999/stopTimer/1", data={"timestamp":str(round(time.time()*1000))})
tmpClient.nsi.create(nst_name=nstName,nsi_name=nsiName,account=account,config=additionalConf)

nsiId=tmpClient.nsi.get(nsiName)["id"]
Expand All @@ -51,13 +53,6 @@ def modifyNSI(domainIp):

def terminateNSI(domainIp,nsiName):
tmpClient = client.Client(host=domainIp)
#valueExtraction
# requests.post("http://192.168.0.100:9999/stopTimer/2", data={"timestamp":str(round(time.time()*1000))})
return tmpClient.nsi.delete(nsiName)

# def instantiateNSSI(domainIp):
# return

# def modifyNSSI(domainIp):
# return

# def terminateNSSI(domainIp):
# return
92 changes: 69 additions & 23 deletions manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def addVSI(self, vsiId, composition):
self.vs[vsiId]=composition

def removeVSI(self, vsiId):
del self.vs[vsiId]
if vsiId in self.vs:
del self.vs[vsiId]

def stop(self):
self.runControl=False
Expand Down Expand Up @@ -66,32 +67,52 @@ def __init__(self, vsiId, vsiRequest, pollingThread):
# th.start()

def processAction(self, data):
serviceComposition={}
tmp=redis.getHashValue("serviceComposition", self.vsiId)
if tmp!=None:
serviceComposition=json.loads(tmp)


if data["msgType"]=="nsInfo":
statusUpdate={"msgType":"statusUpdate", "data":{"vsiId":self.vsiId, "status":data["data"]["nsInfo"]["operational-status"], "message":data["data"]["nsId"]+": "+data["data"]["nsInfo"]["detailed-status"]}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))
if "running" in data["data"]["nsInfo"]["operational-status"].lower():
if serviceComposition=={}:
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps({data["data"]["nsId"]:{"status":"instantiated"}}))
else:
serviceComposition[data["data"]["nsId"]]["status"]="instantiated"
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

tunnelActive=redis.getHashValue("interdomainTunnel",self.vsiId).decode("UTF-8")
if self.interdomain and tunnelActive=="off":
primitiveData={"data":{"primitiveName":"getvnfinfo", "primitiveTarget":data["data"]["nsId"], "primitiveInternalTarget":"1"}}
self.processVsiPrimitive(primitiveData)
elif "terminated" in data["data"]["nsiInfo"]["operational-status"].lower():
self.tearDown(data["data"]["nsId"])
return
elif data["msgType"]=="nsiInfo":
statusUpdate={"msgType":"statusUpdate", "data":{"vsiId":self.vsiId, "status":data["data"]["nsiInfo"]["operational-status"], "message":data["data"]["nsiId"]+": "+data["data"]["nsiInfo"]["detailed-status"]}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))
if "running" in data["data"]["nsiInfo"]["operational-status"].lower():
if serviceComposition=={}:
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps({data["data"]["nsiId"]:{"status":"instantiated"}}))
else:
serviceComposition[data["data"]["nsiId"]]["status"]="instantiated"
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

tunnelActive=redis.getHashValue("interdomainTunnel",self.vsiId).decode("UTF-8")
if self.interdomain and tunnelActive=="off":
primitiveData={"data":{"primitiveName":"getvnfinfo", "primitiveTarget":data["data"]["nsiId"], "primitiveInternalTarget":"1"}}
self.processVsiPrimitive(primitiveData)
elif "terminated" in data["data"]["nsiInfo"]["operational-status"].lower():
self.tearDown(data["data"]["nsiId"])
return
elif data["msgType"]=="updateResourcesNfvoIds":
nfvoData=data["data"]

serviceComposition=redis.getHashValue("serviceComposition", self.vsiId)
if serviceComposition==None:
if serviceComposition=={}:
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps({nfvoData["componentName"]:{"nfvoId":nfvoData["componentId"]}}))
else:
serviceComposition=json.loads(serviceComposition)
serviceComposition[nfvoData["componentName"]]["nfvoId"]=nfvoData["componentId"]
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

Expand Down Expand Up @@ -121,9 +142,6 @@ def processAction(self, data):
createVsi=redis.getHashValue("createVSI",self.vsiId).decode("UTF-8")

if createVsi=="create" and set(["catalogueInfo","domainInfo","tenantInfo","placementInfo"]).issubset(receivedData):
print(self.vsiId)
print(createVsi)
print(receivedData)
redis.setKeyValue("createVSI", self.vsiId, "alreadyCreated")
self.instantiateVSI()
return
Expand Down Expand Up @@ -171,6 +189,7 @@ def instantiateVSI(self):
componentConfigs=vsiRequest["data"]["additionalConf"]
if catalogueInfo["data"]["vs_blueprint_info"]["vs_blueprint"]["inter_site"]:
self.interdomain=True
config={}
for componentConf in componentConfigs:
if componentConf["componentName"]==componentName:
config=json.loads(componentConf["conf"])
Expand Down Expand Up @@ -236,11 +255,11 @@ def instantiateVSI(self):

serviceComposition=redis.getHashValue("serviceComposition", self.vsiId)
if serviceComposition==None:
serviceComposition={componentName:{"sliceEnabled":creationData["sliceEnabled"],"domainId":domainId}}
serviceComposition={componentName:{"sliceEnabled":creationData["sliceEnabled"],"domainId":domainId, "status":"instantiating"}}
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))
else:
serviceComposition=json.loads(serviceComposition)
serviceComposition[componentName]={"sliceEnabled":creationData["sliceEnabled"],"domainId":domainId}
serviceComposition[componentName]={"sliceEnabled":creationData["sliceEnabled"],"domainId":domainId,"status":"instantiating"}
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

message["vsiId"]=str(self.vsiId)
Expand Down Expand Up @@ -277,7 +296,7 @@ def processVsiPrimitive(self, data):
actions[action["action_id"]]=action["parameters"]

if data["data"]["primitiveName"] not in actions:
statusUpdate={"msgType":"statusUpdate","vsiId":self.vsiId, "status":"Invalid Primitive", "message":"Primitive "+data["data"]["primitiveName"]+" is not defined in the VSB."}
statusUpdate={"msgType":"statusUpdate","data":{"vsiId":self.vsiId, "status":"Invalid Primitive", "message":"Primitive "+data["data"]["primitiveName"]+" is not defined in the VSB."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))
return

Expand Down Expand Up @@ -314,7 +333,7 @@ def processVsiPrimitive(self, data):
message["data"]["primitiveName"]=data["data"]["primitiveName"]
self.messaging.publish2Queue("vsDomain", json.dumps(message))
else:
statusUpdate={"msgType":"statusUpdate","vsiId":self.vsiId, "status":"Invalid Primitive Trigger", "message":"Triggered primitive before VSI deployment finalized."}
statusUpdate={"msgType":"statusUpdate","data":{"vsiId":self.vsiId, "status":"Invalid Primitive Trigger", "message":"Triggered primitive before VSI deployment finalized."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))

def interdomainHandler(self,data):
Expand Down Expand Up @@ -352,7 +371,7 @@ def interdomainHandler(self,data):
actions[action["action_id"]]=action["parameters"]

if "addpeer" not in actions:
statusUpdate={"msgType":"statusUpdate","vsiId":self.vsiId, "status":"Invalid Primitive", "message":"addpeer primitive not present in the blueprint."}
statusUpdate={"msgType":"statusUpdate","data":{"vsiId":self.vsiId, "status":"Invalid Primitive", "message":"addpeer primitive not present in the blueprint."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))
return

Expand Down Expand Up @@ -383,32 +402,59 @@ def interdomainHandler(self,data):

requests.post("http://192.168.0.100:9999/stopTimer/1", data={"timestamp":str(round(time.time()*1000))})

def tearDown(self):
logging.info("Tearing down CSMF of VSI "+str(self.vsiId))
def deleteVsi(self, force=False):
serviceComposition={}
tmp=redis.getHashValue("serviceComposition", self.vsiId)
if tmp!=None:
serviceComposition=json.loads(tmp)

# self.pollingThread.stop()
self.pollingThread.removeVSI(self.vsiId)

for component, componentData in serviceComposition.items():
serviceComposition[component]["status"]="terminating"
if componentData["sliceEnabled"]:
message={"vsiId":self.vsiId,"msgType":"deleteNsi", "data":{"domainId":componentData["domainId"], "nsiId":component}}
else:
message={"vsiId":self.vsiId,"msgType":"deleteNs", "data":{"domainId":componentData["domainId"], "nsId":component}}
self.messaging.publish2Queue("vsDomain", json.dumps(message))

redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

statusUpdate={"vsiId":self.vsiId, "status":"terminating"}
statusUpdate={"msgType":"statusUpdate","data":{"vsiId":self.vsiId, "status":"terminating", "message":"Terminating Vertical Service Instance."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))

redis.deleteKey(self.vsiId)
redis.deleteHash("serviceComposition",self.vsiId)
redis.deleteHash("interdomainInfo",self.vsiId)
redis.deleteHash("createVSI",self.vsiId)
redis.deleteHash("interdomainTunnel",self.vsiId)
if serviceComposition=={} or force:
if force:
self.tearDown(None, force=force)
else:
self.tearDown(None)


def tearDown(self, componentName, force=False):
serviceComposition={}
tmp=redis.getHashValue("serviceComposition", self.vsiId)
if tmp!=None:
serviceComposition=json.loads(tmp)

if componentName in serviceComposition:
serviceComposition[componentName]["status"]="terminated"
redis.setKeyValue("serviceComposition", self.vsiId,json.dumps(serviceComposition))

terminated=True
for component in serviceComposition:
terminated = terminated and serviceComposition[component]["status"]=="terminated"

if terminated or force:
logging.info("Tearing down CSMF of VSI "+str(self.vsiId))
self.pollingThread.removeVSI(self.vsiId)
redis.deleteKey(self.vsiId)
redis.deleteHash("serviceComposition",self.vsiId)
redis.deleteHash("interdomainInfo",self.vsiId)
redis.deleteHash("createVSI",self.vsiId)
redis.deleteHash("interdomainTunnel",self.vsiId)

statusUpdate={"msgType":"statusUpdate","data":{"vsiId":self.vsiId, "status":"terminated","message":"Vertical Service Instance Terminated."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))

# requests.post("http://192.168.0.100:9999/stopTimer/2", data={"timestamp":str(round(time.time()*1000))})

# self.stop()

Expand Down
36 changes: 17 additions & 19 deletions manager/rabbitmq/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from threading import Thread
import logging
import manager
import redisHandler as redis


class MessageReceiver(Thread):

Expand All @@ -23,7 +25,7 @@ def callback(self, channel, method_frame, header_frame, body):
if data["msgType"]=="createVSI":
self.newCSMF(data)
elif data["msgType"]=="removeVSI":
self.tearDownCSMF(data)
self.deleteVsi(data)
else:
vsiId=data["vsiId"]
if vsiId in self.csmfs:
Expand Down Expand Up @@ -55,26 +57,22 @@ def newCSMF(self,data):
self.csmfs[data["vsiId"]]=csmf
# self.messaging.consumeQueue("managementQueue-vsLCM_"+str(data["vsiId"]), self.callback, ack=False)

def tearDownCSMF(self,data):
vsiId=str(data["vsiId"])
def deleteVsi(self,data):
vsiId=data["vsiId"]
if vsiId in self.csmfs:
self.csmfs[vsiId].tearDown()
del self.csmfs[vsiId]
self.csmfs[vsiId].deleteVsi(force=data["force"])
# del self.csmfs[vsiId]
else:
self.forceDelete(vsiId)
logging.info("VSI Id not found during tearDown: "+str(vsiId))

# def newCsmfMessage(data):
# vsiId=int(data["vsiId"])
# if vsiId in csmfs:
# csmfs[vsiId].newMessage(data)
# else:
# logging.warning("VSI Id not found: "+str(vsiId))
def forceDelete(self, vsiId):
self.pollingThread.removeVSI(vsiId)
redis.deleteKey(vsiId)
redis.deleteHash("serviceComposition",vsiId)
redis.deleteHash("interdomainInfo",vsiId)
redis.deleteHash("createVSI",vsiId)
redis.deleteHash("interdomainTunnel",vsiId)

def newVnfInfo(self,data):
vsiId=data["vsiId"]
if vsiId in self.csmfs:
self.csmfs[vsiId].interdomainHandler(data)
else:
logging.warning("VSI Id not found during newVnfInfo: "+str(vsiId))
return {"error":True,"message": "Error: VSI Id not found during newVnfInfo: "+str(vsiId)}
return {"error":False,"message": "Acknowledge"}
statusUpdate={"msgType":"statusUpdate","data":{"vsiId":vsiId, "status":"terminated","message":"Vertical Service Instance Terminated."}}
self.messaging.publish2Queue("vsCoordinator", json.dumps(statusUpdate))
Binary file not shown.
File renamed without changes.
Loading

0 comments on commit c2ec6cd

Please sign in to comment.