Skip to content

Commit

Permalink
Merge pull request #9 from knucklesuganda/fix/alchemy_repository
Browse files Browse the repository at this point in the history
Working Databases for SQLAlchemy, Redis, and Internal. Added custom specifications that allow users to quickly change Data Storages
  • Loading branch information
knucklesuganda authored Jan 19, 2023
2 parents 6c981d9 + b27d4e7 commit 17410fe
Show file tree
Hide file tree
Showing 61 changed files with 1,582 additions and 330 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Assimilator - the best Python patterns for the best projects

![](/images/logo.png)

## Install now
* `pip install py_assimilator`

Expand Down
2 changes: 1 addition & 1 deletion assimilator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ def optional_dependencies(error: str = "ignore"):
import assimilator.kafka

with optional_dependencies():
import assimilator.redis
import assimilator.redis_ as redis
16 changes: 16 additions & 0 deletions assimilator/alchemy/database/error_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy.exc import NoResultFound, IntegrityError, SQLAlchemyError

from assimilator.core.database.exceptions import DataLayerError, NotFoundError, InvalidQueryError
from assimilator.core.patterns.error_wrapper import ErrorWrapper


class AlchemyErrorWrapper(ErrorWrapper):
def __init__(self):
super(AlchemyErrorWrapper, self).__init__(error_mappings={
NoResultFound: NotFoundError,
IntegrityError: InvalidQueryError,
SQLAlchemyError: DataLayerError,
}, default_error=DataLayerError)


__all__ = ['AlchemyErrorWrapper']
137 changes: 102 additions & 35 deletions assimilator/alchemy/database/repository.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,125 @@
from typing import Type
from typing import Type, Union, Optional, TypeVar, Collection, Dict

from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Query
from sqlalchemy import func, select, update, delete
from sqlalchemy.orm import Session, Query
from sqlalchemy.inspection import inspect

from assimilator.alchemy.database.error_wrapper import AlchemyErrorWrapper
from assimilator.core.database.exceptions import InvalidQueryError
from assimilator.alchemy.database.specifications import AlchemySpecificationList
from assimilator.core.database import BaseRepository, Specification, SpecificationList
from assimilator.core.database.exceptions import NotFoundError
from assimilator.core.database import Repository, SpecificationList, \
LazyCommand, SpecificationType, make_lazy
from assimilator.core.patterns.error_wrapper import ErrorWrapper


class AlchemyRepository(BaseRepository):
AlchemyModelT = TypeVar("AlchemyModelT")


class AlchemyRepository(Repository):
session: Session
model: Type[AlchemyModelT]

def __init__(
self, session, initial_query: Query = None, specifications: Type[SpecificationList] = AlchemySpecificationList,
self,
session: Session,
model: Type[AlchemyModelT],
initial_query: Query = None,
specifications: Type[SpecificationList] = AlchemySpecificationList,
error_wrapper: Optional[ErrorWrapper] = None,
):
super(AlchemyRepository, self).__init__(
session=session,
initial_query=initial_query,
model=model,
initial_query=initial_query if initial_query is not None else select(model),
specifications=specifications,
error_wrapper=error_wrapper or AlchemyErrorWrapper(),
)

def _execute_query(self, query):
return self.session.execute(query)

def get(self, *specifications: Specification, lazy=False):
try:
data = self._execute_query(self._apply_specifications(specifications))
if lazy:
return data

return data.one()[0]
@make_lazy
def get(
self,
*specifications: SpecificationType,
lazy: bool = False,
initial_query: Query = None,
) -> Union[AlchemyModelT, LazyCommand[AlchemyModelT]]:
query = self._apply_specifications(
query=self.get_initial_query(initial_query),
specifications=specifications,
)
return self.session.execute(query).one()[0]

@make_lazy
def filter(
self,
*specifications: SpecificationType,
lazy: bool = False,
initial_query: Query = None,
) -> Union[Collection[AlchemyModelT], LazyCommand[Collection[AlchemyModelT]]]:
query = self._apply_specifications(
query=self.get_initial_query(initial_query),
specifications=specifications,
)
return [result[0] for result in self.session.execute(query)]

