diff --git a/README.md b/README.md index 856466d..0091d6e 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Assimilator - the best Python patterns for the best projects +![](/images/logo.png) + ## Install now * `pip install py_assimilator` diff --git a/assimilator/__init__.py b/assimilator/__init__.py index 63846f2..1d9c335 100644 --- a/assimilator/__init__.py +++ b/assimilator/__init__.py @@ -24,4 +24,4 @@ def optional_dependencies(error: str = "ignore"): import assimilator.kafka with optional_dependencies(): - import assimilator.redis + import assimilator.redis_ as redis diff --git a/assimilator/alchemy/database/error_wrapper.py b/assimilator/alchemy/database/error_wrapper.py new file mode 100644 index 0000000..07a73c0 --- /dev/null +++ b/assimilator/alchemy/database/error_wrapper.py @@ -0,0 +1,16 @@ +from sqlalchemy.exc import NoResultFound, IntegrityError, SQLAlchemyError + +from assimilator.core.database.exceptions import DataLayerError, NotFoundError, InvalidQueryError +from assimilator.core.patterns.error_wrapper import ErrorWrapper + + +class AlchemyErrorWrapper(ErrorWrapper): + def __init__(self): + super(AlchemyErrorWrapper, self).__init__(error_mappings={ + NoResultFound: NotFoundError, + IntegrityError: InvalidQueryError, + SQLAlchemyError: DataLayerError, + }, default_error=DataLayerError) + + +__all__ = ['AlchemyErrorWrapper'] diff --git a/assimilator/alchemy/database/repository.py b/assimilator/alchemy/database/repository.py index 2b93b21..d75028b 100644 --- a/assimilator/alchemy/database/repository.py +++ b/assimilator/alchemy/database/repository.py @@ -1,58 +1,125 @@ -from typing import Type +from typing import Type, Union, Optional, TypeVar, Collection, Dict -from sqlalchemy.exc import NoResultFound -from sqlalchemy.orm import Query +from sqlalchemy import func, select, update, delete +from sqlalchemy.orm import Session, Query +from sqlalchemy.inspection import inspect +from assimilator.alchemy.database.error_wrapper import AlchemyErrorWrapper +from assimilator.core.database.exceptions import InvalidQueryError from assimilator.alchemy.database.specifications import AlchemySpecificationList -from assimilator.core.database import BaseRepository, Specification, SpecificationList -from assimilator.core.database.exceptions import NotFoundError +from assimilator.core.database import Repository, SpecificationList, \ + LazyCommand, SpecificationType, make_lazy +from assimilator.core.patterns.error_wrapper import ErrorWrapper -class AlchemyRepository(BaseRepository): +AlchemyModelT = TypeVar("AlchemyModelT") + + +class AlchemyRepository(Repository): + session: Session + model: Type[AlchemyModelT] + def __init__( - self, session, initial_query: Query = None, specifications: Type[SpecificationList] = AlchemySpecificationList, + self, + session: Session, + model: Type[AlchemyModelT], + initial_query: Query = None, + specifications: Type[SpecificationList] = AlchemySpecificationList, + error_wrapper: Optional[ErrorWrapper] = None, ): super(AlchemyRepository, self).__init__( session=session, - initial_query=initial_query, + model=model, + initial_query=initial_query if initial_query is not None else select(model), specifications=specifications, + error_wrapper=error_wrapper or AlchemyErrorWrapper(), ) - def _execute_query(self, query): - return self.session.execute(query) - - def get(self, *specifications: Specification, lazy=False): - try: - data = self._execute_query(self._apply_specifications(specifications)) - if lazy: - return data - - return data.one()[0] + @make_lazy + 
def get( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Query = None, + ) -> Union[AlchemyModelT, LazyCommand[AlchemyModelT]]: + query = self._apply_specifications( + query=self.get_initial_query(initial_query), + specifications=specifications, + ) + return self.session.execute(query).one()[0] + + @make_lazy + def filter( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Query = None, + ) -> Union[Collection[AlchemyModelT], LazyCommand[Collection[AlchemyModelT]]]: + query = self._apply_specifications( + query=self.get_initial_query(initial_query), + specifications=specifications, + ) + return [result[0] for result in self.session.execute(query)] + + def update(self, obj: Optional[AlchemyModelT] = None, *specifications, **update_values) -> None: + if specifications: + if not update_values: + raise InvalidQueryError( + "You did not provide any update_values " + "to the update() yet provided specifications" + ) + + query: Query = self._apply_specifications( + query=self.get_initial_query(update(self.model)), + specifications=specifications, + ) + self.session.execute(query.values(update_values)) + elif obj is not None: + self.session.add(obj) + + def save(self, obj: Optional[AlchemyModelT] = None, **data) -> AlchemyModelT: + if obj is None: + obj = self.model(**data) - except NoResultFound as exc: - raise NotFoundError(exc) + self.session.add(obj) + return obj - def filter(self, *specifications: Specification, lazy=False): - data = self._execute_query(self._apply_specifications(specifications)) - if lazy: - return data + def refresh(self, obj: AlchemyModelT) -> None: + inspection = inspect(obj) - return [result[0] for result in data] + if inspection.transient or inspection.pending: + return + elif inspection.detached: + self.session.add(obj) - def update(self, obj): - """ We don't do anything, as the object is going to be updated with the obj.key = value """ + self.session.refresh(obj) - def save(self, obj): - self.session.add(obj) + def delete(self, obj: Optional[AlchemyModelT] = None, *specifications: SpecificationType) -> None: + if specifications: + query: Query = self._apply_specifications( + query=self.get_initial_query(delete(self.model)), + specifications=specifications, + ) + self.session.execute(query) + elif obj is not None: + self.session.delete(obj) + + def is_modified(self, obj: AlchemyModelT) -> bool: + return self.session.is_modified(obj) - def refresh(self, obj): - self.session.refresh(obj) + @make_lazy + def count(self, *specifications: SpecificationType, lazy: bool = False) -> Union[LazyCommand[int], int]: + primary_keys = inspect(self.model).primary_key - def delete(self, obj): - self.session.delete(obj) + if not primary_keys: + raise InvalidQueryError("Your repository model does not have" + " any primary keys. 
We cannot use count()") - def is_modified(self, obj): - return self.session.is_modified(obj) + return self.get( + *specifications, + lazy=False, + query=select(func.count(getattr(self.model, primary_keys[0].name))), + ) __all__ = [ diff --git a/assimilator/alchemy/database/specifications.py b/assimilator/alchemy/database/specifications.py index e05eb2d..a79fbfa 100644 --- a/assimilator/alchemy/database/specifications.py +++ b/assimilator/alchemy/database/specifications.py @@ -1,28 +1,68 @@ -from typing import Iterable, Collection +from typing import Collection, Optional from sqlalchemy.orm import Query +from sqlalchemy.sql.operators import is_ +from sqlalchemy import column, desc -from assimilator.core.database.specifications import Specification, specification, SpecificationList +from assimilator.core.database.specifications import specification, SpecificationList,\ + filter_parameter_parser -class AlchemySpecification(Specification): - def apply(self, query: Query) -> Query: - return super(AlchemySpecification, self).apply(query) +alchemy_filter_mappings = { + "__gt": lambda field_, val: column(field_) > val, + "__gte": lambda field_, val: column(field_) >= val, + "__lt": lambda field_, val: column(field_) < val, + "__lte": lambda field_, val: column(field_) <= val, + "__not": lambda field_, val: column(field_) != val, + "__is": lambda field_, val: is_(column(field_, val)), +} @specification def alchemy_filter(*filters, query: Query, **filters_by) -> Query: + filters = list(filters) + + for field, value in dict(filters_by).items(): + _, parsed_filter = filter_parameter_parser( + field=field, + value=value, + filter_mappings=alchemy_filter_mappings, + ) + + if parsed_filter is not None: + filters.append(parsed_filter) + del filters_by[field] + return query.filter(*filters).filter_by(**filters_by) @specification def alchemy_order(*clauses: str, query: Query) -> Query: - return query.order_by(*clauses) + parsed_clauses = [] + + for clause in clauses: + if clause.startswith("-"): + parsed_clauses.append(desc(column(clause[1:]))) + else: + parsed_clauses.append(clause) + + return query.order_by(*parsed_clauses) @specification -def alchemy_paginate(limit: int, offset: int, query: Query) -> Query: - return query.limit(limit).offset(offset) +def alchemy_paginate( + *, + limit: Optional[int] = None, + offset: Optional[int] = None, + query: Query, +) -> Query: + + if limit is not None: + query = query.limit(limit) + if offset is not None: + query = query.offset(offset) + + return query @specification @@ -45,7 +85,6 @@ class AlchemySpecificationList(SpecificationList): __all__ = [ 'AlchemySpecificationList', - 'AlchemySpecification', 'alchemy_filter', 'alchemy_order', 'alchemy_paginate', diff --git a/assimilator/alchemy/database/unit_of_work.py b/assimilator/alchemy/database/unit_of_work.py index 4e3a5bf..b93fd2c 100644 --- a/assimilator/alchemy/database/unit_of_work.py +++ b/assimilator/alchemy/database/unit_of_work.py @@ -1,10 +1,18 @@ -from sqlalchemy.exc import IntegrityError - +from assimilator.alchemy.database.repository import AlchemyRepository +from assimilator.alchemy.database.error_wrapper import AlchemyErrorWrapper from assimilator.core.database.unit_of_work import UnitOfWork -from assimilator.core.database.exceptions import InvalidQueryError +from assimilator.core.patterns.error_wrapper import ErrorWrapper class AlchemyUnitOfWork(UnitOfWork): + repository: AlchemyRepository + + def __init__(self, repository: AlchemyRepository, error_wrapper: ErrorWrapper = None): + 
super(AlchemyUnitOfWork, self).__init__( + repository=repository, + error_wrapper=error_wrapper or AlchemyErrorWrapper(), + ) + def begin(self): self.repository.session.begin() @@ -15,10 +23,7 @@ def close(self): pass def commit(self): - try: - self.repository.session.commit() - except IntegrityError as exc: - raise InvalidQueryError(exc) + self.repository.session.commit() __all__ = [ diff --git a/assimilator/alchemy/events/database/repository.py b/assimilator/alchemy/events/database/repository.py index 8ed83b0..f9ab337 100644 --- a/assimilator/alchemy/events/database/repository.py +++ b/assimilator/alchemy/events/database/repository.py @@ -1,9 +1,31 @@ +from typing import Type, Optional + +from sqlalchemy import Table +from sqlalchemy.orm import Query + +from assimilator.alchemy.database.specifications import AlchemySpecificationList from assimilator.alchemy.database.repository import AlchemyRepository +from assimilator.core.database import SpecificationList +from assimilator.core.patterns.error_wrapper import ErrorWrapper class AlchemyOutboxRepository(AlchemyRepository): - def __init__(self, event_model, session, initial_query=None): - super(AlchemyOutboxRepository, self).__init__(session, initial_query) + def __init__( + self, + session, + event_model: Type[Table], + model: Type[Table], + initial_query: Optional[Query] = None, + specifications: Type[SpecificationList] = AlchemySpecificationList, + error_wrapper: ErrorWrapper = None, + ): + super(AlchemyOutboxRepository, self).__init__( + session=session, + initial_query=initial_query, + model=model, + specifications=specifications, + error_wrapper=error_wrapper, + ) self.event_model = event_model def save(self, obj): diff --git a/assimilator/alchemy/events/outbox_relay.py b/assimilator/alchemy/events/outbox_relay.py index 278c9ed..0c65ef2 100644 --- a/assimilator/alchemy/events/outbox_relay.py +++ b/assimilator/alchemy/events/outbox_relay.py @@ -1,9 +1,9 @@ from sqlalchemy import Column, BigInteger, Text, DateTime -from assimilator.core.events import Event +from assimilator.core.events.events import Event from assimilator.core.database.unit_of_work import UnitOfWork from assimilator.core.events import OutboxRelay -from assimilator.core.events.events_bus import EventBus +from assimilator.core.events.events_bus import EventProducer def create_outbox_event_model(Base): @@ -24,22 +24,23 @@ def __init__(self, event: Event, *args, **kwargs): class AlchemyOutboxRelay(OutboxRelay): - def __init__(self, outbox_event_model, uow: UnitOfWork, event_bus: EventBus): - super(AlchemyOutboxRelay, self).__init__(uow=uow, event_bus=event_bus) + def __init__(self, outbox_event_model, uow: UnitOfWork, producer: EventProducer): + super(AlchemyOutboxRelay, self).__init__(uow=uow, producer=producer) self.outbox_event_model = outbox_event_model def start(self): - while True: - with self.uow: - events = self.uow.repository.filter() + with self.producer: + while True: + with self.uow: + events = self.uow.repository.filter() - for event in events: - self.event_bus.produce(event) + for event in events: + self.producer.produce(event) - self.acknowledge(events) - self.uow.commit() + self.acknowledge(events) + self.uow.commit() - self.delay_function() + self.delay_function() def delay_function(self): raise NotImplementedError("delay_function() is not implemented") diff --git a/assimilator/core/database/__init__.py b/assimilator/core/database/__init__.py index c7ad9ac..8bd88e3 100644 --- a/assimilator/core/database/__init__.py +++ 
b/assimilator/core/database/__init__.py @@ -1,3 +1,4 @@ from assimilator.core.database.repository import * from assimilator.core.database.specifications import * from assimilator.core.database.unit_of_work import * +from assimilator.core.database.exceptions import * diff --git a/assimilator/core/database/exceptions.py b/assimilator/core/database/exceptions.py index c055049..8da175f 100644 --- a/assimilator/core/database/exceptions.py +++ b/assimilator/core/database/exceptions.py @@ -1,12 +1,12 @@ class DataLayerError(Exception): - pass + """ Any error related to Repository, UnitOfWork, Model """ class NotFoundError(DataLayerError): - pass + """ Results are not found """ class InvalidQueryError(DataLayerError): - pass + """ The query to the data storage supplied was invalid """ diff --git a/assimilator/core/database/repository.py b/assimilator/core/database/repository.py index 475624b..c2722d5 100644 --- a/assimilator/core/database/repository.py +++ b/assimilator/core/database/repository.py @@ -1,69 +1,127 @@ +from functools import wraps from abc import ABC, abstractmethod -from typing import Union, Any, Optional, Callable, Iterable, Type +from typing import TypeVar, Callable, Generic, final, \ + Union, Optional, Iterable, Type, Collection +from assimilator.core.patterns import ErrorWrapper +from assimilator.core.patterns.lazy_command import LazyCommand from assimilator.core.database.specifications import SpecificationList, SpecificationType -class LazyCommand: - def __init__(self, command: Callable, *args, **kwargs): - self.command = command - self.args = args - self.kwargs = kwargs +def make_lazy(func: Callable): - def __call__(self): - return self.command(*self.args, **self.kwargs) + @wraps(func) + def make_lazy_wrapper( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: QueryT = None, + ): + if lazy: + return LazyCommand(func, self, *specifications, lazy=False, initial_query=initial_query) + return func(self, *specifications, lazy=False, initial_query=initial_query) + return make_lazy_wrapper -class BaseRepository(ABC): - def __init__(self, session: Any, specifications: Type[SpecificationList], initial_query: Optional[Any] = None): + +QueryT = TypeVar("QueryT") +ModelT = TypeVar("ModelT") +SessionT = TypeVar("SessionT") + + +class Repository(Generic[SessionT, ModelT, QueryT], ABC): + def __init__( + self, + session: SessionT, + model: Type[ModelT], + specifications: Type[SpecificationList], + initial_query: Optional[SessionT] = None, + error_wrapper: Optional[ErrorWrapper] = None, + ): self.session = session - self.initial_query = initial_query + self.model = model + self.__initial_query: QueryT = initial_query self.specifications = specifications - def _get_initial_query(self): - if self.initial_query is not None: - return self.initial_query + self.error_wrapper = error_wrapper or ErrorWrapper() + self.get = self.error_wrapper.decorate(self.get) + self.filter = self.error_wrapper.decorate(self.filter) + self.save = self.error_wrapper.decorate(self.save) + self.delete = self.error_wrapper.decorate(self.delete) + self.update = self.error_wrapper.decorate(self.update) + self.is_modified = self.error_wrapper.decorate(self.is_modified) + self.refresh = self.error_wrapper.decorate(self.refresh) + self.count = self.error_wrapper.decorate(self.count) + + @property + def specs(self) -> Type[SpecificationList]: + """ That property is used to shorten the full name of the self.specifications. 
""" + return self.specifications + + def get_initial_query(self, override_query: Optional[QueryT] = None) -> QueryT: + if override_query is not None: + return override_query + elif self.__initial_query is not None: + return self.__initial_query else: raise NotImplementedError("You must either pass the initial query or define get_initial_query()") - def _apply_specifications(self, specifications: Iterable[SpecificationType]) -> Any: - query = self._get_initial_query() - + @final + def _apply_specifications(self, query: QueryT, specifications: Iterable[SpecificationType]) -> QueryT: for specification in specifications: query = specification(query) return query @abstractmethod - def get(self, *specifications: SpecificationType, lazy: bool = False) -> Union[LazyCommand, Any]: + def get( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: QueryT = None, + ) -> Union[ModelT, LazyCommand[ModelT]]: raise NotImplementedError("get() is not implemented()") @abstractmethod - def filter(self, *specifications: SpecificationType, lazy: bool = False) -> Union[LazyCommand, Iterable]: + def filter( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: QueryT = None, + ) -> Union[Collection[ModelT], LazyCommand[Collection[ModelT]]]: raise NotImplementedError("filter() is not implemented()") @abstractmethod - def save(self, obj) -> None: + def save(self, obj: Optional[ModelT] = None, **obj_data) -> ModelT: raise NotImplementedError("save() is not implemented in the repository") @abstractmethod - def delete(self, obj) -> None: + def delete(self, obj: Optional[ModelT] = None, *specifications: SpecificationType) -> None: raise NotImplementedError("delete() is not implemented in the repository") @abstractmethod - def update(self, obj) -> None: + def update(self, obj: Optional[ModelT] = None, *specifications: SpecificationType, **update_values) -> None: raise NotImplementedError("update() is not implemented in the repository") @abstractmethod - def is_modified(self, obj) -> bool: + def is_modified(self, obj: ModelT) -> bool: raise NotImplementedError("is_modified() is not implemented in the repository") @abstractmethod - def refresh(self, obj) -> None: + def refresh(self, obj: ModelT) -> None: raise NotImplementedError("refresh() is not implemented in the repository") + @abstractmethod + def count( + self, + *specifications: SpecificationType, + lazy: bool = False, + ) -> Union[LazyCommand[int], int]: + raise NotImplementedError("count() is not implemented in the repository") + __all__ = [ 'LazyCommand', - 'BaseRepository', + 'Repository', + 'make_lazy', ] diff --git a/assimilator/core/database/specifications.py b/assimilator/core/database/specifications.py index 5d38ab2..2657690 100644 --- a/assimilator/core/database/specifications.py +++ b/assimilator/core/database/specifications.py @@ -1,25 +1,41 @@ from abc import ABC, abstractmethod -from typing import Callable, Union +from typing import Callable, Union, TypeVar, Dict, Any, Tuple, Optional from functools import wraps +QueryT = TypeVar("QueryT") + + class Specification(ABC): @abstractmethod - def apply(self, query): + def apply(self, query: QueryT) -> QueryT: raise NotImplementedError("Specification must specify apply()") - def __call__(self, query): + def __call__(self, query: QueryT) -> QueryT: return self.apply(query) +def filter_parameter_parser( + field: str, + value: Any, + filter_mappings: Dict[str, Callable], +) -> Tuple[Optional[str], Optional[Any]]: + for filter_ending, filter_func in 
filter_mappings.items(): + if field.endswith(filter_ending): + return filter_ending, filter_func(field.replace(filter_ending, ""), value) + + return None, None + + def specification(func: Callable): def create_specification(*args, **kwargs): @wraps(func) - def created_specification(query): + def created_specification(query: QueryT) -> QueryT: return func(*args, **kwargs, query=query) return created_specification + return create_specification @@ -38,4 +54,5 @@ class SpecificationList: 'Specification', 'specification', 'SpecificationType', + 'filter_parameter_parser', ] diff --git a/assimilator/core/database/unit_of_work.py b/assimilator/core/database/unit_of_work.py index 12dd059..637b353 100644 --- a/assimilator/core/database/unit_of_work.py +++ b/assimilator/core/database/unit_of_work.py @@ -1,11 +1,22 @@ from abc import ABC, abstractmethod +from typing import Optional -from assimilator.core.database.repository import BaseRepository +from assimilator.core.database.repository import Repository +from assimilator.core.patterns import ErrorWrapper class UnitOfWork(ABC): - def __init__(self, repository: BaseRepository): + error_wrapper: ErrorWrapper = ErrorWrapper() + + def __init__(self, repository: Repository, error_wrapper: Optional[ErrorWrapper] = None): self.repository = repository + if error_wrapper is not None: + self.error_wrapper = error_wrapper + + self.begin = self.error_wrapper.decorate(self.begin) + self.rollback = self.error_wrapper.decorate(self.rollback) + self.commit = self.error_wrapper.decorate(self.commit) + self.close = self.error_wrapper.decorate(self.close) @abstractmethod def begin(self): diff --git a/assimilator/core/events/__init__.py b/assimilator/core/events/__init__.py index 0e082f0..960a193 100644 --- a/assimilator/core/events/__init__.py +++ b/assimilator/core/events/__init__.py @@ -1,3 +1,4 @@ from assimilator.core.events.outbox_relay import * from assimilator.core.events.events import * from assimilator.core.events.exceptions import * +from assimilator.core.events.events_bus import * diff --git a/assimilator/core/events/outbox_relay.py b/assimilator/core/events/outbox_relay.py index 3f37ff1..6752c46 100644 --- a/assimilator/core/events/outbox_relay.py +++ b/assimilator/core/events/outbox_relay.py @@ -1,18 +1,20 @@ from abc import ABC +from typing import Iterable from assimilator.core.database.unit_of_work import UnitOfWork -from assimilator.core.events.events_bus import EventBus +from assimilator.core.events.events import Event +from assimilator.core.events.events_bus import EventProducer class OutboxRelay(ABC): - def __init__(self, uow: UnitOfWork, event_bus: EventBus): + def __init__(self, uow: UnitOfWork, producer: EventProducer): self.uow = uow - self.event_bus = event_bus + self.producer = producer def start(self): raise NotImplementedError("start() is not implemented") - def acknowledge(self, events): + def acknowledge(self, events: Iterable[Event]): raise NotImplementedError("acknowledge() is not implemented") diff --git a/assimilator/core/patterns/__init__.py b/assimilator/core/patterns/__init__.py index e7bfb8e..aac852b 100644 --- a/assimilator/core/patterns/__init__.py +++ b/assimilator/core/patterns/__init__.py @@ -1,2 +1,4 @@ -import assimilator.core.patterns.context_managers -import assimilator.core.patterns.mixins +from assimilator.core.patterns.context_managers import * +from assimilator.core.patterns.mixins import * +from assimilator.core.patterns.error_wrapper import * +from assimilator.core.patterns.lazy_command import * diff --git 
a/assimilator/core/patterns/error_wrapper.py b/assimilator/core/patterns/error_wrapper.py new file mode 100644 index 0000000..f475ef6 --- /dev/null +++ b/assimilator/core/patterns/error_wrapper.py @@ -0,0 +1,47 @@ +from functools import wraps +from typing import Dict, Type, Optional, Callable, Container + + +class ErrorWrapper: + def __init__( + self, + error_mappings: Optional[Dict[Type[Exception], Type[Exception]]] = None, + default_error: Optional[Type[Exception]] = None, + skipped_errors: Optional[Container[Type[Exception]]] = None, + ): + self.error_mappings = error_mappings or {} + self.default_error = default_error + + self.skipped_errors = skipped_errors or set() + self.skipped_errors = set(self.skipped_errors) + self.skipped_errors.add(BaseException) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is None: + return + elif exc_type in self.skipped_errors: + raise exc_val + + wrapped_error = self.error_mappings.get(exc_type) + + if wrapped_error is not None: + raise wrapped_error(exc_val) + elif self.default_error is not None: + raise self.default_error(exc_val) + + raise exc_val # No wrapping error was found + + def decorate(self, func: Callable) -> Callable: + + @wraps(func) + def wrapper(*args, **kwargs): + with self: + return func(*args, **kwargs) + + return wrapper + + +__all__ = ['ErrorWrapper'] diff --git a/assimilator/core/patterns/lazy_command.py b/assimilator/core/patterns/lazy_command.py new file mode 100644 index 0000000..9c03e02 --- /dev/null +++ b/assimilator/core/patterns/lazy_command.py @@ -0,0 +1,38 @@ +from typing import Union, Callable, Iterable, Container, TypeVar, Generic, Iterator + +T = TypeVar("T") + + +class LazyCommand(Generic[T]): + def __init__(self, command: Callable, *args, **kwargs): + self.command = command + self.args = args + self.kwargs = kwargs + self._results: T = None + + def __call__(self) -> Union[Container[T], T]: + if self._results is not None: + return self._results + + self._results = self.command(*self.args, **self.kwargs) + return self._results + + def __iter__(self) -> Iterator[T]: + results = self() + + if not isinstance(results, Iterable): # get() command + raise StopIteration("Results are not iterable") + + return iter(results) # filter() command + + def __bool__(self): + return bool(self()) + + def __str__(self): + return f"Lazy<{self.command}(*{self.args}, **{self.kwargs})>" + + def __repr__(self): + return str(self) + + +__all__ = ['LazyCommand'] diff --git a/assimilator/core/patterns/mixins.py b/assimilator/core/patterns/mixins.py index 447c4cb..29f9739 100644 --- a/assimilator/core/patterns/mixins.py +++ b/assimilator/core/patterns/mixins.py @@ -1,14 +1,17 @@ import json -from typing import Type +from typing import Type, TypeVar from pydantic import ValidationError, BaseModel from assimilator.core.exceptions import ParsingError +T = TypeVar("T", bound=BaseModel) + + class JSONParsedMixin: @classmethod - def from_json(cls: Type['BaseModel'], data: str) -> 'BaseModel': + def from_json(cls: Type[T], data: str) -> T: try: return cls(**json.loads(data)) except ValidationError as exc: diff --git a/assimilator/core/services/crud.py b/assimilator/core/services/crud.py index b8b19ac..8984c06 100644 --- a/assimilator/core/services/crud.py +++ b/assimilator/core/services/crud.py @@ -5,44 +5,42 @@ class CRUDService(Service): - def __init__(self, uow: UnitOfWork, model: Type): - self.uow = uow - self.specifications = self.uow.repository.specifications + def __init__(self, model: Type): 
self.model = model - def create(self, obj_data: dict): - with self.uow: + def create(self, obj_data: dict, uow: UnitOfWork): + with uow: obj = self.model(**obj_data) - self.uow.repository.save(obj) - self.uow.commit() + uow.repository.save(obj) + uow.commit() - self.uow.repository.refresh(obj) + uow.repository.refresh(obj) return obj - def update(self, update_data: dict, *filters, **kwargs_filters): - with self.uow: + def update(self, update_data: dict, uow: UnitOfWork, *filters, **kwargs_filters): + with uow: obj = self.get(*filters, **kwargs_filters) for key, value in update_data.items(): setattr(obj, key, value) - self.uow.repository.update(obj) - self.uow.commit() + uow.repository.update(obj) + uow.commit() - self.uow.repository.refresh(obj) + uow.repository.refresh(obj) return obj - def list(self, *filters, lazy: bool = False, **kwargs_filters): - return self.uow.repository.get(self.specifications.filter(*filters, **kwargs_filters), lazy=lazy) + def list(self, *filters, uow: UnitOfWork, lazy: bool = False, **kwargs_filters): + return uow.repository.get(uow.repository.specs.filter(*filters, **kwargs_filters), lazy=lazy) - def get(self, *filters, lazy: bool = False, **kwargs_filters): - return self.uow.repository.filter(self.specifications.filter(*filters, **kwargs_filters), lazy=lazy) + def get(self, *filters, uow: UnitOfWork, lazy: bool = False, **kwargs_filters): + return uow.repository.filter(uow.repository.specs.filter(*filters, **kwargs_filters), lazy=lazy) - def delete(self, *filters, **kwargs_filters): - with self.uow: + def delete(self, uow: UnitOfWork, *filters, **kwargs_filters): + with uow: obj = self.get(*filters, **kwargs_filters) - self.uow.repository.delete(obj) - self.uow.commit() + uow.repository.delete(obj) + uow.commit() __all__ = [ diff --git a/assimilator/internal/database/error_wrapper.py b/assimilator/internal/database/error_wrapper.py new file mode 100644 index 0000000..d6589c6 --- /dev/null +++ b/assimilator/internal/database/error_wrapper.py @@ -0,0 +1,13 @@ +from assimilator.core.database.exceptions import DataLayerError, NotFoundError +from assimilator.core.patterns.error_wrapper import ErrorWrapper + + +class InternalErrorWrapper(ErrorWrapper): + def __init__(self): + super(InternalErrorWrapper, self).__init__(error_mappings={ + KeyError: NotFoundError, + TypeError: NotFoundError, + }, default_error=DataLayerError) + + +__all__ = ['InternalErrorWrapper'] diff --git a/assimilator/internal/database/models.py b/assimilator/internal/database/models.py index 37ba4e1..5029040 100644 --- a/assimilator/internal/database/models.py +++ b/assimilator/internal/database/models.py @@ -1,7 +1,29 @@ -from typing import Any +from typing import TypeVar +from uuid import uuid4 from pydantic import BaseModel +from assimilator.core.patterns.mixins import JSONParsedMixin -class InternalModel(BaseModel): - id: Any + +T = TypeVar("T") +ComparableT = TypeVar("ComparableT") + + +class InternalModel(JSONParsedMixin, BaseModel): + id: T + + class Meta: + autogenerate_id = True + + def generate_id(self, *args, **kwargs) -> T: + return str(uuid4()) + + def __init__(self, *args, **kwargs): + if self.Meta.autogenerate_id and (kwargs.get('id') is None): + kwargs['id'] = self.generate_id(*args, **kwargs) + + super(InternalModel, self).__init__(*args, **kwargs) + + +__all__ = ['InternalModel'] diff --git a/assimilator/internal/database/repository.py b/assimilator/internal/database/repository.py index 0529099..196d656 100644 --- a/assimilator/internal/database/repository.py +++ 
b/assimilator/internal/database/repository.py @@ -1,52 +1,105 @@ -import re -from typing import Type - -from assimilator.core.database import BaseRepository, SpecificationList -from assimilator.core.database.exceptions import NotFoundError -from assimilator.internal.database.specifications import InternalSpecification, InternalSpecificationList - - -class InternalRepository(BaseRepository): - def __init__(self, session: dict, specifications: Type[SpecificationList] = InternalSpecificationList): - super(InternalRepository, self).__init__(session=session, initial_query='', specifications=specifications) - - def get(self, *specifications: InternalSpecification, lazy: bool = False): - try: - return self.session[self._apply_specifications(specifications)] - except (KeyError, TypeError) as exc: - raise NotFoundError(exc) - - def filter(self, *specifications: InternalSpecification, lazy: bool = False): - if not specifications: - return list(self.session.values()) - - key_mask = self._apply_specifications(specifications) - if lazy: - return key_mask - - models = [] - for key, value in self.session.items(): - if not re.match(key, key_mask): - pass - - models.append(value) - - return models - - def save(self, obj): - self.session[str(obj.id)] = obj - - def delete(self, obj): - del self.session[str(obj.id)] - - def update(self, obj): - self.session[str(obj.id)] = obj - - def is_modified(self, obj): - return self.get(self.specifications.filter(obj.id)) == obj - - def refresh(self, obj): - obj.value = self.get(self.specifications.filter(obj.id)) +from typing import Type, Union, Optional, TypeVar, List + +from assimilator.internal.database.models import InternalModel +from assimilator.core.patterns.error_wrapper import ErrorWrapper +from assimilator.internal.database.error_wrapper import InternalErrorWrapper +from assimilator.core.database import Repository, SpecificationType, LazyCommand, \ + make_lazy, InvalidQueryError +from assimilator.internal.database.specifications import InternalSpecificationList + +ModelT = TypeVar("ModelT", bound=InternalModel) + + +class InternalRepository(Repository): + session: dict + model: Type[ModelT] + + def __init__( + self, + session: dict, + model: Type[ModelT], + initial_query: Optional[str] = '', + specifications: Type[InternalSpecificationList] = InternalSpecificationList, + error_wrapper: Optional[ErrorWrapper] = None, + ): + super(InternalRepository, self).__init__( + model=model, + session=session, + initial_query=initial_query, + specifications=specifications, + error_wrapper=error_wrapper or InternalErrorWrapper(), + ) + + @make_lazy + def get( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Optional[str] = None, + ) -> Union[LazyCommand[ModelT], ModelT]: + query = self._apply_specifications( + query=self.get_initial_query(initial_query), + specifications=specifications, + ) + return self.session[query] + + @make_lazy + def filter( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Optional[str] = None, + ) -> Union[LazyCommand[List[ModelT]], List[ModelT]]: + return self._apply_specifications( + query=list(self.session.values()), + specifications=specifications, + ) + + def save(self, obj: Optional[ModelT] = None, **obj_data) -> ModelT: + if obj is None: + obj = self.model(**obj_data) + + self.session[obj.id] = obj + return obj + + def delete(self, obj: Optional[ModelT] = None, *specifications: SpecificationType) -> None: + if specifications: + for model in self.filter(*specifications, 
lazy=True): + del self.session[model.id] + elif obj is not None: + del self.session[obj.id] + + def update( + self, obj: Optional[ModelT] = None, *specifications: SpecificationType, **update_values, + ) -> None: + if specifications: + if not update_values: + raise InvalidQueryError( + "You did not provide any update_values " + "to the update() yet provided specifications" + ) + + for model in self.filter(*specifications, lazy=True): + model.__dict__.update(update_values) + self.save(model) + + elif obj is not None: + self.save(obj) + + def is_modified(self, obj: ModelT) -> bool: + return self.get(self.specs.filter(id=obj.id)) == obj + + def refresh(self, obj: ModelT) -> None: + fresh_obj = self.get(self.specs.filter(id=obj.id), lazy=False) + obj.__dict__.update(fresh_obj.__dict__) + + @make_lazy + def count( + self, *specifications: SpecificationType, lazy: bool = False, + ) -> Union[LazyCommand[int], int]: + if specifications: + return len(self.filter(*specifications, lazy=False)) + return len(self.session) __all__ = [ diff --git a/assimilator/internal/database/specifications.py b/assimilator/internal/database/specifications.py index 55e8450..2cb97d9 100644 --- a/assimilator/internal/database/specifications.py +++ b/assimilator/internal/database/specifications.py @@ -1,28 +1,88 @@ -from assimilator.core.database import Specification, specification, SpecificationList +import operator +from typing import List, Iterable, Any, Union, Callable, Optional +from assimilator.core.database import specification, SpecificationList, filter_parameter_parser +from assimilator.internal.database.models import InternalModel -class InternalSpecification(Specification): - def apply(self, query: str) -> str: # returns the str key - return super(InternalSpecification, self).apply(query) + +QueryT = Union[str, List[InternalModel]] +internal_filter_mappings = { + "__gt": lambda field, value: operator.gt, + "__gte": lambda field, value: operator.ge, + "__lt": lambda field, value: operator.lt, + "__lte": lambda field, value: operator.le, + "__not": lambda field, value: operator.not_, + "__is": lambda field, value: operator.is_, +} @specification -def internal_filter(key: str, query: str) -> str: - return f"{query}{key}" +def internal_filter(*filters, query: QueryT, **filters_by) -> Iterable[InternalModel]: + if not (filters_by or filters): # no filters present + return query + elif isinstance(query, str): + id_ = filters_by.get('id') + return f'{query}{"".join(filters)}{id_ if id_ else ""}' + + parsed_arguments: List[(str, Callable, Any)] = [] + + for field, value in dict(filters_by).items(): + ending, parsed_filter = filter_parameter_parser( + field=field, + value=value, + filter_mappings=internal_filter_mappings, + ) + + parsed_arguments.append(( + field.replace(ending, "") if (ending is not None) else field, + parsed_filter or operator.eq, + value, + )) + + return list(filter( + lambda model: all( + operation_(getattr(model, field_), val) + for field_, operation_, val in parsed_arguments + ), + query, + )) @specification -def internal_order(*args, query: str, **kwargs) -> str: +def internal_order(*args, query: QueryT, **kwargs) -> Iterable[InternalModel]: + if isinstance(query, str): + return query + + fields = (*args, *kwargs.keys()) + + if not any(field.startswith("-") for field in fields): + query.sort(key=lambda item: [getattr(item, argument) for argument in fields]) + return query + + for field in fields: + reverse = field.startswith("-") + query.sort(key=lambda item: getattr(item, field.strip("-")), 
reverse=reverse) + return query @specification -def internal_paginate(*args, query: str, **kwargs) -> str: - return query +def internal_paginate( + *, + limit: Optional[int] = None, + offset: Optional[int] = None, + query: QueryT = None, +) -> Iterable[InternalModel]: + if query is None: + raise ValueError("Query must not be None in the specification!") + if isinstance(query, str): + return query + + return query[offset:limit] @specification -def internal_join(*args, query: str, **kwargs) -> str: +def internal_join(*args, query: QueryT, **kwargs) -> Any: return query @@ -34,7 +94,6 @@ class InternalSpecificationList(SpecificationList): __all__ = [ - 'InternalSpecification', 'internal_filter', 'internal_order', 'internal_paginate', diff --git a/assimilator/internal/database/unit_of_work.py b/assimilator/internal/database/unit_of_work.py index c4ffacb..26ec9fd 100644 --- a/assimilator/internal/database/unit_of_work.py +++ b/assimilator/internal/database/unit_of_work.py @@ -2,12 +2,19 @@ from typing import Optional from assimilator.core.database import UnitOfWork -from assimilator.core.database.repository import BaseRepository +from assimilator.core.database.repository import Repository +from assimilator.internal.database.repository import InternalRepository +from assimilator.internal.database.error_wrapper import InternalErrorWrapper class InternalUnitOfWork(UnitOfWork): - def __init__(self, repository: BaseRepository): - super(InternalUnitOfWork, self).__init__(repository) + repository: InternalRepository + + def __init__(self, repository: Repository): + super(InternalUnitOfWork, self).__init__( + repository=repository, + error_wrapper=InternalErrorWrapper(), + ) self._saved_data: Optional[dict] = None def begin(self): diff --git a/assimilator/internal/events/events_bus.py b/assimilator/internal/events/events_bus.py index 27d7e66..73b29aa 100644 --- a/assimilator/internal/events/events_bus.py +++ b/assimilator/internal/events/events_bus.py @@ -1,4 +1,4 @@ -from assimilator.core.events import Event +from assimilator.core.events.events import Event from assimilator.core.events.events_bus import EventConsumer, EventProducer diff --git a/assimilator/redis/__init__.py b/assimilator/redis/__init__.py deleted file mode 100644 index 0ef6354..0000000 --- a/assimilator/redis/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from assimilator.redis.database import * -from assimilator.redis.events import * diff --git a/assimilator/redis/database/__init__.py b/assimilator/redis/database/__init__.py deleted file mode 100644 index 761266a..0000000 --- a/assimilator/redis/database/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from assimilator.redis.database.models import * -from assimilator.redis.database.repository import * -from assimilator.redis.database.unit_of_work import * diff --git a/assimilator/redis/database/models.py b/assimilator/redis/database/models.py deleted file mode 100644 index 2feab4f..0000000 --- a/assimilator/redis/database/models.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import Optional - -from pydantic import BaseModel - -from assimilator.core.patterns.mixins import JSONParsedMixin - - -class RedisModel(JSONParsedMixin, BaseModel): - id: int - expire_in: Optional[int] = None - - -__all__ = [ - 'RedisModel', -] diff --git a/assimilator/redis/database/repository.py b/assimilator/redis/database/repository.py deleted file mode 100644 index 34fafb8..0000000 --- a/assimilator/redis/database/repository.py +++ /dev/null @@ -1,55 +0,0 @@ -from typing import Type - -import redis - -from 
assimilator.core.database import SpecificationList -from assimilator.redis.database import RedisModel -from assimilator.core.database.repository import BaseRepository, LazyCommand -from assimilator.internal.database.specifications import InternalSpecification, InternalSpecificationList - - -class RedisRepository(BaseRepository): - def __init__( - self, - session: redis.Redis, - model: Type[RedisModel], - specifications: Type[SpecificationList] = InternalSpecificationList, - ): - super(RedisRepository, self).__init__(session, initial_query='', specifications=specifications) - self.model = model - - def get(self, *specifications: InternalSpecification, lazy: bool = False): - key_name = self._apply_specifications(specifications) - if lazy: - return LazyCommand(self.session.get, key_name) - return self.session.get(key_name) - - def filter(self, *specifications: InternalSpecification, lazy: bool = False): - key_name = self._apply_specifications(specifications) - if lazy: - return LazyCommand(self.session.keys, key_name) - - return [self.model.from_json(value) for value in self.session.mget(self.session.keys(key_name))] - - def save(self, obj: RedisModel): - self.session.set(str(obj.id), obj.json(), ex=obj.expire_in) - - def delete(self, obj: RedisModel): - self.session.delete(str(obj.id)) - - def update(self, obj: RedisModel): - self.save(obj) - - def is_modified(self, obj: RedisModel): - return self.get(self.specifications.filter(obj.id), lazy=False) == obj - - def refresh(self, obj: RedisModel): - fresh_obj = self.get(self.specifications.filter(obj.id), lazy=False) - - for key, value in fresh_obj.dict().items(): - setattr(obj, key, value) - - -__all__ = [ - 'RedisRepository', -] diff --git a/assimilator/redis/database/unit_of_work.py b/assimilator/redis/database/unit_of_work.py deleted file mode 100644 index 21c0a45..0000000 --- a/assimilator/redis/database/unit_of_work.py +++ /dev/null @@ -1,25 +0,0 @@ -from assimilator.core.database.unit_of_work import UnitOfWork - - -class RedisUnitOfWork(UnitOfWork): - _saved_session = None - - def begin(self): - self._saved_session = self.repository.session - self.repository.session = self.repository.session.pipeline() - - def rollback(self): - self.repository.session.discard() - - def commit(self): - self.repository.session.execute() - - def close(self): - self.repository.session.reset() - self.repository.session = self._saved_session - self._saved_session = None - - -__all__ = [ - 'RedisUnitOfWork', -] diff --git a/assimilator/redis/events/__init__.py b/assimilator/redis/events/__init__.py deleted file mode 100644 index 9a112fd..0000000 --- a/assimilator/redis/events/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from assimilator.redis.events.events_bus import * diff --git a/assimilator/redis_/__init__.py b/assimilator/redis_/__init__.py new file mode 100644 index 0000000..66696f9 --- /dev/null +++ b/assimilator/redis_/__init__.py @@ -0,0 +1,2 @@ +from assimilator.redis_.database import * +from assimilator.redis_.events import * diff --git a/assimilator/redis_/database/__init__.py b/assimilator/redis_/database/__init__.py new file mode 100644 index 0000000..e8c3980 --- /dev/null +++ b/assimilator/redis_/database/__init__.py @@ -0,0 +1,3 @@ +from assimilator.redis_.database.models import * +from assimilator.redis_.database.repository import * +from assimilator.redis_.database.unit_of_work import * diff --git a/assimilator/redis_/database/models.py b/assimilator/redis_/database/models.py new file mode 100644 index 0000000..4d578da --- /dev/null +++ 
b/assimilator/redis_/database/models.py @@ -0,0 +1,16 @@ +from typing import Optional + +from assimilator.internal.database.models import InternalModel + + +class RedisModel(InternalModel): + expire_in: Optional[int] = None + expire_in_px: Optional[int] = None + only_update: Optional[bool] = False # Same as xx in redis set. Only set if key exists + only_create: Optional[bool] = False # Same as nx in redis set. Only set if key does not exist + keep_ttl: Optional[bool] = False + + +__all__ = [ + 'RedisModel', +] diff --git a/assimilator/redis_/database/repository.py b/assimilator/redis_/database/repository.py new file mode 100644 index 0000000..15606b9 --- /dev/null +++ b/assimilator/redis_/database/repository.py @@ -0,0 +1,159 @@ +from typing import Type, Union, Optional, TypeVar, Collection + +from redis import Redis +from redis.client import Pipeline + +from assimilator.redis_.database import RedisModel +from assimilator.core.database.exceptions import DataLayerError, NotFoundError, InvalidQueryError +from assimilator.core.patterns.error_wrapper import ErrorWrapper +from assimilator.core.database import ( + SpecificationList, + SpecificationType, + Repository, + LazyCommand, + make_lazy, +) +from assimilator.internal.database.specifications import InternalSpecificationList + +RedisModelT = TypeVar("RedisModelT", bound=RedisModel) + + +class RedisRepository(Repository): + session: Redis + transaction: Union[Pipeline, Redis] + model: Type[RedisModelT] + + def __init__( + self, + session: Redis, + model: Type[RedisModelT], + initial_query: Optional[str] = '', + specifications: Type[SpecificationList] = InternalSpecificationList, + error_wrapper: Optional[ErrorWrapper] = None, + use_double_filter: bool = True, + ): + super(RedisRepository, self).__init__( + session=session, + model=model, + initial_query=initial_query, + specifications=specifications, + error_wrapper=error_wrapper or ErrorWrapper( + default_error=DataLayerError, + skipped_errors=(NotFoundError,) + ) + ) + self.transaction = session + self.use_double_specifications = use_double_filter + + @make_lazy + def get( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Optional[str] = None, + ) -> Union[LazyCommand[RedisModelT], RedisModelT]: + query = self._apply_specifications( + query=self.get_initial_query(initial_query), + specifications=specifications, + ) + + found_obj = self.session.get(query) + if found_obj is None: + raise NotFoundError(f"Redis model was not found") + + return self.model.from_json(found_obj) + + @make_lazy + def filter( + self, + *specifications: SpecificationType, + lazy: bool = False, + initial_query: Optional[str] = None, + ) -> Union[LazyCommand[Collection[RedisModelT]], Collection[RedisModelT]]: + if self.use_double_specifications and specifications: + key_name = self._apply_specifications( + query=self.get_initial_query(initial_query), + specifications=specifications, + ) or "*" + else: + key_name = "*" + + key_name = key_name or "*" + + models = self.session.mget(self.session.keys(key_name)) + return list(self._apply_specifications(specifications=specifications, query=[ + self.model.from_json(value) for value in models + ])) + + def save(self, obj: Optional[RedisModelT] = None, **obj_data) -> RedisModelT: + if obj is None: + obj = self.model(**obj_data) + + self.transaction.set( + name=obj.id, + value=obj.json(), + ex=obj.expire_in, + px=obj.expire_in_px, + nx=obj.only_create, + xx=obj.only_update, + keepttl=obj.keep_ttl, + ) + return obj + + def delete(self, obj: 
Optional[RedisModelT] = None, *specifications: SpecificationType) -> None: + if specifications: + models = self.filter(*specifications) # TODO: ADD ONLY SPECIFICATIONS + self.transaction.delete(*[str(model.id) for model in models]) + elif obj is not None: + self.transaction.delete(obj.id) + + def update( + self, + obj: Optional[RedisModelT] = None, + *specifications: SpecificationType, + **update_values, + ) -> None: + if specifications: + if not update_values: + raise InvalidQueryError( + "You did not provide any update_values " + "to the update() yet provided specifications" + ) + + models = self.filter(*specifications, lazy=False) + updated_models = {} + + for model in models: + model.__dict__.update(update_values) + updated_models = {model.id: model} + + self.transaction.mset(updated_models) + + elif obj is not None: + obj.only_update = True + self.save(obj) + + def is_modified(self, obj: RedisModelT) -> None: + return self.get(self.specifications.filter(obj.id), lazy=False) == obj + + def refresh(self, obj: RedisModelT) -> None: + fresh_obj = self.get(self.specifications.filter(obj.id), lazy=False) + + for key, value in fresh_obj.dict().items(): + setattr(obj, key, value) + + @make_lazy + def count(self, *specifications: SpecificationType, lazy: bool = False) -> Union[LazyCommand[int], int]: + if specifications: + return self.session.dbsize() + + filter_query = self._apply_specifications( + query=self.get_initial_query(), + specifications=specifications, + ) + return len(self.session.keys(filter_query)) + + +__all__ = [ + 'RedisRepository', +] diff --git a/assimilator/redis_/database/unit_of_work.py b/assimilator/redis_/database/unit_of_work.py new file mode 100644 index 0000000..d65c0db --- /dev/null +++ b/assimilator/redis_/database/unit_of_work.py @@ -0,0 +1,26 @@ +from redis.client import Pipeline + +from assimilator.core.database.unit_of_work import UnitOfWork +from redis_.database.repository import RedisRepository + + +class RedisUnitOfWork(UnitOfWork): + repository: RedisRepository + + def begin(self): + self.repository.transaction = self.repository.session.pipeline() + + def rollback(self): + self.repository.transaction.discard() + + def commit(self): + self.repository.transaction.execute() + + def close(self): + self.repository.transaction.reset() + self.repository.transaction = self.repository.session + + +__all__ = [ + 'RedisUnitOfWork', +] diff --git a/assimilator/redis_/events/__init__.py b/assimilator/redis_/events/__init__.py new file mode 100644 index 0000000..e4d2cef --- /dev/null +++ b/assimilator/redis_/events/__init__.py @@ -0,0 +1 @@ +from assimilator.redis_.events.events_bus import * diff --git a/assimilator/redis/events/events_bus.py b/assimilator/redis_/events/events_bus.py similarity index 89% rename from assimilator/redis/events/events_bus.py rename to assimilator/redis_/events/events_bus.py index b42568d..45bb9a6 100644 --- a/assimilator/redis/events/events_bus.py +++ b/assimilator/redis_/events/events_bus.py @@ -1,13 +1,13 @@ from typing import Iterable -import redis +from redis import Redis from assimilator.core.events import Event, ExternalEvent from assimilator.core.events.events_bus import EventConsumer, EventProducer class RedisEventConsumer(EventConsumer): - def __init__(self, channels: Iterable[str], session: redis.Redis): + def __init__(self, channels: Iterable[str], session: Redis): self.session = session self.channels = channels @@ -36,7 +36,7 @@ def delay_function(self): class RedisEventProducer(EventProducer): - def __init__(self, channel: str, 
session: redis.Redis): + def __init__(self, channel: str, session: Redis): self.session = session self.channel = channel diff --git a/docs/images/logo.png b/docs/images/logo.png new file mode 100644 index 0000000..f75fc0c Binary files /dev/null and b/docs/images/logo.png differ diff --git a/docs/images/logo.svg b/docs/images/logo.svg new file mode 100644 index 0000000..e530c36 --- /dev/null +++ b/docs/images/logo.svg @@ -0,0 +1,14 @@ + + + + + + + + diff --git a/docs/images/logo_white.svg b/docs/images/logo_white.svg new file mode 100644 index 0000000..d8fddfa --- /dev/null +++ b/docs/images/logo_white.svg @@ -0,0 +1,4 @@ + + + + diff --git a/docs/index.md b/docs/index.md index 856466d..0091d6e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,5 +1,7 @@ # Assimilator - the best Python patterns for the best projects +![](/images/logo.png) + ## Install now * `pip install py_assimilator` diff --git a/docs/patterns/database.md b/docs/patterns/database.md index f554c40..ba8cda1 100644 --- a/docs/patterns/database.md +++ b/docs/patterns/database.md @@ -8,7 +8,7 @@ functions that help us change and query our data from any source. The beauty of is that you can use it with SQL, text files, cache, S3, external API's or any kind of data storage. -###### `__init__(session, specifications: SpecificationList, initial_query)` +###### `__init__()` - `session` - each repository has a session that works as the primary data source. It can be your database connection, a text file or a data structure. - `initial_query` - the initial query that you use in the data storage. We will show how it works later. It can be an SQL query, a key in the dictionary or anything else. - `specifications: SpecificationList` - an object that contains links to the specifications that are used to create your queries @@ -16,7 +16,7 @@ is that you can use it with SQL, text files, cache, S3, external API's or any ki ###### `_get_initial_query()` returns the initial query used in the `_apply_specifications()` -###### `_apply_specifications(specifications: Iterable[Specifications]) -> query` +###### `_apply_specifications()` Applies Specifications to the query. **Must not be used directly.** apply specifications gets a list of specifications and applies them to the query returned in _get_initial_query(). The idea is the following: each specification gets a query and @@ -27,60 +27,71 @@ specifications provided by the user. else that specifies what kind of data we want. -###### `get(*specifications: Specification, lazy: bool = False) -> Union[LazyCommand, Any]:` +###### `get()` get is the function used to query the data storage and return one entity. You supply a list of specifications that get you the entity from the storage. - `specifications: Specifications` - specifications that can be used to specify some conditions in the query - `lazy: bool` - whether you want to execute your query straight away or just build it for the future +- `initial_query = None` - if you want to change the initial query for this query only, then you can provide it as an argument -###### `filter(self, *specifications: Specification, lazy: bool = False) -> Union[LazyCommand, Iterable]:` +###### `filter()` filters is the function used to query the data storage and return many entities. You supply a list of specifications that filter entities in the storage. 
- `specifications: Specifications` - specifications that can be used to specify some conditions in the query
- `lazy: bool` - whether you want to execute your query straight away or just build it for the future
+- `initial_query = None` - if you want to change the initial query for this query only, then you can provide it as an argument
-
-###### `save(self, obj) -> None:`
+###### `save()`
Adds the object to the session, so you can commit it later. This method should not change the
final state of the storage, we have UnitOfWork for this (*do not commit your changes, just add them*).

-###### `delete(self, obj) -> None:`
+###### `delete()`
Deletes the object from the session, so you can commit it later. This method should not change the
final state of the storage, we have UnitOfWork for this (*do not commit your changes, just delete them from your session*).

-###### `update(self, obj) -> None:`
+###### `update()`
Updates the object in the session, so you can commit it later. This method should not change the
final state of the storage, we have UnitOfWork for this (*do not commit your changes, just update them in your session*).

-###### `is_modified(self, obj) -> bool:`
+###### `is_modified()`
Checks whether an object was modified or not. If any value changes within the object, then it must return `True`.

-###### `refresh(self, obj) -> None:`
+###### `refresh()`
Updates the object values with the values in the data storage. That can be useful if you want to create
an object and get its id that was generated in the storage, or if you just want to have the latest saved
version of the object.

+###### `count()`
+Counts the objects while applying specifications to the query. Pass no specifications to
+count the whole data storage.
+- `specifications: Specifications` - specifications that can be used to specify some conditions in the query
+- `lazy: bool` - whether you want to execute your query straight away or just build it for the future
+
### Creating your own repository:
If you want to create your own repository, then you are going to have to override all the functions
above. But, please, do not make new functions available to the outer world. You can do this:
+
```python
-from assimilator.core.database import BaseRepository
+from assimilator.core.database import Repository
+
-class UserRepository(BaseRepository):
+class UserRepository(Repository):
    def _users_private_func(self):  # Cannot be called outside
        return 'Do something'
```

And call that function inside of your repository. But, never do this:
+
```python
-from assimilator.core.database import BaseRepository
+from assimilator.core.database import Repository
+
-class UserRepository(BaseRepository):
+class UserRepository(Repository):
    def get_user_by_id(self):  # Can be called outside
        return self.get(filter_specification(id=1))
@@ -88,19 +99,18 @@ class UserRepository(BaseRepository):
```

Doing so makes it really hard to replace one repository with another.
Example:
-
```python
-from assimilator.core.database import BaseRepository
+from assimilator.core.database import Repository
from users.repository import UserRepository
from products.repository import ProductRepository

-def get_by_id(id, repository: BaseRepository)
+def get_by_id(id, repository: Repository):
    return repository.get(filter_specification(id=id))

get_by_id(UserRepository())
-get_by_id(ProductRepository())
+get_by_id(ProductRepository())  # You can call the function with both repositories, and it will work fine
```

@@ -200,12 +210,16 @@ user = repository.get(
## SpecificationList
SpecificationList is a static class that contains the basic specifications for our repository.
-Specifications:
-- `filter()` - filters the data
-- `order()` - filters the data
-- `paginate()` - paginates the data(limits the results, offsets them)
-- `join()` - joins entities together(join a table, get related data)
+Specifications:
+###### `filter()`
+filters the data
+###### `order()`
+orders the data
+###### `paginate()`
+paginates the data (limits the results, offsets them).
+###### `join()`
+joins entities together (join a table, get related data).

The reason we use `SpecificationList` is that we want to have an abstraction for our specifications. Take two examples:
@@ -278,6 +292,35 @@ Once you have done that, the repository will use your specifications.
> Of course, you can still use specifications directly, but if you ever need to change
> the repository, then it may be a problem.
+
+## LazyCommand
+Sometimes we don't want to execute the query right away: for optimization purposes or
+for some other reason that requires us to delay the execution. In that case, find the `lazy` argument
+in the function that you are calling and set it to `True`. After that, a `LazyCommand` is returned. That
+object allows you to call it as a function or iterate over it to get the results:
+
+```python
+from assimilator.core.database import Repository
+
+
+def print_all_usernames(repository: Repository):
+    for user in repository.filter(lazy=True):
+        print(user.username)
+        # we don't want to receive a list of all the users, but want to iterate
+        # through it and only get 1 user at a time
+
+
+def count_users_if_argument_true(do_count, repository: Repository):
+    count_command = repository.count(lazy=True)
+    # turn on lazy and get a LazyCommand
+
+    if do_count:
+        return count_command()  # call it to get the result
+    return -1
+
+```
+
+
## Unit of Work
Unit of work allows us to work with transactions and repositories that change
the data. The problem with a repository alone is transaction management. We want to make our transaction management
They allow us to do the following:
5. Unit of work closes the transaction

-###### `__init__(repository: BaseRepository)`
-The repository is provided in the UnitOfWork when we create it. The session
+###### `__init__()`
+
+- `repository: Repository` - The repository is provided in the UnitOfWork when we create it. The session
to the data storage is going to be taken from the repository.

###### `begin()`
@@ -308,7 +352,7 @@ Saves the changes to the data storage. While the repository only adds the temporary changes, this
function is responsible for the final save. _You need to call it yourself, it will not be called automatically like rollback()_

###### `close()`
-closes the transaction. The function is called automatically.
+Closes the transaction. The function is called automatically.
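+
+If you implement your own data source, a custom `UnitOfWork` subclass could look roughly like the
+sketch below. This is illustrative only: the `FileUnitOfWork` name and the transaction methods on the
+session are assumptions for the example, not part of the library.
+
+```python
+from assimilator.core.database import UnitOfWork
+
+
+class FileUnitOfWork(UnitOfWork):
+    """Hypothetical unit of work for a file-based repository."""
+
+    def begin(self) -> None:
+        self.repository.session.start_transaction()  # assumed session API
+
+    def rollback(self) -> None:
+        self.repository.session.discard_changes()  # assumed session API
+
+    def commit(self) -> None:
+        self.repository.session.write_changes()  # assumed session API
+
+    def close(self) -> None:
+        self.repository.session.end_transaction()  # assumed session API
+```
+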
#### Here is how you can use UnitOfWork in your code:
@@ -346,4 +390,3 @@ def create_user(username: str, uow: UnitOfWork):
        1 / 0  # ZeroDivisionError. UnitOfWork calls rollback automatically.
        uow.commit()  # nothing is saved, since the rollback was called.
```
-p
diff --git a/docs/patterns/events.md b/docs/patterns/events.md
new file mode 100644
index 0000000..4f09ed7
--- /dev/null
+++ b/docs/patterns/events.md
@@ -0,0 +1,290 @@
+# Events patterns
+
+## Events and how they work
+An event represents a change in your system, and listeners (consumers) respond to it. Events carry all the data that
+other parts of the system may need once they respond to them. That is useful in lots of systems, and this page describes
+the basics of assimilator events. Events use the [Pydantic](https://docs.pydantic.dev/) module to ease the process of creation, integration
+and parsing.
+
+
+## Event-based systems with Assimilator
+
+1. `Event` - a representation of a change in your system that carries all the useful data
+2. `EventProducer` - something that produces events (it executes the changes in the system and announces them with events)
+3. `EventConsumer` - something that waits for the producer to emit events, consumes them, and executes operations based on those changes
+4. `EventBus` - combines both producers and consumers in one entity that can produce and consume simultaneously
+
+
+## Event example with user registration:
+1. A user sends their registration data to our website
+2. We create a new user in the database and emit a `UserCreated` event using an `EventProducer`
+3. `EventConsumers` listen to our `UserCreated` event and execute all the operations that must be done once the user is registered
+
+
+## Event
+###### `id: int`
+Unique identifier for the event.
+###### `event_name: str`
+Name of the event. We can have different events in our system. For example, if we
+have an event for User creation and an event for User deletion, then we can name them:
+
+- User creation: event_name = user_created
+- User deletion: event_name = user_deleted
+
+Those names help us sort events and listen only to specific kinds of events. All the names
+must be in the past tense, since an event describes a change that has already happened.
+###### `event_date: datetime`
+Date of the event. You don't need to change this field since it is assigned by default when an event
+is created.
+
+###### `from_json()`
+`from_json()` is a function that converts JSON data to an event.
+That method is in the `JSONParsedMixin` class, and it allows us to quickly convert JSON
+to a Python object.
+
+- `cls: Type['BaseModel']` - any [Pydantic](https://docs.pydantic.dev/) class, but typically an `Event`
+- `data: str` - JSON data for our event
+
+## Create a custom event
+
+`events.py`:
+```python
+from assimilator.core.events import Event
+
+class UserCreated(Event):
+    user_id: int
+    username: str
+    email: str  # all the data that could be useful is in the event.
+    # Since Event is a Pydantic model, we can just create new fields like this
+
+```
+
+`logic.py`:
+```python
+from assimilator.core.database import UnitOfWork
+
+from events import UserCreated
+from models import User
+
+
+def create_user(username: str, email: str, uow: UnitOfWork):
+    with uow:
+        user = User(username=username, email=email)
+        uow.repository.save(user)
+        uow.commit()
+
+        # Refresh the object and get the user id from the database
+        uow.repository.refresh(user)
+
+    event = UserCreated(  # we create an event
+        user_id=user.id,
+        username=user.username,
+        email=user.email,
+    )
+```
+
+In that example, we only create an event without publishing it anywhere. Find out how to emit your events below.
+
+## EventConsumer
+`EventConsumer` reads all the incoming events and yields them to the functions that use it.
+
+###### `start()`
+Starts the event consumer by connecting to all the required systems.
+
+###### `close()`
+Closes the consumer and finishes the work.
+
+###### `consume()`
+Yields incoming events.
+
+> `EventConsumer` uses the `StartCloseContextMixin` class, which allows us to use context managers (`with`)
+> without calling `start()` or `close()` ourselves
+
+Here is an example of how you would create and use your `EventConsumer`:
+
+`events_bus.py`:
+```python
+from assimilator.core.events import EventConsumer, ExternalEvent
+
+
+class MyEventConsumer(EventConsumer):
+    def __init__(self, api):
+        # some object that connects to an external system
+        self.api = api
+
+    def start(self) -> None:
+        self.api.connect()
+
+    def close(self) -> None:
+        self.api.disconnect()
+
+    def consume(self):
+        while True:
+            message = self.api.listen()  # we receive a message from the API
+            yield ExternalEvent(**message.convert_to_json())  # parse it
+```
+
+`logic.py`:
+
+```python
+from events_bus import MyEventConsumer
+from events import UserCreated, UserDeleted
+
+
+def consume_events(consumer: MyEventConsumer):
+    # user_created_handler and user_deleted_handler are your own handler functions
+    with consumer:
+        for event in consumer.consume():
+            if event.event_name == "user_created":
+                user_created_handler(UserCreated(**event.data))
+            elif event.event_name == "user_deleted":
+                user_deleted_handler(UserDeleted(**event.data))
+```
+
+We create a new `EventConsumer` called `MyEventConsumer`. Then, we use an `api` object
+to implement all the functions. After that, we use it in the `logic.py` file, where we consume
+all the events and handle them depending on the `event_name`.
+
+As you have already noticed, we use something called an `ExternalEvent`. We do that
+because events coming from external sources are unidentified at first and can only be mapped
+to specific event classes later. `ExternalEvent` keeps all the event data in the `data: dict` field, which
+can be used later.
+
+## ExternalEvent
+When we listen to external systems, it is sometimes hard to make an event class
+that represents a specific event. That is why we use an `ExternalEvent`. It contains all the data
+in the `data: dict` field, which can be accessed later in order to build an event class that represents
+that specific event.
+
+- `data: dict` - all the data for the event
+
+## AckEvent
+`AckEvent` is an event with acknowledgement built in. If you want to show that your event
+was processed (acknowledged), then use `AckEvent`.
+
+- `ack: bool` - whether the event was processed. `False` by default
+
+## EventProducer
+`EventProducer` is the class that produces all the events and sends them.
+
+###### `start()`
+Starts the event producer by connecting to all the required systems.
+
+###### `close()`
+Closes the producer and finishes the work.
+
+###### `produce()`
+Sends an event to an external system for it to be consumed.
+
+- `event: Event` - the event that must be sent.
+
+> `EventProducer` uses the `StartCloseContextMixin` class, which allows us to use context managers (`with`)
+> without calling `start()` or `close()` ourselves
+
+Here is an example of how you would create and use your `EventProducer`:
+
+`events_bus.py`:
+```python
+from assimilator.core.events import Event, EventProducer
+
+
+class MyEventProducer(EventProducer):
+    def __init__(self, api):
+        # some object that connects to an external system
+        self.api = api
+
+    def start(self) -> None:
+        self.api.connect()
+
+    def close(self) -> None:
+        self.api.disconnect()
+
+    def produce(self, event: Event) -> None:
+        self.api.send_event(event.json())  # parse the event to JSON and send it
+
+```
+
+`logic.py`:
+
+```python
+from assimilator.core.database import UnitOfWork
+
+from events_bus import MyEventProducer
+from events import UserCreated
+from models import User
+
+
+def create_user(
+    username: str,
+    email: str,
+    uow: UnitOfWork,
+    producer: MyEventProducer,
+):
+    with uow:
+        user = User(username=username, email=email)
+        uow.repository.save(user)
+        uow.commit()
+
+        # Refresh the object and get the user id from the database
+        uow.repository.refresh(user)
+
+    with producer:
+        producer.produce(
+            UserCreated(  # we create an event
+                user_id=user.id,
+                username=user.username,
+                email=user.email,
+            )
+        )  # send the event to an external system
+
+```
+
+> `ExternalEvent` must not be used in the producer: when we emit events, we are the ones
+> creating them, so we have a separate class for them with all the data inside.
+
+
+## EventBus
+`EventBus` combines an `EventProducer` and an `EventConsumer`. You can use those
+classes separately, but sometimes you need one object that combines them.
+
+###### `__init__()`
+
+- `consumer: EventConsumer` - the consumer that we want to use
+- `producer: EventProducer` - the producer that we want to use
+
+
+###### `produce()`
+Produces the event using the `producer`.
+
+- `event: Event` - an event that has to be emitted
+
+###### `consume()`
+Consumes the events using the `consumer`. Returns an `Iterator[Event]`.
+
+
+## Event failures and transaction management
+Sometimes we want to be sure that our events are emitted. But if we use normal
+event producers and Unit Of Work separately, we may run into a problem:
+
+1) The user is created (added to the database, and the unit of work committed the transaction)
+2) The event producer encounters an error (the event is not published)
+3) Inconsistency: the user exists, but consumers do not know about it
+
+Because of that, we may employ an Outbox Relay. It is a pattern that allows us
+to save all the events to the database in the same transaction as the main entity. Then,
+another program (thread, task, function) gets all the events from the database and ensures that
+they are published. We basically save the events to the database in one transaction, emit them in a separate
+process, and delete them afterwards.
+
+## OutboxRelay
+This class gets all the events using the provided `UnitOfWork`, emits them, and acknowledges them.
+
+###### `__init__()`
+- `uow: UnitOfWork` - unit of work that is used to get the events and acknowledge them
+- `producer: EventProducer` - event producer that we use to publish the events
+
+###### `start()`
+Starts the relay. This function must run forever, get the events from the unit of work's repository,
+and produce them. After that, it must call `acknowledge()` to show that these events are produced.
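+
+For illustration, here is a minimal sketch of what such a loop could look like. This is not the
+library's implementation: the `PollingOutboxRelay` name, the import path, and the polling interval
+are assumptions made for the example.
+
+```python
+import time
+
+from assimilator.core.events import OutboxRelay  # assumed import path
+
+
+class PollingOutboxRelay(OutboxRelay):
+    def start(self) -> None:
+        while True:  # the relay runs forever
+            with self.uow:
+                events = self.uow.repository.filter()  # get the stored events
+
+                for event in events:
+                    self.producer.produce(event)  # publish every event
+
+                self.acknowledge(events)  # make sure they are not produced twice
+                self.uow.commit()
+
+            time.sleep(1)  # assumed polling interval
+```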
+
+###### `acknowledge()`
+Acknowledges the events in the database. It might change a boolean column for these events, or it
+might delete them, but the idea is that those events will not be produced twice.
+
+- `events: Iterable[Event]` - events that must be acknowledged
diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/alchemy/__init__.py b/examples/alchemy/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/alchemy/database/__init__.py b/examples/alchemy/database/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/alchemy/database/dependencies.py b/examples/alchemy/database/dependencies.py
new file mode 100644
index 0000000..b117b78
--- /dev/null
+++ b/examples/alchemy/database/dependencies.py
@@ -0,0 +1,13 @@
+from assimilator.alchemy.database import AlchemyRepository, AlchemyUnitOfWork
+from examples.alchemy.database.models import User, DatabaseSession
+
+
+def create_repository():  # factory for AlchemyRepository
+    return AlchemyRepository(session=DatabaseSession(), model=User)
+
+
+def create_uow():  # factory for AlchemyUnitOfWork
+    return AlchemyUnitOfWork(repository=create_repository())
+
+
+__all__ = ['create_repository', 'create_uow']
diff --git a/examples/alchemy/database/main.py b/examples/alchemy/database/main.py
new file mode 100644
index 0000000..1829043
--- /dev/null
+++ b/examples/alchemy/database/main.py
@@ -0,0 +1,112 @@
+import random
+import string
+
+from assimilator.core.database import UnitOfWork, Repository, NotFoundError, DataLayerError
+from assimilator.core.patterns import LazyCommand
+
+from dependencies import create_uow
+from examples.alchemy.database.models import User
+
+
+def create_user(username: str, email: str, balance: float, uow: UnitOfWork) -> int:
+    with uow:
+        new_user = User(username=username, email=email, balance=balance)
+        uow.repository.save(new_user)
+        uow.commit()
+        return new_user.id  # the id is generated by default, but we can change that behaviour if we need to
+
+
+def read_user(id: int, repository: Repository) -> User:
+    return repository.get(repository.specs.filter(id=id))
+
+
+def buy_product(user_id: int, product_price: int, uow: UnitOfWork):
+    with uow:
+        found_user = read_user(id=user_id, repository=uow.repository)
+
+        found_user.balance -= product_price
+        uow.repository.update(found_user)
+        uow.commit()
+
+
+def refresh_user(old_user: User, repository: Repository) -> User:
+    try:
+        repository.refresh(old_user)
+    except DataLayerError:
+        pass
+
+    return old_user
+
+
+def create_many_users(uow):
+    with uow:
+        for i in range(100):
+            new_user = User(
+                username="".join(random.sample(string.ascii_letters, 10)),
+                email=f"{''.join(random.sample(string.ascii_letters, 10))}@gmail.com",
+                balance=random.randint(0, 100),
+            )
+            uow.repository.save(new_user)
+
+        uow.commit()  # Commit is only called once!
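+
+
+# Editorial addition, not part of the original example: a sketch of count() with
+# specifications and lazy execution. It assumes the same specs used by show_rich_users() below.
+def count_rich_users(balance: int, repository: Repository) -> int:
+    rich_count: LazyCommand[int] = repository.count(
+        repository.specs.filter(balance__gt=balance),
+        lazy=True,  # build a LazyCommand instead of executing right away
+    )
+    return rich_count()  # calling the LazyCommand executes the query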
+
+
+def show_rich_users(balance: int, repository: Repository):
+    users: LazyCommand[User] = repository.filter(
+        repository.specs.filter(balance__gt=balance),
+        repository.specs.paginate(limit=10),
+        lazy=True,
+    )
+
+    for rich_user in users:
+        print("The user", rich_user.username, "is rich!", "Balance:", rich_user.balance)
+
+
+def delete_user(id: int, uow: UnitOfWork):
+    with uow:
+        user_to_delete = uow.repository.get(
+            uow.repository.specs.filter(id=id)
+        )
+        uow.repository.delete(user_to_delete)
+        uow.commit()
+
+
+def order_users(repository: Repository):
+    for i, ordered_user in enumerate(repository.filter(
+        repository.specs.order('id', '-balance'),
+        repository.specs.paginate(offset=20, limit=40),
+    )):
+        print(f"User {i} ordered by id and balance:", ordered_user.username, ordered_user.balance)
+
+
+if __name__ == '__main__':
+    new_user_id = create_user(
+        username="Andrey",
+        email="python.on.papyrus@gmail.com",
+        balance=1000,
+        uow=create_uow(),
+    )
+
+    print(f"User with '{new_user_id}' was created")
+
+    user = read_user(id=new_user_id, repository=create_uow().repository)
+    print("User returned from the database:", user)
+
+    buy_product(user_id=new_user_id, product_price=100, uow=create_uow())
+
+    updated_user = refresh_user(user, repository=create_uow().repository)
+    print("User balance after product purchase:", updated_user.balance)
+
+    create_many_users(create_uow())
+    show_rich_users(balance=90, repository=create_uow().repository)
+
+    delete_user(id=new_user_id, uow=create_uow())
+    print("User is deleted from the storage!")
+
+    try:
+        read_user(id=new_user_id, repository=create_uow().repository)
+    except NotFoundError as error:
+        print("User was not found due to its deletion! Error:", error)
+
+    order_users(repository=create_uow().repository)
diff --git a/examples/alchemy/database/models.py b/examples/alchemy/database/models.py
new file mode 100644
index 0000000..0fac7b1
--- /dev/null
+++ b/examples/alchemy/database/models.py
@@ -0,0 +1,24 @@
+from sqlalchemy import create_engine, Column, Integer, String, Float
+from sqlalchemy.orm import declarative_base, sessionmaker
+
+
+engine = create_engine(url="sqlite:///:memory:")
+DatabaseSession = sessionmaker(bind=engine)
+Base = declarative_base()
+
+
+class User(Base):
+    __tablename__ = "users"
+
+    id = Column(Integer(), primary_key=True)
+    username = Column(String())
+    email = Column(String())
+    balance = Column(Float())
+
+    def __str__(self):
+        return f"{self.id} {self.username} {self.email}"
+
+
+Base.metadata.create_all(engine)
+
+__all__ = ['User', 'DatabaseSession']
diff --git a/examples/redis/__init__.py b/examples/redis/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/redis/database/__init__.py b/examples/redis/database/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/redis/database/dependencies.py b/examples/redis/database/dependencies.py
new file mode 100644
index 0000000..d9172d3
--- /dev/null
+++ b/examples/redis/database/dependencies.py
@@ -0,0 +1,23 @@
+from redis.client import Redis
+
+from assimilator.redis_ import RedisRepository, RedisUnitOfWork
+from examples.redis.database.models import RedisUser
+from internal import InternalRepository, InternalUnitOfWork
+
+# from examples.alchemy.database.dependencies import create_uow, create_repository
+
+session = Redis()
+# session = {}
+
+
+def create_repository():  # factory for RedisRepository
+    # return InternalRepository(session, model=RedisUser)
+    return RedisRepository(session=session, model=RedisUser)
+
+
+def create_uow():  # factory for RedisUnitOfWork
+    # return InternalUnitOfWork(repository=create_repository())
+    return RedisUnitOfWork(repository=create_repository())
+
+
+__all__ = ['create_repository', 'create_uow']
diff --git a/examples/redis/database/main.py b/examples/redis/database/main.py
new file mode 100644
index 0000000..7db098b
--- /dev/null
+++ b/examples/redis/database/main.py
@@ -0,0 +1,106 @@
+import random
+import string
+from uuid import UUID
+
+from assimilator.core.database import UnitOfWork, Repository, NotFoundError
+from assimilator.core.patterns import LazyCommand
+
+from dependencies import create_uow
+from examples.redis.database.models import RedisUser
+
+
+def create_user(username: str, email: str, balance: float, uow: UnitOfWork) -> UUID:
+    with uow:
+        new_user = uow.repository.save(username=username, email=email, balance=balance)
+        uow.commit()
+        return new_user.id  # the id is generated by default, but we can change that behaviour if we need to
+
+
+def read_user(id: UUID, repository: Repository) -> RedisUser:
+    return repository.get(repository.specs.filter(id=id))
+
+
+def buy_product(user_id: UUID, product_price: int, uow: UnitOfWork):
+    with uow:
+        found_user = read_user(id=user_id, repository=uow.repository)
+
+        found_user.balance -= product_price
+        uow.repository.update(found_user)
+        uow.commit()
+
+
+def refresh_user(old_user: RedisUser, repository: Repository) -> RedisUser:
+    # repository.refresh(old_user)  # refresh is not used in the Redis example
+    return old_user
+
+
+def create_many_users(uow):
+    with uow:
+        for i in range(100):
+            uow.repository.save(
+                username="".join(random.sample(string.ascii_letters, 10)),
+                email=f"{''.join(random.sample(string.ascii_letters, 10))}@gmail.com",
+                balance=random.randint(0, 100),
+            )
+
+        uow.commit()  # Commit is only called once!
+
+
+def show_rich_users(balance: int, repository: Repository):
+    users: LazyCommand[RedisUser] = repository.filter(
+        repository.specs.filter(balance__gt=balance),
+        repository.specs.paginate(limit=10),
+        lazy=True,
+    )
+
+    for rich_user in users:
+        print("The user", rich_user.username, "is rich!", "Balance:", rich_user.balance)
+
+
+def delete_user(id: UUID, uow: UnitOfWork):
+    with uow:
+        user_to_delete = uow.repository.get(
+            uow.repository.specs.filter(id=id)
+        )
+        uow.repository.delete(user_to_delete)
+        uow.commit()
+
+
+def order_users(repository: Repository):
+    for i, ordered_user in enumerate(repository.filter(
+        repository.specs.order('id', '-balance'),
+        repository.specs.paginate(offset=20, limit=40),
+    )):
+        print(f"User {i} ordered by id and balance:", ordered_user.username, ordered_user.balance)
+
+
+if __name__ == '__main__':
+    new_user_id = create_user(
+        username="Andrey",
+        email="python.on.papyrus@gmail.com",
+        balance=1000,
+        uow=create_uow(),
+    )
+
+    print(f"User with '{new_user_id}' was created")
+
+    user = read_user(id=new_user_id, repository=create_uow().repository)
+    print("User returned from Redis:", user)
+
+    buy_product(user_id=new_user_id, product_price=100, uow=create_uow())
+
+    updated_user = refresh_user(user, repository=create_uow().repository)
+    print("User balance after product purchase:", updated_user.balance)
+
+    create_many_users(create_uow())
+    show_rich_users(balance=90, repository=create_uow().repository)
+
+    delete_user(id=new_user_id, uow=create_uow())
+    print("User is deleted from the storage!")
+
+    try:
+        read_user(id=new_user_id, repository=create_uow().repository)
+    except NotFoundError as error:
+        print("User was not found due to its deletion! Error:", error)
Error:", error) + + order_users(repository=create_uow().repository) diff --git a/examples/redis/database/models.py b/examples/redis/database/models.py new file mode 100644 index 0000000..bde8322 --- /dev/null +++ b/examples/redis/database/models.py @@ -0,0 +1,10 @@ +from assimilator.redis_.database import RedisModel + + +class RedisUser(RedisModel): + username: str + email: str + balance: float = 0 + + +__all__ = ['RedisUser'] diff --git a/mkdocs.yml b/mkdocs.yml index 0bf67f1..d38048d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,6 +1,27 @@ site_name: Assimilator - the best Python patterns +site_description: Assimilator Python framework, Domain-Driven Design, DDD, high performance, easy to learn, fast to code theme: name: material + logo: images/logo.png + features: + - search.suggest + - search.highlight + - content.tabs.link + palette: + - media: '(prefers-color-scheme: light)' + scheme: default + primary: deep purple + accent: purple + toggle: + icon: material/lightbulb + name: Switch to light mode + - media: '(prefers-color-scheme: dark)' + scheme: slate + primary: deep purple + accent: purple + toggle: + icon: material/lightbulb-outline + name: Switch to dark mode nav: - Introduction: index.md - Concepts: concepts.md @@ -21,5 +42,5 @@ nav: - Advanced topics: - Core: core/core.md repo_url: https://github.com/knucklesuganda/py_assimilator -site_description: PyAssimilator allows you to write the best patterns in your projects. -site_author: Andrey Ivanov +repo_name: knucklesuganda/py_assimilator +site_author: Andrey Ivanov | Python diff --git a/setup.py b/setup.py index fdbb6a1..2c35109 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='py_assimilator', - version='0.2.3', + version='0.3.0', author='Andrey Ivanov', author_email='python.on.papyrus@gmail.com', url='https://pypi.python.org/pypi/py_assimilator/', diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/core/patterns/__init__.py b/tests/core/patterns/__init__.py new file mode 100644 index 0000000..e69de29