Skip to content

Commit

Permalink
feat:pipeline plugin factory
Browse files Browse the repository at this point in the history
loads pipeline plugins from config 🎉

no longer tied to adapt/padatious
  • Loading branch information
JarbasAl committed Nov 2, 2024
1 parent e4473b4 commit fdb1275
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 193 deletions.
238 changes: 82 additions & 156 deletions ovos_core/intent_services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@
# limitations under the License.
#
from collections import defaultdict
from typing import Tuple, Callable, Union
from typing import Tuple, Callable, List, Union, Dict

from ocp_pipeline.opm import OCPPipelineMatcher
from ovos_adapt.opm import AdaptPipeline
from ovos_bus_client.message import Message
from ovos_bus_client.session import SessionManager
from ovos_bus_client.util import get_message_lang
from ovos_commonqa.opm import CommonQAService

from ovos_config.config import Configuration
from ovos_config.locale import setup_locale, get_valid_languages
from ovos_core.intent_services.converse_service import ConverseService
from ovos_core.intent_services.fallback_service import FallbackService
from ovos_core.intent_services.stop_service import StopService
from ovos_core.transformers import MetadataTransformersService, UtteranceTransformersService
from ovos_plugin_manager.pipeline import OVOSPipelineFactory
from ovos_plugin_manager.templates.pipeline import PipelineMatch, IntentHandlerMatch
from ovos_utils.lang import standardize_lang_tag
from ovos_utils.log import LOG, log_deprecation, deprecated
from ovos_utils.metrics import Stopwatch
from padacioso.opm import PadaciosoPipeline as PadaciosoService


class IntentService:
Expand All @@ -48,15 +46,9 @@ def __init__(self, bus, config=None):
# Dictionary for translating a skill id to a name
self.skill_names = {}

self._adapt_service = None
self._padatious_service = None
self._padacioso_service = None
self._fallback = None
self._converse = None
self._common_qa = None
self._stop = None
self._ocp = None
self._load_pipeline_plugins()
for p in OVOSPipelineFactory.get_installed_pipelines():
LOG.debug(f"Found pipeline: {p}")
OVOSPipelineFactory.create(use_cache=True, bus=self.bus) # pre-loa

self.utterance_plugins = UtteranceTransformersService(bus)
self.metadata_plugins = MetadataTransformersService(bus)
Expand All @@ -75,49 +67,11 @@ def __init__(self, bus, config=None):
# Intents API
self.registered_vocab = []
self.bus.on('intent.service.intent.get', self.handle_get_intent)
self.bus.on('intent.service.skills.get', self.handle_get_skills)
self.bus.on('mycroft.skills.loaded', self.update_skill_name_dict)

# internal, track skills that call self.deactivate to avoid reactivating them again
self._deactivations = defaultdict(list)
self.bus.on('intent.service.skills.deactivate', self._handle_deactivate)

def _load_pipeline_plugins(self):
# TODO - replace with plugin loader from OPM
self._adapt_service = AdaptPipeline(bus=self.bus, config=self.config.get("adapt", {}))
if "padatious" not in self.config:
self.config["padatious"] = Configuration().get("padatious", {})
try:
if self.config["padatious"].get("disabled"):
LOG.info("padatious forcefully disabled in config")
else:
from ovos_padatious.opm import PadatiousPipeline
self._padatious_service = PadatiousPipeline(self.bus, self.config["padatious"])
except ImportError:
LOG.error(f'Failed to create padatious intent handlers, padatious not installed')

self._padacioso_service = PadaciosoService(self.bus, self.config["padatious"])
self._fallback = FallbackService(self.bus)
self._converse = ConverseService(self.bus)
self._common_qa = CommonQAService(self.bus, self.config.get("common_query"))
self._stop = StopService(self.bus)
self._ocp = OCPPipelineMatcher(self.bus, config=self.config.get("OCP", {}))

def update_skill_name_dict(self, message):
"""Messagebus handler, updates dict of id to skill name conversions."""
self.skill_names[message.data['id']] = message.data['name']

def get_skill_name(self, skill_id):
"""Get skill name from skill ID.
Args:
skill_id: a skill id as encoded in Intent handlers.
Returns:
(str) Skill name or the skill id if the skill wasn't found
"""
return self.skill_names.get(skill_id, skill_id)

