Skip to content

Commit

Permalink
Extend information from leapp saved to leappdb (#847)
Browse files Browse the repository at this point in the history
This commit combines multiple additions to the leapp database.

First addition is tracking of entity metadata. The `Metadata` model
stores the metadata of entities such as `Actor` or `Workflow`. This data
is stored in a new table `metadata` of the `leapp.db` file.

  1. metadata of *discovered* actors.

     For an actor, the metadata stored contain:

	`class_name` 	- the name of the actor class
	`name`		- the name given to the actor
	`description`	- the actor's description
	`phase` 	- phase of execution of the actor
	`tags`		- names of any tags associated with an actor
	`consumes` 	- list of all messages the actor consumes
	`produces`	- list of all messages the actor produces
	`path`		- the path to the actor source file

  2. workflow metadata.

     For a workflow, the metadata stored contain:

       `name`		- name of the workflow
       `short_name` 	- short name of the workflow
       `tag`		- workflow tag
       `description`	- workflow description
       `phases` 	- all phases associated with the workflow

Next addition is tracking of dialog question. Previously leapp was not
able to detect the actual question asked from the user as it could be
generated dynamically when actor is called and depend on the
configuration of the user's system.

Last addition includes storing the actor exit status. Exit status is now
saved as an audit event `actor-exit-status`. Exit status 0 represents
successful execution or `StopActorExecution`/`StopActorExecutionError`,
while 1 indicates an unexpected and unhandled exception.

These changes collectively improve the metadata handling capabilities
of, ensuring accurate storage and retrieval of essential information for
various entities.

Jira: OAMG-8402
  • Loading branch information
dkubek committed May 13, 2024
1 parent b64c44b commit 91b309a
Show file tree
Hide file tree
Showing 24 changed files with 949 additions and 9 deletions.
13 changes: 10 additions & 3 deletions leapp/actors/__init__.py
Expand Up @@ -10,6 +10,7 @@
from leapp.models.error_severity import ErrorSeverity
from leapp.tags import Tag
from leapp.utils import get_api_models, path
from leapp.utils.audit import store_dialog
from leapp.utils.i18n import install_translation_for_actor
from leapp.utils.meta import get_flattened_subclasses
from leapp.workflows.api import WorkflowAPI
Expand Down Expand Up @@ -122,12 +123,17 @@ def get_answers(self, dialog):
:return: dictionary with the requested answers, None if not a defined dialog
"""
self._messaging.register_dialog(dialog, self)
answer = None
if dialog in type(self).dialogs:
if self.skip_dialogs:
# non-interactive mode of operation
return self._messaging.get_answers(dialog)
return self._messaging.request_answers(dialog)
return None
answer = self._messaging.get_answers(dialog)
else:
answer = self._messaging.request_answers(dialog)

store_dialog(dialog, answer)

return answer

def show_message(self, message):
"""
Expand Down Expand Up @@ -285,6 +291,7 @@ def get_actor_tool_path(self, name):
def run(self, *args):
""" Runs the actor calling the method :py:func:`process`. """
os.environ['LEAPP_CURRENT_ACTOR'] = self.name

try:
self.process(*args)
except StopActorExecution:
Expand Down
1 change: 1 addition & 0 deletions leapp/dialogs/dialog.py
Expand Up @@ -114,4 +114,5 @@ def request_answers(self, store, renderer):
self._store = store
renderer.render(self)
self._store = None

return store.get(self.scope, {})
3 changes: 3 additions & 0 deletions leapp/messaging/answerstore.py
Expand Up @@ -117,6 +117,9 @@ def get(self, scope, fallback=None):
# NOTE(ivasilev) self.storage.get() will return a DictProxy. To avoid TypeError during later
# JSON serialization a copy() should be invoked to get a shallow copy of data
answer = self._storage.get(scope, fallback).copy()

# NOTE(dkubek): It is possible that we do not need to save the 'answer'
# here as it is being stored with dialog question right after query
create_audit_entry('dialog-answer', {'scope': scope, 'fallback': fallback, 'answer': answer})
return answer

Expand Down
199 changes: 197 additions & 2 deletions leapp/utils/audit/__init__.py
Expand Up @@ -3,6 +3,7 @@
import json
import os
import sqlite3
import hashlib

from leapp.config import get_config
from leapp.compat import string_types
Expand Down Expand Up @@ -221,6 +222,72 @@ def do_store(self, connection):
self._data_source_id = cursor.fetchone()[0]


class Metadata(Storable):
"""
Metadata of an Entity
"""

def __init__(self, metadata=None, hash_id=None):
"""
:param metadata: Entity metadata
:type metadata: str
:param hash_id: SHA256 hash in hexadecimal representation of data
:type hash_id: str
"""
super(Metadata, self).__init__()
self.metadata = metadata
self.hash_id = hash_id

def do_store(self, connection):
super(Metadata, self).do_store(connection)
connection.execute('INSERT OR IGNORE INTO metadata (hash, metadata) VALUES(?, ?)',
(self.hash_id, self.metadata))


class Entity(Host):
"""
Leapp framework entity (e.g. actor, workflow)
"""

def __init__(self, context=None, hostname=None, kind=None, metadata=None, name=None):
"""
:param context: The execution context
:type context: str
:param hostname: Hostname of the system that produced the entry
:type hostname: str
:param kind: Kind of the entity for which metadata is stored
:type kind: str
:param metadata: Entity metadata
:type metadata: :py:class:`leapp.utils.audit.Metadata`
:param name: Name of the entity
:type name: str
"""
super(Entity, self).__init__(context=context, hostname=hostname)
self.kind = kind
self.name = name
self.metadata = metadata
self._entity_id = None

@property
def entity_id(self):
"""
Returns the id of the entry, which is only set when already stored.
:return: Integer id or None
"""
return self._entity_id

def do_store(self, connection):
super(Entity, self).do_store(connection)
self.metadata.do_store(connection)
connection.execute(
'INSERT OR IGNORE INTO entity (context, kind, name, metadata_hash) VALUES(?, ?, ?, ?)',
(self.context, self.kind, self.name, self.metadata.hash_id))
cursor = connection.execute(
'SELECT id FROM entity WHERE context = ? AND kind = ? AND name = ?',
(self.context, self.kind, self.name))
self._entity_id = cursor.fetchone()[0]


class Message(DataSource):
def __init__(self, stamp=None, msg_type=None, topic=None, data=None, actor=None, phase=None,
hostname=None, context=None):
Expand Down Expand Up @@ -267,6 +334,47 @@ def do_store(self, connection):
self._message_id = cursor.lastrowid


class Dialog(DataSource):
"""
Stores information about dialog questions and their answers
"""

def __init__(self, scope=None, data=None, actor=None, phase=None, hostname=None, context=None):
"""
:param scope: Dialog scope
:type scope: str
:param data: Payload data
:type data: dict
:param actor: Name of the actor that triggered the entry
:type actor: str
:param phase: In which phase of the workflow execution the dialog was triggered
:type phase: str
:param hostname: Hostname of the system that produced the message
:type hostname: str
:param context: The execution context
:type context: str
"""
super(Dialog, self).__init__(actor=actor, phase=phase, hostname=hostname, context=context)
self.scope = scope or ''
self.data = data
self._dialog_id = None

@property
def dialog_id(self):
"""
Returns the id of the entry, which is only set when already stored.
:return: Integer id or None
"""
return self._dialog_id

def do_store(self, connection):
super(Dialog, self).do_store(connection)
cursor = connection.execute(
'INSERT OR IGNORE INTO dialog (context, scope, data, data_source_id) VALUES(?, ?, ?, ?)',
(self.context, self.scope, json.dumps(self.data), self.data_source_id))
self._dialog_id = cursor.lastrowid


def create_audit_entry(event, data, message=None):
"""
Create an audit entry
Expand All @@ -291,10 +399,10 @@ def get_audit_entry(event, context):
"""
Retrieve audit entries stored in the database for the given context
:param context: The execution context
:type context: str
:param event: Event type identifier
:type event: str
:param context: The execution context
:type context: str
:return: list of dicts with id, time stamp, actor and phase fields
"""
with get_connection(None) as conn:
Expand Down Expand Up @@ -470,3 +578,90 @@ def get_checkpoints(context):
''', (context, _AUDIT_CHECKPOINT_EVENT))
cursor.row_factory = dict_factory
return cursor.fetchall()


def store_dialog(dialog, answer):
"""
Store ``dialog`` with accompanying ``answer``.
:param dialog: instance of a workflow to store.
:type dialog: :py:class:`leapp.dialogs.Dialog`
:param answer: Answer to for each component of the dialog
:type answer: dict
"""

component_keys = ('key', 'label', 'description', 'default', 'value', 'reason')
dialog_keys = ('title', 'reason') # + 'components'

tmp = dialog.serialize()
data = {
'components': [dict((key, component[key]) for key in component_keys) for component in tmp['components']],

# NOTE(dkubek): Storing answer here is redundant as it is already
# being stored in audit when we query from the answerstore, however,
# this keeps the information coupled with the question more closely
'answer': answer
}
data.update((key, tmp[key]) for key in dialog_keys)

e = Dialog(
scope=dialog.scope,
data=data,
context=os.environ['LEAPP_EXECUTION_ID'],
actor=os.environ['LEAPP_CURRENT_ACTOR'],
phase=os.environ['LEAPP_CURRENT_PHASE'],
hostname=os.environ['LEAPP_HOSTNAME'],
)
e.store()

return e


def store_workflow_metadata(workflow):
"""
Store the metadata of the given ``workflow`` into the database.
:param workflow: Workflow to store.
:type workflow: :py:class:`leapp.workflows.Workflow`
"""

metadata = json.dumps(type(workflow).serialize(), sort_keys=True)
metadata_hash_id = hashlib.sha256(metadata.encode('utf-8')).hexdigest()

md = Metadata(metadata=metadata, hash_id=metadata_hash_id)
ent = Entity(kind='workflow',
name=workflow.name,
context=os.environ['LEAPP_EXECUTION_ID'],
hostname=os.environ['LEAPP_HOSTNAME'],
metadata=md)
ent.store()


def store_actor_metadata(actor_definition, phase):
"""
Store the metadata of the given actor given as an ``actor_definition``
object into the database.
:param actor_definition: Actor to store
:type actor_definition: :py:class:`leapp.repository.actor_definition.ActorDefinition`
"""

_metadata = dict(actor_definition.discover())
_metadata.update({
'consumes': sorted(model.__name__ for model in _metadata.get('consumes', ())),
'produces': sorted(model.__name__ for model in _metadata.get('produces', ())),
'tags': sorted(tag.__name__ for tag in _metadata.get('tags', ())),
})
_metadata['phase'] = phase

actor_metadata_fields = ('class_name', 'name', 'description', 'phase', 'tags', 'consumes', 'produces', 'path')
metadata = json.dumps({field: _metadata[field] for field in actor_metadata_fields}, sort_keys=True)
metadata_hash_id = hashlib.sha256(metadata.encode('utf-8')).hexdigest()

md = Metadata(metadata=metadata, hash_id=metadata_hash_id)
ent = Entity(kind='actor',
name=actor_definition.name,
context=os.environ['LEAPP_EXECUTION_ID'],
hostname=os.environ['LEAPP_HOSTNAME'],
metadata=md)
ent.store()
22 changes: 22 additions & 0 deletions leapp/utils/audit/contextclone.py
Expand Up @@ -70,6 +70,26 @@ def _dup_audit(db, message, data_source, newcontext, oldcontext):
return lookup


def _dup_metadata(db, newcontext, oldcontext):
for row in _fetch_table_for_context(db, 'metadata', oldcontext):
# id context kind name metadata
row_id, kind, name, metadata = _row_tuple(row, 'id', 'kind', 'name', 'metadata')

db.execute(
'INSERT INTO metadata (context, kind, name, metadata) VALUES(?, ?, ?, ?)',
(newcontext, kind, name, metadata))


def _dup_dialog(db, data_source, newcontext, oldcontext):
for row in _fetch_table_for_context(db, 'dialog', oldcontext):
# id context scope data data_source_id
row_id, scope, data, data_source_id = _row_tuple(row, 'id', 'scope', 'data', 'data_source_id')

db.execute(
'INSERT INTO dialog (context, scope, data, data_source_id) VALUES(?, ?, ?, ?)',
(newcontext, scope, data, data_source[data_source_id]))


def clone_context(oldcontext, newcontext, use_db=None):
# Enter transaction - In case of any exception automatic rollback is issued
# and it is automatically committed if there was no exception
Expand All @@ -82,3 +102,5 @@ def clone_context(oldcontext, newcontext, use_db=None):
message = _dup_message(db=db, data_source=data_source, newcontext=newcontext, oldcontext=oldcontext)
# Last clone message entries and use the lookup table generated by the data_source and message duplications
_dup_audit(db=db, data_source=data_source, message=message, newcontext=newcontext, oldcontext=oldcontext)
_dup_metadata(db=db, oldcontext=oldcontext, newcontext=newcontext)
_dup_dialog(db=db, data_source=data_source, oldcontext=oldcontext, newcontext=newcontext)
22 changes: 20 additions & 2 deletions leapp/workflows/__init__.py
Expand Up @@ -11,7 +11,7 @@
from leapp.messaging.commands import SkipPhasesUntilCommand
from leapp.tags import ExperimentalTag
from leapp.utils import reboot_system
from leapp.utils.audit import checkpoint, get_errors
from leapp.utils.audit import checkpoint, get_errors, create_audit_entry, store_workflow_metadata, store_actor_metadata
from leapp.utils.meta import with_metaclass, get_flattened_subclasses
from leapp.utils.output import display_status_current_phase, display_status_current_actor
from leapp.workflows.phases import Phase
Expand Down Expand Up @@ -165,7 +165,7 @@ def __init__(self, logger=None, auto_reboot=False):
self.description = self.description or type(self).__doc__

for phase in self.phases:
phase.filter.tags += (self.tag,)
phase.filter.tags += (self.tag,) if self.tag not in phase.filter.tags else ()
self._phase_actors.append((
phase,
# filters all actors with the give tags
Expand Down Expand Up @@ -279,6 +279,8 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
self.log.info('Starting workflow execution: {name} - ID: {id}'.format(
name=self.name, id=os.environ['LEAPP_EXECUTION_ID']))

store_workflow_metadata(self)

skip_phases_until = (skip_phases_until or '').lower()
needle_phase = until_phase or ''
needle_stage = None
Expand All @@ -295,6 +297,12 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
if phase and not self.is_valid_phase(phase):
raise CommandError('Phase {phase} does not exist in the workflow'.format(phase=phase))

# Save metadata of all discovered actors
for phase in self._phase_actors:
for stage in phase[1:]:
for actor in stage.actors:
store_actor_metadata(actor, phase[0].name)

self._stop_after_phase_requested = False
for phase in self._phase_actors:
os.environ['LEAPP_CURRENT_PHASE'] = phase[0].name
Expand Down Expand Up @@ -332,10 +340,12 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
display_status_current_actor(actor, designation=designation)
current_logger.info("Executing actor {actor} {designation}".format(designation=designation,
actor=actor.name))

messaging = InProcessMessaging(config_model=config_model, answer_store=self._answer_store)
messaging.load(actor.consumes)
instance = actor(logger=current_logger, messaging=messaging,
config_model=config_model, skip_dialogs=skip_dialogs)

try:
instance.run()
except BaseException as exc:
Expand All @@ -346,6 +356,14 @@ def run(self, context=None, until_phase=None, until_actor=None, skip_phases_unti
current_logger.error('Actor {actor} has crashed: {trace}'.format(actor=actor.name,
trace=exc.exception_info))
raise
finally:
# Set and unset the enviromental variable so that audit
# associates the entry with the correct data source
os.environ['LEAPP_CURRENT_ACTOR'] = actor.name
create_audit_entry(
event='actor-exit-status',
data={'exit_status': 1 if self._unhandled_exception else 0})
os.environ.pop('LEAPP_CURRENT_ACTOR')

self._stop_after_phase_requested = messaging.stop_after_phase or self._stop_after_phase_requested

Expand Down

0 comments on commit 91b309a

Please sign in to comment.