Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend information from leapp saved to leappdb #847

Merged
merged 4 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions leapp/actors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from leapp.models.error_severity import ErrorSeverity
from leapp.tags import Tag
from leapp.utils import get_api_models, path
from leapp.utils.audit import store_dialog
from leapp.utils.i18n import install_translation_for_actor
from leapp.utils.meta import get_flattened_subclasses
from leapp.workflows.api import WorkflowAPI
Expand Down Expand Up @@ -122,12 +123,17 @@ def get_answers(self, dialog):
:return: dictionary with the requested answers, None if not a defined dialog
"""
self._messaging.register_dialog(dialog, self)
answer = None
if dialog in type(self).dialogs:
if self.skip_dialogs:
# non-interactive mode of operation
return self._messaging.get_answers(dialog)
return self._messaging.request_answers(dialog)
return None
answer = self._messaging.get_answers(dialog)
else:
answer = self._messaging.request_answers(dialog)

store_dialog(dialog, answer)

return answer

def show_message(self, message):
"""
Expand Down Expand Up @@ -285,6 +291,7 @@ def get_actor_tool_path(self, name):
def run(self, *args):
""" Runs the actor calling the method :py:func:`process`. """
os.environ['LEAPP_CURRENT_ACTOR'] = self.name

try:
self.process(*args)
except StopActorExecution:
Expand Down
1 change: 1 addition & 0 deletions leapp/dialogs/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,5 @@ def request_answers(self, store, renderer):
self._store = store
renderer.render(self)
self._store = None

return store.get(self.scope, {})
3 changes: 3 additions & 0 deletions leapp/messaging/answerstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def get(self, scope, fallback=None):
# NOTE(ivasilev) self.storage.get() will return a DictProxy. To avoid TypeError during later
# JSON serialization a copy() should be invoked to get a shallow copy of data
answer = self._storage.get(scope, fallback).copy()

# NOTE(dkubek): It is possible that we do not need to save the 'answer'
# here as it is being stored with dialog question right after query
create_audit_entry('dialog-answer', {'scope': scope, 'fallback': fallback, 'answer': answer})
return answer

Expand Down
199 changes: 197 additions & 2 deletions leapp/utils/audit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
import sqlite3
import hashlib

from leapp.config import get_config
from leapp.compat import string_types
Expand Down Expand Up @@ -221,6 +222,72 @@ def do_store(self, connection):
self._data_source_id = cursor.fetchone()[0]


class Metadata(Storable):
"""
Metadata of an Entity
"""

def __init__(self, metadata=None, hash_id=None):
"""
:param metadata: Entity metadata
:type metadata: str
:param hash_id: SHA256 hash in hexadecimal representation of data
:type hash_id: str
"""
super(Metadata, self).__init__()
self.metadata = metadata
self.hash_id = hash_id

def do_store(self, connection):
super(Metadata, self).do_store(connection)
connection.execute('INSERT OR IGNORE INTO metadata (hash, metadata) VALUES(?, ?)',
(self.hash_id, self.metadata))


class Entity(Host):
"""
Leapp framework entity (e.g. actor, workflow)
"""

def __init__(self, context=None, hostname=None, kind=None, metadata=None, name=None):
"""
:param context: The execution context
:type context: str
:param hostname: Hostname of the system that produced the entry
:type hostname: str
:param kind: Kind of the entity for which metadata is stored
:type kind: str
:param metadata: Entity metadata
:type metadata: :py:class:`leapp.utils.audit.Metadata`
:param name: Name of the entity
:type name: str
"""
super(Entity, self).__init__(context=context, hostname=hostname)
self.kind = kind
self.name = name
self.metadata = metadata
self._entity_id = None

@property
def entity_id(self):
"""
Returns the id of the entry, which is only set when already stored.
:return: Integer id or None
"""
return self._entity_id

def do_store(self, connection):
super(Entity, self).do_store(connection)
self.metadata.do_store(connection)
connection.execute(
'INSERT OR IGNORE INTO entity (context, kind, name, metadata_hash) VALUES(?, ?, ?, ?)',
(self.context, self.kind, self.name, self.metadata.hash_id))
cursor = connection.execute(
'SELECT id FROM entity WHERE context = ? AND kind = ? AND name = ?',
(self.context, self.kind, self.name))
self._entity_id = cursor.fetchone()[0]


class Message(DataSource):
def __init__(self, stamp=None, msg_type=None, topic=None, data=None, actor=None, phase=None,
hostname=None, context=None):
Expand Down Expand Up @@ -267,6 +334,47 @@ def do_store(self, connection):
self._message_id = cursor.lastrowid


class Dialog(DataSource):
"""
Stores information about dialog questions and their answers
"""