def _handle_transformers(self, message):
"""
Pipe utterance through transformer plugins to get more metadata.
Expand Down Expand Up @@ -159,56 +113,31 @@ def disambiguate_lang(message):

return default_lang

def get_pipeline(self, skips=None, session=None) -> Tuple[str, Callable]:
def get_pipeline(self, skips=None, session=None, skip_stage_matchers=False) -> List[Tuple[str, Callable]]:
"""return a list of matcher functions ordered by priority
utterances will be sent to each matcher in order until one can handle the utterance
the list can be configured in mycroft.conf under intents.pipeline,
in the future plugins will be supported for users to define their own pipeline"""
skips = skips or []

session = session or SessionManager.get()

# Create matchers
# TODO - from plugins
if self._padatious_service is None:
if any("padatious" in p for p in session.pipeline):
LOG.warning("padatious is not available! using padacioso in it's place, "
"intent matching will be extremely slow in comparison")
padatious_matcher = self._padacioso_service
else:
padatious_matcher = self._padatious_service

matchers = {
"converse": self._converse.converse_with_skills,
"stop_high": self._stop.match_stop_high,
"stop_medium": self._stop.match_stop_medium,
"stop_low": self._stop.match_stop_low,
"padatious_high": padatious_matcher.match_high,
"padacioso_high": self._padacioso_service.match_high,
"adapt_high": self._adapt_service.match_high,
"common_qa": self._common_qa.match,
"fallback_high": self._fallback.high_prio,
"padatious_medium": padatious_matcher.match_medium,
"padacioso_medium": self._padacioso_service.match_medium,
"adapt_medium": self._adapt_service.match_medium,
"fallback_medium": self._fallback.medium_prio,
"padatious_low": padatious_matcher.match_low,
"padacioso_low": self._padacioso_service.match_low,
"adapt_low": self._adapt_service.match_low,
"fallback_low": self._fallback.low_prio
}
if self._ocp is not None:
matchers.update({
"ocp_high": self._ocp.match_high,
"ocp_medium": self._ocp.match_medium,
"ocp_fallback": self._ocp.match_fallback,
"ocp_legacy": self._ocp.match_legacy})
skips = skips or []
pipeline = [k for k in session.pipeline if k not in skips]
if any(k not in matchers for k in pipeline):
if skips:
log_deprecation("'skips' kwarg has been deprecated!", "1.0.0")
skips = [OVOSPipelineFactory._MAP.get(p, p) for p in skips]

pipeline = [OVOSPipelineFactory._MAP.get(p, p) for p in session.pipeline
if p not in skips]

matchers = OVOSPipelineFactory.create(pipeline, use_cache=True, bus=self.bus,
skip_stage_matchers=skip_stage_matchers)

if any(k[0] not in pipeline for k in matchers):
LOG.warning(f"Requested some invalid pipeline components! "
f"filtered {[k for k in pipeline if k not in matchers]}")
pipeline = [k for k in pipeline if k in matchers]
LOG.debug(f"Session pipeline: {pipeline}")
return [(k, matchers[k]) for k in pipeline]
return matchers

@staticmethod
def _validate_session(message, lang):
Expand Down Expand Up @@ -446,11 +375,7 @@ def handle_get_intent(self, message):
sess = SessionManager.get(message)

# Loop through the matching functions until a match is found.
for pipeline, match_func in self.get_pipeline(skips=["converse",
"fallback_high",
"fallback_medium",
"fallback_low"],
session=sess):
for pipeline, match_func in self.get_pipeline(session=sess, skip_stage_matchers=True):
match = match_func([utterance], lang, message)
if match:
if match.match_type:
Expand All @@ -467,142 +392,143 @@ def handle_get_intent(self, message):
self.bus.emit(message.reply("intent.service.intent.reply",
{"intent": None}))

def handle_get_skills(self, message):
"""Send registered skills to caller.
Argument:
message: query message to reply to.
"""
self.bus.emit(message.reply("intent.service.skills.reply",
{"skills": self.skill_names}))

def shutdown(self):
self.utterance_plugins.shutdown()
self.metadata_plugins.shutdown()
self._adapt_service.shutdown()
self._padacioso_service.shutdown()
if self._padatious_service:
self._padatious_service.shutdown()
self._common_qa.shutdown()
self._converse.shutdown()
self._fallback.shutdown()
if self._ocp:
self._ocp.shutdown()
OVOSPipelineFactory.shutdown()

self.bus.remove('recognizer_loop:utterance', self.handle_utterance)
self.bus.remove('add_context', self.handle_add_context)
self.bus.remove('remove_context', self.handle_remove_context)
self.bus.remove('clear_context', self.handle_clear_context)
self.bus.remove('mycroft.skills.loaded', self.update_skill_name_dict)
self.bus.remove('intent.service.intent.get', self.handle_get_intent)
self.bus.remove('intent.service.skills.get', self.handle_get_skills)

###########
# DEPRECATED STUFF
@property
def registered_intents(self):
log_deprecation("direct access to self.adapt_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
lang = get_message_lang()
return [parser.__dict__
for parser in self._adapt_service.engines[lang].intent_parsers]
def registered_intents(self) -> List:
"""DEPRECATED"""
log_deprecation("'registered_intents' moved to ovos-adapt-pipeline-plugin", "1.0.0")
return []

@property
def adapt_service(self):
def skill_names(self) -> Dict:
"""DEPRECATED"""
log_deprecation("skill names have been replaced by skill_id", "1.0.0")
return {}

@skill_names.setter
def skill_names(self, v):
log_deprecation("skill names have been replaced by skill_id", "1.0.0")

@deprecated("skill names have been replaced by skill_id", "1.0.0")
def update_skill_name_dict(self, message):
"""DEPRECATED"""

@deprecated("skill names have been replaced by skill_id", "1.0.0")
def get_skill_name(self, skill_id):
"""DEPRECATED"""
return skill_id

@deprecated("skill names have been replaced by skill_id", "1.0.0")
def handle_get_skills(self, message):
"""DEPRECATED"""

@property
def adapt_service(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.adapt_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._adapt_service
return None

@property
def padatious_service(self):
def padatious_service(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.padatious_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._padatious_service
return None

@property
def padacioso_service(self):
def padacioso_service(self)-> None:
"""DEPRECATED"""
log_deprecation("direct access to self.padacioso_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._padacioso_service
return None

@property
def fallback(self):

def fallback(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.fallback is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._fallback
return None

@property
def converse(self):
def converse(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.converse is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._converse
return None

@property
def common_qa(self):
def common_qa(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.common_qa is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._common_qa
return None

@property
def stop(self):
def stop(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.stop is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._stop
return None

@property
def ocp(self):
def ocp(self) -> None:
"""DEPRECATED"""
log_deprecation("direct access to self.ocp is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
return self._ocp
return None

@adapt_service.setter
def adapt_service(self, value):
log_deprecation("direct access to self.adapt_service is deprecated, "
log_deprecation("NOT SET! direct access to self.adapt_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._adapt_service = value

@padatious_service.setter
def padatious_service(self, value):
log_deprecation("direct access to self.padatious_service is deprecated, "
log_deprecation("NOT SET! direct access to self.padatious_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._padatious_service = value

@padacioso_service.setter
def padacioso_service(self, value):
log_deprecation("direct access to self.padacioso_service is deprecated, "
log_deprecation("NOT SET! direct access to self.padacioso_service is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._padacioso_service = value

@fallback.setter
def fallback(self, value):
log_deprecation("direct access to self.fallback is deprecated, "
log_deprecation("NOT SET! direct access to self.fallback is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._fallback = value

@converse.setter
def converse(self, value):
log_deprecation("direct access to self.converse is deprecated, "
log_deprecation("NOT SET! direct access to self.converse is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._converse = value

@common_qa.setter
def common_qa(self, value):
log_deprecation("direct access to self.common_qa is deprecated, "
log_deprecation("NOT SET! direct access to self.common_qa is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._common_qa = value

@stop.setter
def stop(self, value):
log_deprecation("direct access to self.stop is deprecated, "
log_deprecation("NOT SET! direct access to self.stop is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._stop = value

@ocp.setter
def ocp(self, value):
log_deprecation("direct access to self.ocp is deprecated, "
log_deprecation("NOT SET! direct access to self.ocp is deprecated, "
"pipelines are in the progress of being replaced with plugins", "1.0.0")
self._ocp = value

@deprecated("handle_get_adapt moved to adapt service, this method does nothing", "1.0.0")
def handle_get_adapt(self, message: Message):
Expand Down
Loading

0 comments on commit fdb1275

Please sign in to comment.