diff --git a/api/tacticalrmm/core/views.py b/api/tacticalrmm/core/views.py index a20d33f905..670806171b 100644 --- a/api/tacticalrmm/core/views.py +++ b/api/tacticalrmm/core/views.py @@ -448,6 +448,19 @@ def post(self, request): instance_type = serializer.validated_data.get("run_instance_type", None) instance_id = serializer.validated_data.get("run_instance_id", None) + # make sure user has permissions to run against client/agent/site + if instance_type == "agent": + if not _has_perm_on_agent(request.user, instance_id): + raise PermissionDenied() + + elif instance_type == "site": + if not _has_perm_on_site(request.user, instance_id): + raise PermissionDenied() + + elif instance_type == "client": + if not _has_perm_on_client(request.user, instance_id): + raise PermissionDenied() + result, replaced_url, replaced_body = run_test_url_rest_action( url=url, body=body, @@ -457,6 +470,17 @@ def post(self, request): instance_id=instance_id, ) + AuditLog.audit_url_action_test( + username=request.user.username, + url=url, + body=replaced_body, + headers=headers, + instance_type=instance_type, + instance_id=instance_id, + debug_info={"ip": request._client_ip}, + ) + + return Response({"url": replaced_url, "result": result, "body": replaced_body}) @@ -476,7 +500,12 @@ def post(self, request): shell=request.data["shell"], ) - # TODO add auditing + AuditLog.audit_test_script_run( + username=request.user.username, + agent=None, + script_body=request.data["code"], + debug_info={"ip": request._client_ip}, + ) ret = { "stdout": stdout, diff --git a/api/tacticalrmm/logs/models.py b/api/tacticalrmm/logs/models.py index f747b9a8b3..975353c001 100644 --- a/api/tacticalrmm/logs/models.py +++ b/api/tacticalrmm/logs/models.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union, cast, Literal from django.db import models @@ -159,6 +159,26 @@ def audit_script_run( debug_info=debug_info, ) + @staticmethod + def audit_test_script_run( + username: str, + script_body: str, + agent: Optional["Agent"], + debug_info: Dict[Any, Any] = {}, + ) -> None: + + debug_info["script_body"] = script_body + + AuditLog.objects.create( + agent=agent.hostname if agent else "Tactical RMM Server", + agent_id=agent.agent_id if agent else "N/A", + username=username, + object_type=AuditObjType.AGENT, + action=AuditActionType.EXEC_SCRIPT, + message=f'{username} tested a script on {agent.hostname if agent else "Tactical RMM Server"}', + debug_info=debug_info, + ) + @staticmethod def audit_user_failed_login(username: str, debug_info: Dict[Any, Any] = {}) -> None: AuditLog.objects.create( @@ -214,6 +234,42 @@ def audit_url_action( debug_info=debug_info, ) + @staticmethod + def audit_url_action_test( + username: str, + url: str, + body: str, + headers: Dict[Any, Any], + instance_type: Literal["agent", "client", "site"], + instance_id: int, + debug_info: Dict[Any, Any] = {}, + ) -> None: + from agents.models import Agent + + debug_info["body"] = body + debug_info["headers"] = headers + + if instance_type == "agent": + instance = Agent.objects.get(pk=instance_id) + + elif instance_type == "site": + instance = Site.objects.get(pk=instance_id) + + elif instance_type == "client": + instance = Client.objects.get(pk=instance_id) + + name = instance.hostname if isinstance(instance, Agent) else instance.name + classname = type(instance).__name__ + AuditLog.objects.create( + username=username, + agent=name if isinstance(instance, Agent) else None, + agent_id=instance.agent_id if isinstance(instance, Agent) else None, + object_type=classname.lower(), + action=AuditActionType.URL_ACTION, + message=f"{username} tested url action: {url} on {classname}: {name}", + debug_info=debug_info, + ) + @staticmethod def audit_bulk_action( username: str, diff --git a/api/tacticalrmm/scripts/views.py b/api/tacticalrmm/scripts/views.py index 71a7f53407..f3c1c75fdd 100644 --- a/api/tacticalrmm/scripts/views.py +++ b/api/tacticalrmm/scripts/views.py @@ -12,6 +12,7 @@ from tacticalrmm.helpers import notify_error from .models import Script, ScriptSnippet +from logs.models import AuditLog from .permissions import ScriptsPerms from .serializers import ( ScriptSerializer, @@ -153,12 +154,14 @@ def post(self, request, agent_id): agent, request.data["shell"], request.data["env_vars"] ) + script_body = Script.replace_with_snippets(request.data["code"]) + data = { "func": "runscriptfull", "timeout": request.data["timeout"], "script_args": parsed_args, "payload": { - "code": Script.replace_with_snippets(request.data["code"]), + "code": script_body, "shell": request.data["shell"], }, "run_as_user": request.data["run_as_user"], @@ -171,6 +174,13 @@ def post(self, request, agent_id): agent.nats_cmd(data, timeout=request.data["timeout"], wait=True) ) + AuditLog.audit_test_script_run( + username=request.user.username, + agent=None, + script_body=script_body, + debug_info={"ip": request._client_ip}, + ) + return Response(r)