feat: Adding a new Filter model for tables and sequences #686

Open · wants to merge 2 commits into base: main
10 changes: 8 additions & 2 deletions configs/datacenter-name/database-name/config.json
@@ -39,6 +39,12 @@
},
"other_users": null
},
"tables": null,
"sequences": null
"tables": {
"include": [],
"exclude": []
},
"sequences": {
"include": [],
"exclude": []
}
}
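
For illustration only, a hypothetical filled-in version of the new filter shape (the table and sequence names below are made up, not part of this change), checked in Python:

import json

# Hypothetical example of the new config shape: each filter is an object with
# "include" and "exclude" name lists instead of the previous null value.
example = json.loads("""
{
    "tables": {"include": ["users", "orders"], "exclude": []},
    "sequences": {"include": [], "exclude": ["orders_id_seq"]}
}
""")

print(example["tables"]["include"])     # ['users', 'orders']
print(example["sequences"]["exclude"])  # ['orders_id_seq']
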
14 changes: 13 additions & 1 deletion pgbelt/config/config.py
@@ -3,13 +3,14 @@
from collections.abc import Awaitable
from os.path import join
from typing import Optional # noqa: F401 # Needed until tiangolo/typer#522 is fixed)

from asyncpg import create_pool
from pgbelt.config.models import DbupgradeConfig
from pgbelt.config.remote import resolve_remote_config
from pgbelt.util import get_logger
from pgbelt.util.asyncfuncs import isdir
from pgbelt.util.asyncfuncs import isfile
from pgbelt.util.asyncfuncs import listdir
from pgbelt.util.postgres import analyze_table_pkeys


def get_config(
@@ -63,6 +64,17 @@
else:
await config.save()

async with create_pool(config.src.owner_uri, min_size=1) as src_pool:
_, list_of_tables, _ = await analyze_table_pkeys(src_pool, config.schema_name, logger)
config.tables = config.tables.apply(list_of_tables)
seqs = await src_pool.fetch(
f"""
SELECT sequence_name
FROM information_schema.sequences
WHERE sequence_schema = '{config.schema_name}';
""")

Check warning on line 75 in pgbelt/config/config.py (Autodesk Chorus / security/bandit): B608 hardcoded_sql_expressions. Possible SQL injection vector through string-based query construction. Secure coding id: PYTH-INJC-20.
# fetch() returns asyncpg Record objects; extract the plain sequence names before filtering.
seq_names = [seq["sequence_name"] for seq in seqs]
config.sequences = config.sequences.apply(seq_names)

return config


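One way to address the B608 warning above is to pass the schema name as a bind parameter instead of interpolating it into the SQL string. A minimal sketch (not part of this PR), assuming asyncpg's positional $1 placeholders:

from asyncpg import create_pool


async def fetch_sequence_names(owner_uri: str, schema_name: str) -> list[str]:
    """Return the sequence names in the given schema using a parameterized query."""
    async with create_pool(owner_uri, min_size=1) as pool:
        rows = await pool.fetch(
            """
            SELECT sequence_name
            FROM information_schema.sequences
            WHERE sequence_schema = $1;
            """,
            schema_name,
        )
    # fetch() returns Record objects; pull out the plain string names.
    return [row["sequence_name"] for row in rows]
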
39 changes: 33 additions & 6 deletions pgbelt/config/models.py
@@ -1,12 +1,15 @@
from __future__ import annotations

from logging import Logger
from os.path import join
from typing import Optional # noqa: F401 # Needed until tiangolo/typer#522 is fixed)

from typing import Any, Callable, Optional # noqa: F401 # Needed until tiangolo/typer#522 is fixed)
from asyncpg import Pool
from asyncpg import create_pool
from aiofiles import open as aopen
from aiofiles.os import remove
from pgbelt.util import get_logger
from pgbelt.util.asyncfuncs import makedirs
from pgbelt.util.postgres import analyze_table_pkeys
from pydantic import BaseModel
from pydantic import ValidationError
from pydantic import field_validator
@@ -107,6 +110,30 @@ def pglogical_uri(self) -> str:
return f"postgresql://{self.pglogical_user.name}:{password}@{self.ip}:{self.port}/{self.db}"


class FilterConfig(BaseModel):
"""
Represents a object that includes the include and the exclude lists.

include: list of string.
exclude: list of string.
"""

include: Optional[list[str]] = None
exclude: Optional[list[str]] = None

def apply(self, list_of_items: list[str]) -> list[str]:
# Start from the full list so that, with no filters configured, everything is kept
filtered = list(list_of_items)

# Keep only items named in the include list, if one is provided
if self.include is not None:
filtered = [item for item in filtered if item in self.include]

# Drop items named in the exclude list, if one is provided
if self.exclude is not None:
filtered = [item for item in filtered if item not in self.exclude]

return filtered

class DbupgradeConfig(BaseModel):
"""
Represents a migration to be performed.
@@ -115,17 +142,17 @@ class DbupgradeConfig(BaseModel):
dc: str A name used to identify the environment this database pair is in. Used in cli commands.
src: DbConfig The database we are moving data out of.
dst: DbConfig The database we are moving data into.
tables: Optional[list[str]] A list of tables to replicate. If not provided all tables in the named schema will be replicated.
sequences: Optional[list[str]] A list of sequences to replicate. If not provided all sequences in the named schema will be replicated.
tables: Optional[FilterConfig] Include/exclude filters that determine which tables to replicate. If not provided all tables in the named schema will be replicated.
sequences: Optional[FilterConfig] Include/exclude filters that determine which sequences to replicate. If not provided all sequences in the named schema will be replicated.
schema_name: Optional[str] The schema to operate on. Defaults to "public".
"""

db: str
dc: str
src: Optional[DbConfig] = None
dst: Optional[DbConfig] = None
tables: Optional[list[str]] = None
sequences: Optional[list[str]] = None
tables: Optional[FilterConfig] = None
sequences: Optional[FilterConfig] = None
schema_name: Optional[str] = "public"

_not_empty = field_validator("db", "dc")(not_empty)
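To illustrate the include/exclude semantics of FilterConfig.apply as written above, a small self-contained sketch with made-up table names (the class below is a stand-in mirroring the model in this PR, not an import from pgbelt):

from typing import Optional

from pydantic import BaseModel


class FilterConfig(BaseModel):
    # Stand-in mirroring the FilterConfig model above, for illustration only.
    include: Optional[list[str]] = None
    exclude: Optional[list[str]] = None

    def apply(self, list_of_items: list[str]) -> list[str]:
        filtered = list(list_of_items)
        if self.include is not None:
            filtered = [item for item in filtered if item in self.include]
        if self.exclude is not None:
            filtered = [item for item in filtered if item not in self.exclude]
        return filtered


print(FilterConfig(include=["users", "orders"]).apply(["users", "orders", "audit_log"]))
# -> ['users', 'orders']
print(FilterConfig(exclude=["audit_log"]).apply(["users", "orders", "audit_log"]))
# -> ['users', 'orders']
print(FilterConfig().apply(["users", "orders"]))
# -> ['users', 'orders'] (no filters configured, everything passes through)
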
12 changes: 6 additions & 6 deletions tests/integration/conftest.py
@@ -53,12 +53,12 @@ async def _create_dbupgradeconfigs() -> dict[str, DbupgradeConfig]:
db_upgrade_config_kwargs["schema_name"] = (
"non_public_schema" if "nonpublic" in s else "public"
)
db_upgrade_config_kwargs["tables"] = (
["UsersCapital", "existingSomethingIds"] if "exodus" in s else None
)
db_upgrade_config_kwargs["sequences"] = (
["userS_id_seq"] if "exodus" in s else None
)
db_upgrade_config_kwargs["tables"] = {
"include": ["UsersCapital", "existingSomethingIds"] if "exodus" in s else None
}
db_upgrade_config_kwargs["sequences"] = {
"include": ["userS_id_seq"] if "exodus" in s else None
}
config = DbupgradeConfig(**db_upgrade_config_kwargs)

# The IP addresses are set in the docker-compose file, so we can pull them out of the environment. They follow the following pattern:
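The test now passes plain dicts for the tables and sequences kwargs; pydantic coerces those dicts into the nested FilterConfig model when DbupgradeConfig is constructed. A minimal stand-in (not the real DbupgradeConfig) showing that coercion:

from typing import Optional

from pydantic import BaseModel


class FilterConfig(BaseModel):
    include: Optional[list[str]] = None
    exclude: Optional[list[str]] = None


class MiniConfig(BaseModel):
    # Stand-in for the tables/sequences fields on DbupgradeConfig.
    tables: Optional[FilterConfig] = None
    sequences: Optional[FilterConfig] = None


cfg = MiniConfig(
    tables={"include": ["UsersCapital", "existingSomethingIds"]},
    sequences={"include": ["userS_id_seq"]},
)
print(type(cfg.tables).__name__)  # FilterConfig
print(cfg.tables.include)         # ['UsersCapital', 'existingSomethingIds']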