def update(self, obj: Optional[AlchemyModelT] = None, *specifications, **update_values) -> None:
if specifications:
if not update_values:
raise InvalidQueryError(
"You did not provide any update_values "
"to the update() yet provided specifications"
)

query: Query = self._apply_specifications(
query=self.get_initial_query(update(self.model)),
specifications=specifications,
)
self.session.execute(query.values(update_values))
elif obj is not None:
self.session.add(obj)

def save(self, obj: Optional[AlchemyModelT] = None, **data) -> AlchemyModelT:
if obj is None:
obj = self.model(**data)

except NoResultFound as exc:
raise NotFoundError(exc)
self.session.add(obj)
return obj

def filter(self, *specifications: Specification, lazy=False):
data = self._execute_query(self._apply_specifications(specifications))
if lazy:
return data
def refresh(self, obj: AlchemyModelT) -> None:
inspection = inspect(obj)

return [result[0] for result in data]
if inspection.transient or inspection.pending:
return
elif inspection.detached:
self.session.add(obj)

def update(self, obj):
""" We don't do anything, as the object is going to be updated with the obj.key = value """
self.session.refresh(obj)

def save(self, obj):
self.session.add(obj)
def delete(self, obj: Optional[AlchemyModelT] = None, *specifications: SpecificationType) -> None:
if specifications:
query: Query = self._apply_specifications(
query=self.get_initial_query(delete(self.model)),
specifications=specifications,
)
self.session.execute(query)
elif obj is not None:
self.session.delete(obj)

def is_modified(self, obj: AlchemyModelT) -> bool:
return self.session.is_modified(obj)

def refresh(self, obj):
self.session.refresh(obj)
@make_lazy
def count(self, *specifications: SpecificationType, lazy: bool = False) -> Union[LazyCommand[int], int]:
primary_keys = inspect(self.model).primary_key

def delete(self, obj):
self.session.delete(obj)
if not primary_keys:
raise InvalidQueryError("Your repository model does not have"
" any primary keys. We cannot use count()")

def is_modified(self, obj):
return self.session.is_modified(obj)
return self.get(
*specifications,
lazy=False,
query=select(func.count(getattr(self.model, primary_keys[0].name))),
)