def __init__(self, scope=None, data=None, actor=None, phase=None, hostname=None, context=None):
"""
:param scope: Dialog scope
:type scope: str
:param data: Payload data
:type data: dict
:param actor: Name of the actor that triggered the entry
:type actor: str
:param phase: In which phase of the workflow execution the dialog was triggered
:type phase: str
:param hostname: Hostname of the system that produced the message
:type hostname: str
:param context: The execution context
:type context: str
"""
super(Dialog, self).__init__(actor=actor, phase=phase, hostname=hostname, context=context)
self.scope = scope or ''
self.data = data
self._dialog_id = None

@property
def dialog_id(self):
"""
Returns the id of the entry, which is only set when already stored.
:return: Integer id or None
"""
return self._dialog_id

def do_store(self, connection):
super(Dialog, self).do_store(connection)
cursor = connection.execute(
'INSERT OR IGNORE INTO dialog (context, scope, data, data_source_id) VALUES(?, ?, ?, ?)',
(self.context, self.scope, json.dumps(self.data), self.data_source_id))
self._dialog_id = cursor.lastrowid


def create_audit_entry(event, data, message=None):
"""
Create an audit entry
Expand All @@ -291,10 +399,10 @@ def get_audit_entry(event, context):
"""
Retrieve audit entries stored in the database for the given context

:param context: The execution context
:type context: str
:param event: Event type identifier
:type event: str
:param context: The execution context
:type context: str
:return: list of dicts with id, time stamp, actor and phase fields
"""
with get_connection(None) as conn:
Expand Down Expand Up @@ -470,3 +578,90 @@ def get_checkpoints(context):
''', (context, _AUDIT_CHECKPOINT_EVENT))
cursor.row_factory = dict_factory
return cursor.fetchall()


def store_dialog(dialog, answer):
"""
Store ``dialog`` with accompanying ``answer``.

:param dialog: instance of a workflow to store.
:type dialog: :py:class:`leapp.dialogs.Dialog`
:param answer: Answer to for each component of the dialog
:type answer: dict
"""

component_keys = ('key', 'label', 'description', 'default', 'value', 'reason')
dialog_keys = ('title', 'reason') # + 'components'

tmp = dialog.serialize()
data = {
'components': [dict((key, component[key]) for key in component_keys) for component in tmp['components']],

# NOTE(dkubek): Storing answer here is redundant as it is already
# being stored in audit when we query from the answerstore, however,
# this keeps the information coupled with the question more closely
'answer': answer
}
data.update((key, tmp[key]) for key in dialog_keys)

e = Dialog(
scope=dialog.scope,
data=data,
context=os.environ['LEAPP_EXECUTION_ID'],
actor=os.environ['LEAPP_CURRENT_ACTOR'],
phase=os.environ['LEAPP_CURRENT_PHASE'],
hostname=os.environ['LEAPP_HOSTNAME'],
)
e.store()

return e


def store_workflow_metadata(workflow):
"""
Store the metadata of the given ``workflow`` into the database.

:param workflow: Workflow to store.
:type workflow: :py:class:`leapp.workflows.Workflow`
"""

metadata = json.dumps(type(workflow).serialize(), sort_keys=True)
metadata_hash_id = hashlib.sha256(metadata.encode('utf-8')).hexdigest()

md = Metadata(metadata=metadata, hash_id=metadata_hash_id)
ent = Entity(kind='workflow',
name=workflow.name,
context=os.environ['LEAPP_EXECUTION_ID'],
hostname=os.environ['LEAPP_HOSTNAME'],
metadata=md)
ent.store()


def store_actor_metadata(actor_definition, phase):
"""
Store the metadata of the given actor given as an ``actor_definition``
object into the database.

