forked from N5GEH/n5geh.services.controller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathController.py
278 lines (244 loc) · 12.6 KB
/
Controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
A framework for controllers that interact with FIWARE.
@author: jdu
"""
import os.path
import warnings
from abc import ABC, abstractmethod
import requests
from filip.clients.ngsi_v2 import ContextBrokerClient, QuantumLeapClient
from filip.models.base import FiwareHeader
from filip.models.ngsi_v2.context import NamedCommand, ContextEntity, NamedContextAttribute
from typing import Any, Dict, Optional, List, Union
from pydantic import parse_file_as
import time
from keycloak_token_handler.keycloak_python import KeycloakPython
class Controller4Fiware(ABC):
    """
    Abstract base class for controllers that interact with a FIWARE platform.

    Controller4Fiware contains several abstract methods, which must be
    implemented in a controller subclass. This script already gives
    recommended structures for these methods.

    There are several TODOs in this script, which indicate the tasks that are
    required during the implementation.

    Please note the following points:
    1. For each type of concrete controller, the input and output structure
       should be the same
    2. If the number of variables must change, a new controller type should be
       implemented
    3. The declaration of variables happens in configuration files
    """

    def __init__(self, config_path=None):
        """
        Load the entity configuration, read the settings from environment
        variables and create the FIWARE clients.

        Environment variables read here (with their defaults):
        SAMPLING_TIME (0.5), CONTROLLER_ENTITY_ID
        ("urn:ngsi-ld:Controller:001"), CONTROLLER_ENTITY_TYPE ("Controller"),
        QL_URL ("http://localhost:8668"), CB_URL ("http://localhost:1026"),
        FIWARE_SERVICE ("controller"), FIWARE_SERVICE_PATH ("/"),
        SECURITY_MODE ("False").

        Args:
            config_path: the root path of the configuration files. It must
                contain input.json, output.json, command.json and
                controller.json.

        Raises:
            TypeError: if config_path is None.
            AssertionError: if the sampling time is smaller than 0.1 sec.
        """
        if config_path is None:
            # os.path.join(None, ...) would raise the same exception type, but
            # with a message that does not name the offending argument.
            raise TypeError(
                "config_path must be the directory that contains input.json, "
                "output.json, command.json and controller.json, not None")

        input_path = os.path.join(config_path, "input.json")
        output_path = os.path.join(config_path, "output.json")
        command_path = os.path.join(config_path, "command.json")
        controller_path = os.path.join(config_path, "controller.json")

        # Config file of the controller must contain the initial value of parameters
        self.controller_entity = parse_file_as(ContextEntity, controller_path)
        # TODO check the command entity. All attributes should have the type "command".
        self.input_entities = parse_file_as(List[ContextEntity], input_path)
        self.output_entities = parse_file_as(List[ContextEntity], output_path)
        self.command_entities = parse_file_as(List[ContextEntity], command_path)

        # TODO define external inputs if any, e.g. temperature forecast
        # self.temp_forecast = None

        self.active = True  # the controller will be deactivated if set to False

        # Read from ENV
        self.sampling_time = float(os.getenv("SAMPLING_TIME", 0.5))
        if not self.sampling_time >= 0.1:
            # Raised explicitly instead of via `assert`, so that the check is
            # not stripped by `python -O`. The exception type is kept for
            # backward compatibility.
            raise AssertionError("Controller sampling time must be at least 0.1 sec")

        # The id and type from the config file are overridden by the environment
        controller_entity_dict = self.controller_entity.dict()
        controller_entity_dict["id"] = os.getenv("CONTROLLER_ENTITY_ID", "urn:ngsi-ld:Controller:001")
        controller_entity_dict["type"] = os.getenv("CONTROLLER_ENTITY_TYPE", "Controller")
        self.controller_entity = ContextEntity(**controller_entity_dict)

        self.fiware_params = dict()
        self.fiware_params["ql_url"] = os.getenv("QL_URL", "http://localhost:8668")
        self.fiware_params["cb_url"] = os.getenv("CB_URL", "http://localhost:1026")
        self.fiware_params["service"] = os.getenv("FIWARE_SERVICE", "controller")
        self.fiware_params["service_path"] = os.getenv("FIWARE_SERVICE_PATH", "/")

        # Create the fiware header
        fiware_header = FiwareHeader(service=self.fiware_params['service'],
                                     service_path=self.fiware_params['service_path'])

        # Create orion context broker client
        s = requests.Session()
        self.ORION_CB = ContextBrokerClient(url=self.fiware_params['cb_url'],
                                            fiware_header=fiware_header,
                                            session=s)
        self.QL_CB = QuantumLeapClient(url=self.fiware_params['ql_url'],
                                       fiware_header=fiware_header)

        # settings for security mode
        self.security_mode = os.getenv("SECURITY_MODE", 'False').lower() in ('true', '1', 'yes')
        self.token = (None, None)
        self.kcp = None  # only created in security mode

        # Get token from keycloak in security mode
        if self.security_mode:
            self.kcp = KeycloakPython()
            # Get initial token
            self.token = self.kcp.get_access_token()
            # Write the token into the client header right away, so that
            # requests sent before the first control cycle are authorized too.
            self.update_token()

    @staticmethod
    def _not_found_message(err: Exception) -> Optional[str]:
        """
        Return the message of err if it reports a missing resource.

        An error counts as "not found" when its first argument, converted to
        text, contains "NOT FOUND" (case-insensitive).

        Args:
            err: the caught exception

        Returns:
            The error message if it is a "not found" error, otherwise None.
        """
        msg = str(err.args[0]) if err.args else ""
        return msg if "NOT FOUND" in msg.upper() else None

    def update_token(self):
        """
        Update the token if necessary. Write the latest token into the
        header of CB client.

        Does nothing when no keycloak handler exists, i.e. when the controller
        does not run in security mode.
        """
        kcp = getattr(self, "kcp", None)
        if kcp is None:
            return
        token = kcp.check_update_token_validity(input_token=self.token, min_valid_time=60)
        if all(token):  # if a valid token is returned
            self.token = token
            # Update the header with token
            self.ORION_CB.headers.update(
                {"Authorization": f"Bearer {self.token[0]}"})

    def read_controller_parameter(self):
        """
        Read the controller parameters from Fiware platform and store them in
        self.controller_entity.
        """
        for _param in self.controller_entity.get_attributes():
            print(f"read {_param.name} from {self.controller_entity.id} with type {self.controller_entity.type}")
            _param.value = self.ORION_CB.get_attribute_value(entity_id=self.controller_entity.id,
                                                             entity_type=self.controller_entity.type,
                                                             attr_name=_param.name)
            self.controller_entity.update_attribute(attrs=[_param])

    def read_input_variable(self):
        """
        Read input variables from Fiware platform and store them in
        self.input_entities.

        The controller is deactivated (self.active = False) if an input entity
        or attribute is not found, and activated again after a successful read.

        Raises:
            requests.exceptions.HTTPError: for all errors except "not found"
        """
        try:
            for entity in self.input_entities:
                for _input in entity.get_attributes():
                    print(f"read {_input.name} from id {entity.id} and type {entity.type}")
                    _input.value = self.ORION_CB.get_attribute_value(entity_id=entity.id,
                                                                     entity_type=entity.type,
                                                                     attr_name=_input.name)
                    entity.update_attribute(attrs=[_input])
        except requests.exceptions.HTTPError as err:
            msg = self._not_found_message(err)
            if msg is None:
                raise
            self.active = False
            print(msg)
            print("Input entities/attributes not found, controller stop", flush=True)
        else:
            # if no error
            self.active = True

    def send_output_variable(self):
        """
        Send output variables to Fiware platform.

        NOTICE: output variables are normal attributes of context entities,
        which will not be forwarded to devices.

        The controller is deactivated (self.active = False) if an output
        entity or attribute is not found.

        Raises:
            requests.exceptions.HTTPError: for all errors except "not found"
        """
        try:
            for entity in self.output_entities:
                for _output in entity.get_attributes():
                    print(f"update output {_output.name} of id {entity.id} with type {entity.type}")
                    self.ORION_CB.update_attribute_value(entity_id=entity.id,
                                                         attr_name=_output.name,
                                                         value=_output.value,
                                                         entity_type=entity.type)
        except requests.exceptions.HTTPError as err:
            msg = self._not_found_message(err)
            if msg is None:
                raise
            self.active = False
            print(msg)
            print("Output entities/attributes not found", flush=True)

    def send_commands(self):
        """
        Send commands to Fiware platform. The commands will be forwarded to
        the corresponding actuators.

        The controller is deactivated (self.active = False) if the target of a
        command is not found.

        Raises:
            requests.exceptions.HTTPError: for all errors except "not found"
        """
        try:
            for entity in self.command_entities:
                for _comm in entity.get_attributes():
                    print(f"send command {_comm.name} to id {entity.id} with type {entity.type}")
                    _comm = NamedCommand(**_comm.dict())
                    self.ORION_CB.post_command(entity_id=entity.id,
                                               entity_type=entity.type,
                                               command=_comm)
        except requests.exceptions.HTTPError as err:
            msg = self._not_found_message(err)
            if msg is None:
                raise
            self.active = False
            print(msg)
            print("Commands cannot be sent", flush=True)

    def create_controller_entity(self):
        """
        Create the controller entity while starting. The controller parameters
        and their initial values are defined in the config/controller.json.
        The entity id and entity type of the controller are defined in
        environment variables.

        If the entity already exists, it is kept, provided that it contains
        all expected keys. Extra attributes are allowed.

        Raises:
            NameError: if the existing entity lacks an expected key
            requests.exceptions.HTTPError: for all errors except "not found"
        """
        try:
            controller_entity_exist = self.ORION_CB.get_entity(entity_id=self.controller_entity.id,
                                                               entity_type=self.controller_entity.type)
        except requests.exceptions.HTTPError as err:
            if self._not_found_message(err) is None:
                raise  # throw other errors except "entity not found"
            print('Create new PID entity', flush=True)
            self.ORION_CB.post_entity(entity=self.controller_entity, update=True)
            return

        print('Entity id already assigned', flush=True)
        # check the structure, extra attributes are allowed
        existing_keys = controller_entity_exist.dict().keys()
        missing_keys = [key for key in self.controller_entity.dict()
                        if key not in existing_keys]
        if missing_keys:
            raise NameError(
                'The existing entity has a different structure. Please delete it or change the id'
                f' (missing: {", ".join(missing_keys)})')
        print("The existing entity contains all expected attributes", flush=True)

    def hold_sampling_time(self, start_time: float):
        """
        Wait in each control cycle until the sampling time (or cycle time) is
        up. If the algorithm takes more time than the sampling time, a warning
        will be given.

        Args:
            start_time: the start of the current cycle as returned by
                time.time()
        """
        elapsed = time.time() - start_time
        if elapsed > self.sampling_time:
            warnings.warn("The processing time is longer than the sampling time. The sampling time must be increased!")
            return
        # Sleep for the remaining time instead of polling in small steps. The
        # loop covers the case that sleep() returns before the time is up.
        remaining = self.sampling_time - elapsed
        while remaining > 0:
            time.sleep(remaining)
            remaining = self.sampling_time - (time.time() - start_time)

    @abstractmethod
    def control_algorithm(self):
        """
        This abstract method must be implemented in subclass.

        The outputs/commands are calculated in this method based on the
        current values of inputs and parameters. The body below is only a
        template, which shows the recommended structure.
        """
        # calculate the output and commands base on the input, controller parameters and external input
        ...

        # For MIMO system, the best practice is to update the value of outputs/commands with following code
        for entity in self.command_entities:
            for _comm in entity.get_attributes():
                print(f"calculate command {_comm.name} to id {entity.id} with type {entity.type}")
                _comm.value = ...  # TODO
                entity.update_attribute([_comm])

        for entity in self.output_entities:
            for _output in entity.get_attributes():
                print(f"calculate output {_output.name} to id {entity.id} with type {entity.type}")
                _output.value = ...  # TODO
                entity.update_attribute([_output])

    @abstractmethod
    def control_cycle(self):
        """
        This abstract method must be implemented in subclass.

        This abstract method already defines a basic structure of the control
        cycle. Basically, all the following invoked functions/methods should
        be used. Other functions/methods can also be added in the
        implementation of the subclass.
        """
        try:
            while True:
                start_time = time.time()

                # Update token if run in security mode
                if self.security_mode:
                    self.update_token()

                # update the input
                self.read_input_variable()

                # get external input
                ...  # TODO

                # update the controller parameters
                self.read_controller_parameter()

                # execute only when the controller is activated
                if self.active:
                    # calculate the output and commands
                    self.control_algorithm()

                    # send output
                    self.send_output_variable()

                    # send commands
                    self.send_commands()

                # wait until next cycle
                self.hold_sampling_time(start_time=start_time)
        except Exception as ex:
            print(ex)
            raise