__all__ = [
Expand Down
57 changes: 48 additions & 9 deletions assimilator/alchemy/database/specifications.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,68 @@
from typing import Iterable, Collection
from typing import Collection, Optional

from sqlalchemy.orm import Query
from sqlalchemy.sql.operators import is_
from sqlalchemy import column, desc

from assimilator.core.database.specifications import Specification, specification, SpecificationList
from assimilator.core.database.specifications import specification, SpecificationList,\
filter_parameter_parser


class AlchemySpecification(Specification):
def apply(self, query: Query) -> Query:
return super(AlchemySpecification, self).apply(query)
alchemy_filter_mappings = {
"__gt": lambda field_, val: column(field_) > val,
"__gte": lambda field_, val: column(field_) >= val,
"__lt": lambda field_, val: column(field_) < val,
"__lte": lambda field_, val: column(field_) <= val,
"__not": lambda field_, val: column(field_) != val,
"__is": lambda field_, val: is_(column(field_, val)),
}


@specification
def alchemy_filter(*filters, query: Query, **filters_by) -> Query:
filters = list(filters)

for field, value in dict(filters_by).items():
_, parsed_filter = filter_parameter_parser(
field=field,
value=value,
filter_mappings=alchemy_filter_mappings,
)

if parsed_filter is not None:
filters.append(parsed_filter)
del filters_by[field]

return query.filter(*filters).filter_by(**filters_by)


@specification
def alchemy_order(*clauses: str, query: Query) -> Query:
return query.order_by(*clauses)
parsed_clauses = []

for clause in clauses:
if clause.startswith("-"):
parsed_clauses.append(desc(column(clause[1:])))
else:
parsed_clauses.append(clause)

return query.order_by(*parsed_clauses)


@specification
def alchemy_paginate(limit: int, offset: int, query: Query) -> Query:
return query.limit(limit).offset(offset)
def alchemy_paginate(
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
query: Query,
) -> Query:

if limit is not None:
query = query.limit(limit)
if offset is not None:
query = query.offset(offset)

return query


@specification
Expand All @@ -45,7 +85,6 @@ class AlchemySpecificationList(SpecificationList):

__all__ = [
'AlchemySpecificationList',
'AlchemySpecification',
'alchemy_filter',
'alchemy_order',
'alchemy_paginate',
Expand Down
19 changes: 12 additions & 7 deletions assimilator/alchemy/database/unit_of_work.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from sqlalchemy.exc import IntegrityError

from assimilator.alchemy.database.repository import AlchemyRepository
from assimilator.alchemy.database.error_wrapper import AlchemyErrorWrapper
from assimilator.core.database.unit_of_work import UnitOfWork
from assimilator.core.database.exceptions import InvalidQueryError
from assimilator.core.patterns.error_wrapper import ErrorWrapper


class AlchemyUnitOfWork(UnitOfWork):
repository: AlchemyRepository

def __init__(self, repository: AlchemyRepository, error_wrapper: ErrorWrapper = None):
super(AlchemyUnitOfWork, self).__init__(
repository=repository,
error_wrapper=error_wrapper or AlchemyErrorWrapper(),
)

def begin(self):
self.repository.session.begin()

Expand All @@ -15,10 +23,7 @@ def close(self):
pass

def commit(self):
try:
self.repository.session.commit()
except IntegrityError as exc:
raise InvalidQueryError(exc)
self.repository.session.commit()


__all__ = [
Expand Down
26 changes: 24 additions & 2 deletions assimilator/alchemy/events/database/repository.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
from typing import Type, Optional

from sqlalchemy import Table
from sqlalchemy.orm import Query

from assimilator.alchemy.database.specifications import AlchemySpecificationList
from assimilator.alchemy.database.repository import AlchemyRepository
from assimilator.core.database import SpecificationList
from assimilator.core.patterns.error_wrapper import ErrorWrapper


class AlchemyOutboxRepository(AlchemyRepository):
def __init__(self, event_model, session, initial_query=None):
super(AlchemyOutboxRepository, self).__init__(session, initial_query)
def __init__(
self,
session,
event_model: Type[Table],
model: Type[Table],
initial_query: Optional[Query] = None,
specifications: Type[SpecificationList] = AlchemySpecificationList,
error_wrapper: ErrorWrapper = None,
):
super(AlchemyOutboxRepository, self).__init__(
session=session,
initial_query=initial_query,
model=model,
specifications=specifications,
error_wrapper=error_wrapper,
)
self.event_model = event_model

def save(self, obj):
Expand Down
25 changes: 13 additions & 12 deletions assimilator/alchemy/events/outbox_relay.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from sqlalchemy import Column, BigInteger, Text, DateTime

from assimilator.core.events import Event
from assimilator.core.events.events import Event
from assimilator.core.database.unit_of_work import UnitOfWork
from assimilator.core.events import OutboxRelay
from assimilator.core.events.events_bus import EventBus
from assimilator.core.events.events_bus import EventProducer


def create_outbox_event_model(Base):
Expand All @@ -24,22 +24,23 @@ def __init__(self, event: Event, *args, **kwargs):


class AlchemyOutboxRelay(OutboxRelay):
def __init__(self, outbox_event_model, uow: UnitOfWork, event_bus: EventBus):
super(AlchemyOutboxRelay, self).__init__(uow=uow, event_bus=event_bus)
def __init__(self, outbox_event_model, uow: UnitOfWork, producer: EventProducer):
super(AlchemyOutboxRelay, self).__init__(uow=uow, producer=producer)
self.outbox_event_model = outbox_event_model

def start(self):
while True:
with self.uow:
events = self.uow.repository.filter()
with self.producer:
while True:
with self.uow:
events = self.uow.repository.filter()

for event in events:
self.event_bus.produce(event)
for event in events:
self.producer.produce(event)

self.acknowledge(events)
self.uow.commit()
self.acknowledge(events)
self.uow.commit()

self.delay_function()
self.delay_function()

def delay_function(self):
raise NotImplementedError("delay_function() is not implemented")
Expand Down
1 change: 1 addition & 0 deletions assimilator/core/database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from assimilator.core.database.repository import *
from assimilator.core.database.specifications import *
from assimilator.core.database.unit_of_work import *
from assimilator.core.database.exceptions import *
Loading

0 comments on commit 17410fe

Please sign in to comment.