From e86813170211ab3f4898d134a3318335f457b1db Mon Sep 17 00:00:00 2001 From: Amerik Singh Date: Thu, 17 Oct 2024 21:46:19 +0200 Subject: [PATCH] OA: RL PID Controller (SISO) #1050 --- .../control/controllers/oa_pid_controller.py | 149 +++++++++++++----- 1 file changed, 109 insertions(+), 40 deletions(-) diff --git a/src/mlpro/oa/control/controllers/oa_pid_controller.py b/src/mlpro/oa/control/controllers/oa_pid_controller.py index ab31fa1a6..6ca81ed23 100644 --- a/src/mlpro/oa/control/controllers/oa_pid_controller.py +++ b/src/mlpro/oa/control/controllers/oa_pid_controller.py @@ -1,21 +1,66 @@ +## ------------------------------------------------------------------------------------------------- +## -- Project : MLPro - The integrative middleware framework for standardized machine learning +## -- Package : mlpro.oa.control.controllers +## -- Module : oa_pid_controller.py +## ------------------------------------------------------------------------------------------------- +## -- History : +## -- yyyy-mm-dd Ver. Auth. Description +## -- 2024-09-01 0.0.0 DA Creation +## -- 2024-09-26 0.0.0 ASP Implementation RLPID, RLPIDOffPolicy +## -- 2024-10-17 0.0.0 ASP -Refactoring class RLPID +## -- -change class name RLPIDOffPolicy to OffPolicyRLPID +## ------------------------------------------------------------------------------------------------- + +""" +Ver. 0.0.0 (2024-09-01) + +This module provides an implementation of an OA PID controller. 
+ +""" + from mlpro.bf.control.controllers.pid_controller import PIDController from mlpro.bf.ml.basics import * from mlpro.rl import Policy,SARSElement from mlpro.bf.control import ControlVariable, ControlledVariable -class RLPID(Policy): - def __init__(self, p_observation_space: MSpace, p_action_space: MSpace,pid_controller:PIDController ,policy:Policy,p_id=None, p_buffer_size: int = 1, p_ada: bool = True, p_visualize: bool = False, p_logging=Log.C_LOG_ALL ): + +## ------------------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------------------------------- +class RLPID(Policy): + """ + Policy class for closed loop control + + Parameters + ---------- + p_pid_controller : PIDController, + Instance of PIDController + p_policy : Policy + Policy algorithm + """ + +## ------------------------------------------------------------------------------------------------- + def __init__(self, + p_observation_space: MSpace, + p_action_space: MSpace, + p_pid_controller:PIDController , + p_policy:Policy, + p_id=None, + p_buffer_size: int = 1, + p_ada: bool = True, + p_visualize: bool = False, + p_logging=Log.C_LOG_ALL ): + super().__init__(p_observation_space, p_action_space, p_id, p_buffer_size, p_ada, p_visualize, p_logging) - self._pid_controller = pid_controller - self._policy = policy - self._old_crtl_variable = None #None + self._pid_controller = p_pid_controller + self._policy = p_policy + self._crtl_variable_old = None #None self._action_space = p_action_space - ## ------------------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------------------------------- def _init_hyperparam(self, **p_par): # 1 Create a dispatcher hyperparameter tuple for the RLPID policy @@ -28,28 +73,35 @@ def _init_hyperparam(self, **p_par): except: pass - ## 
------------------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------------------------------- def get_hyperparam(self) -> HyperParamTuple: return self._policy.get_hyperparam() - ## ------------------------------------------------------------------------------------------------- + +## ------------------------------------------------------------------------------------------------- def _update_hyperparameters(self) -> bool: - return self._policy._update_hyperparameters() + return self._policy._update_hyperparameters() + - ## ------------------------------------------------------------------------------------------------- - +## ------------------------------------------------------------------------------------------------- def _adapt(self, p_sars_elem: SARSElement) -> bool: + """ + Parameters: + p_sars_elem:SARSElement + Element of a SARSBuffer + """ is_adapted = False + #get SARS Elements p_state,p_crtl_variable,p_reward,p_state_new=tuple(p_sars_elem.get_data().values()) - if self._old_crtl_variable is not None: + if self._crtl_variable_old is not None: # create a new SARS p_sars_elem_new = SARSElement(p_state=p_state, - p_action=self._old_crtl_variable, + p_action=self._crtl_variable_old, p_reward=p_reward, p_state_new=p_state_new) @@ -57,10 +109,10 @@ def _adapt(self, p_sars_elem: SARSElement) -> bool: is_adapted = self._policy._adapt(p_sars_elem_new) # compute new action with new error value (second s of Sars element) - self._old_crtl_variable=self._policy.compute_action(p_obs=p_state_new) + self._crtl_variable_old=self._policy.compute_action(p_obs=p_state_new) #get the pid paramter values - pid_values = self._old_crtl_variable.get_feature_data().get_values() + pid_values = self._crtl_variable_old.get_feature_data().get_values() #set paramter pid self._pid_controller.set_parameter(p_param={"Kp":pid_values[0], @@ -68,12 +120,12 @@ def _adapt(self, p_sars_elem: 
SARSElement) -> bool: "Tv":pid_values[2]}) else: #compute new action with new error value (second s of Sars element) - self._old_crtl_variable = self._policy.compute_action(p_obs=p_state_new) + self._crtl_variable_old = self._policy.compute_action(p_obs=p_state_new) return is_adapted - ## ------------------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------------------------------- def compute_action(self, p_obs: ControlledVariable) -> ControlVariable: #get action @@ -82,49 +134,66 @@ def compute_action(self, p_obs: ControlledVariable) -> ControlVariable: #return action return control_variable -class RLPIDOffPolicy(Policy): - def __init__(self, p_observation_space: MSpace, p_action_space: MSpace,pid_controller:PIDController ,p_id=None, p_buffer_size: int = 1, p_ada: bool = True, p_visualize: bool = False, p_logging=Log.C_LOG_ALL ): - super().__init__(p_observation_space, p_action_space, p_id, p_buffer_size, p_ada, p_visualize, p_logging) + + + +## ------------------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------------------------------- +class OffPolicyRLPID(Policy): + """ + OFF Policy class for closed loop control + + Parameters + ---------- + p_pid_controller : PIDController, + Instance of PIDController + """ + + def __init__(self, p_observation_space: MSpace, + p_action_space: MSpace, + pid_controller:PIDController + ,p_id=None, + p_buffer_size: int = 1, + p_ada: bool = True, + p_visualize: bool = False, + p_logging=Log.C_LOG_ALL ): + + super().__init__(p_observation_space, + p_action_space, + p_id, + p_buffer_size, + p_ada, p_visualize, p_logging) self._pid_controller = pid_controller self._action_space = p_action_space.get_dim(p_id=0) - - - def _init_hyperparam(self, **p_par): - # create hp - # 1- add dim (Kp,Tn,Tv) in hp space - # 2- create hp tuple 
from hp space - # 3- set hp tuple values +## ------------------------------------------------------------------------------------------------- + def _init_hyperparam(self, **p_par): - # 1 + # 1- add dim (Kp,Tn,Tv) in hp space self._hyperparam_space.add_dim( self._action_space.get_dim(p_id=0)) self._hyperparam_space.add_dim(self._action_space.get_dim(p_id=1)) self._hyperparam_space.add_dim(self._action_space.get_dim(p_id=2)) - # 2 + # 2- create hp tuple from hp space self._hyperparam_tuple = HyperParamTuple( p_set=self._hyperparam_space ) - #3 + # 3- set hp tuple values self._hyperparam_tuple.set_values(self._pid_controller.get_parameter_values()) - ## ------------------------------------------------------------------------------------------------- - - +## ------------------------------------------------------------------------------------------------- def _adapt(self, p_sars_elem: SARSElement) -> bool: return False + - ## ------------------------------------------------------------------------------------------------- - - +## ------------------------------------------------------------------------------------------------- def compute_action(self, p_obs: ControlledVariable) -> ControlVariable: - #get action - action=self._pid_controller.compute_action(p_ctrl_error=p_obs) + #compute control variable + control_variable=self._pid_controller.compute_output(p_ctrl_error=p_obs) - #return action - return action + return control_variable