:param actor_definition: Actor to store
:type actor_definition: :py:class:`leapp.repository.actor_definition.ActorDefinition`
"""

_metadata = dict(actor_definition.discover())
_metadata.update({
'consumes': sorted(model.__name__ for model in _metadata.get('consumes', ())),
'produces': sorted(model.__name__ for model in _metadata.get('produces', ())),
'tags': sorted(tag.__name__ for tag in _metadata.get('tags', ())),
})
_metadata['phase'] = phase

actor_metadata_fields = ('class_name', 'name', 'description', 'phase', 'tags', 'consumes', 'produces', 'path')
metadata = json.dumps({field: _metadata[field] for field in actor_metadata_fields}, sort_keys=True)
metadata_hash_id = hashlib.sha256(metadata.encode('utf-8')).hexdigest()

md = Metadata(metadata=metadata, hash_id=metadata_hash_id)
ent = Entity(kind='actor',
name=actor_definition.name,
context=os.environ['LEAPP_EXECUTION_ID'],
hostname=os.environ['LEAPP_HOSTNAME'],
metadata=md)
ent.store()
22 changes: 22 additions & 0 deletions leapp/utils/audit/contextclone.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ def _dup_audit(db, message, data_source, newcontext, oldcontext):
return lookup


def _dup_metadata(db, newcontext, oldcontext):
for row in _fetch_table_for_context(db, 'metadata', oldcontext):
# id context kind name metadata
row_id, kind, name, metadata = _row_tuple(row, 'id', 'kind', 'name', 'metadata')

db.execute(
'INSERT INTO metadata (context, kind, name, metadata) VALUES(?, ?, ?, ?)',
(newcontext, kind, name, metadata))


def _dup_dialog(db, data_source, newcontext, oldcontext):
for row in _fetch_table_for_context(db, 'dialog', oldcontext):
# id context scope data data_source_id
row_id, scope, data, data_source_id = _row_tuple(row, 'id', 'scope', 'data', 'data_source_id')

db.execute(
'INSERT INTO dialog (context, scope, data, data_source_id) VALUES(?, ?, ?, ?)',
(newcontext, scope, data, data_source[data_source_id]))


def clone_context(oldcontext, newcontext, use_db=None):
# Enter transaction - In case of any exception automatic rollback is issued
# and it is automatically committed if there was no exception
Expand All @@ -82,3 +102,5 @@ def clone_context(oldcontext, newcontext, use_db=None):
message = _dup_message(db=db, data_source=data_source, newcontext=newcontext, oldcontext=oldcontext)
# Last clone message entries and use the lookup table generated by the data_source and message duplications
_dup_audit(db=db, data_source=data_source, message=message, newcontext=newcontext, oldcontext=oldcontext)
_dup_metadata(db=db, oldcontext=oldcontext, newcontext=newcontext)
_dup_dialog(db=db, data_source=data_source, oldcontext=oldcontext, newcontext=newcontext)
22 changes: 20 additions & 2 deletions leapp/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from leapp.messaging.commands import SkipPhasesUntilCommand
from leapp.tags import ExperimentalTag
from leapp.utils import reboot_system
from leapp.utils.audit import checkpoint, get_errors
from leapp.utils.audit import checkpoint, get_errors, create_audit_entry, store_workflow_metadata, store_actor_metadata
from leapp.utils.meta import with_metaclass, get_flattened_subclasses
from leapp.utils.output import display_status_current_phase, display_status_current_actor
from leapp.workflows.phases import Phase
Expand Down Expand Up @@ -165,7 +165,7 @@ def __init__(self, logger=None, auto_reboot=False):
self.description = self.description or type(self).__doc__

for phase in self.phases:
phase.filter.tags += (self.tag,)
phase.filter.tags += (self.tag,) if self.tag not in phase.filter.tags else ()
self._phase_actors.append((
phase,
# filters all actors with the give tags
Expand Down Expand Up @@ -279,6 +279,8 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
self.log.info('Starting workflow execution: {name} - ID: {id}'.format(
name=self.name, id=os.environ['LEAPP_EXECUTION_ID']))

store_workflow_metadata(self)

skip_phases_until = (skip_phases_until or '').lower()
needle_phase = until_phase or ''
needle_stage = None
Expand All @@ -295,6 +297,12 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
if phase and not self.is_valid_phase(phase):
raise CommandError('Phase {phase} does not exist in the workflow'.format(phase=phase))

# Save metadata of all discovered actors
for phase in self._phase_actors:
for stage in phase[1:]:
for actor in stage.actors:
store_actor_metadata(actor, phase[0].name)

self._stop_after_phase_requested = False
for phase in self._phase_actors:
os.environ['LEAPP_CURRENT_PHASE'] = phase[0].name
Expand Down Expand Up @@ -332,10 +340,12 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
display_status_current_actor(actor, designation=designation)
current_logger.info("Executing actor {actor} {designation}".format(designation=designation,
actor=actor.name))

messaging = InProcessMessaging(config_model=config_model, answer_store=self._answer_store)
messaging.load(actor.consumes)
instance = actor(logger=current_logger, messaging=messaging,
config_model=config_model, skip_dialogs=skip_dialogs)

try:
instance.run()
except BaseException as exc:
Expand All @@ -346,6 +356,14 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
current_logger.error('Actor {actor} has crashed: {trace}'.format(actor=actor.name,
trace=exc.exception_info))
raise
finally:
# Set and unset the enviromental variable so that audit
# associates the entry with the correct data source
os.environ['LEAPP_CURRENT_ACTOR'] = actor.name
create_audit_entry(
event='actor-exit-status',
data={'exit_status': 1 if self._unhandled_exception else 0})
os.environ.pop('LEAPP_CURRENT_ACTOR')

self._stop_after_phase_requested = messaging.stop_after_phase or self._stop_after_phase_requested

Expand Down
Loading
Loading