From 4a4293bb6b28e03149f9521d638423d5a8bf1be6 Mon Sep 17 00:00:00 2001
From: almogeldabach
Date: Wed, 26 Feb 2025 15:07:13 +0200
Subject: [PATCH 1/2] Add a FilterConfig model for tables and sequences

Add include/exclude filter logic that is applied to the source database's
tables and sequences when the config is loaded, and fill the resulting
lists into the config.
---
 pgbelt/config/config.py | 15 ++++++++++++++-
 pgbelt/config/models.py | 37 +++++++++++++++++++++++++++++++------
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/pgbelt/config/config.py b/pgbelt/config/config.py
index 9af298d..34ce1cd 100644
--- a/pgbelt/config/config.py
+++ b/pgbelt/config/config.py
@@ -3,13 +3,14 @@
 from collections.abc import Awaitable
 from os.path import join
 from typing import Optional  # noqa: F401 # Needed until tiangolo/typer#522 is fixed)
-
+from asyncpg import create_pool
 from pgbelt.config.models import DbupgradeConfig
 from pgbelt.config.remote import resolve_remote_config
 from pgbelt.util import get_logger
 from pgbelt.util.asyncfuncs import isdir
 from pgbelt.util.asyncfuncs import isfile
 from pgbelt.util.asyncfuncs import listdir
+from pgbelt.util.postgres import analyze_table_pkeys
 
 
 def get_config(
@@ -63,6 +64,18 @@ async def get_config_async(
     else:
         await config.save()
 
+    async with create_pool(config.src.owner_uri, min_size=1) as src_pool:
+        _, list_of_tables, _ = await analyze_table_pkeys(src_pool, config.schema_name, logger)
+        config.tables = config.tables.apply(list_of_tables) if config.tables else None
+        seq_rows = await src_pool.fetch(
+            f"""
+            SELECT sequence_name
+            FROM information_schema.sequences
+            WHERE sequence_schema = '{config.schema_name}';
+            """)
+        seq_names = [row["sequence_name"] for row in seq_rows]
+        config.sequences = config.sequences.apply(seq_names) if config.sequences else None
+
     return config
diff --git a/pgbelt/config/models.py b/pgbelt/config/models.py
index b0dfe35..7cf01c6 100644
--- a/pgbelt/config/models.py
+++ b/pgbelt/config/models.py
@@ -1,12 +1,15 @@
 from __future__ import annotations
+from logging import Logger
 from os.path import join
-from typing import Optional  # noqa: F401 # Needed until tiangolo/typer#522 is fixed)
-
+from typing import Any, Callable, Optional  # noqa: F401 # Needed until tiangolo/typer#522 is fixed)
+from asyncpg import Pool
+from asyncpg import create_pool
 from aiofiles import open as aopen
 from aiofiles.os import remove
 from pgbelt.util import get_logger
 from pgbelt.util.asyncfuncs import makedirs
+from pgbelt.util.postgres import analyze_table_pkeys
 from pydantic import BaseModel
 from pydantic import ValidationError
 from pydantic import field_validator
@@ -107,6 +110,28 @@ def pglogical_uri(self) -> str:
         return f"postgresql://{self.pglogical_user.name}:{password}@{self.ip}:{self.port}/{self.db}"
 
 
+class FilterConfig(BaseModel):
+    """
+    Represents a filter made up of optional include and exclude lists.
+
+    include: Optional[list[str]] Names to keep. If empty or not provided, everything is kept.
+    exclude: Optional[list[str]] Names to drop, applied after the include filter.
+    """
+
+    include: Optional[list[str]] = None
+    exclude: Optional[list[str]] = None
+
+    def apply(self, list_of_items: list[str]) -> list[str]:
+        filtered = list(list_of_items)
+        if self.include:
+            filtered = [item for item in filtered if item in self.include]
+
+        # Filter based on the exclude list if provided
+        if self.exclude:
+            filtered = [item for item in filtered if item not in self.exclude]
+
+        return filtered
+
 class DbupgradeConfig(BaseModel):
     """
     Represents a migration to be performed.
@@ -115,8 +140,8 @@ class DbupgradeConfig(BaseModel):
     dc: str A name used to identify the environment this database pair is in. Used in cli commands.
     src: DbConfig The database we are moving data out of.
     dst: DbConfig The database we are moving data into.
-    tables: Optional[list[str]] A list of tables to replicate. If not provided all tables in the named schema will be replicated.
-    sequences: Optional[list[str]] A list of sequences to replicate. If not provided all sequences in the named schema will be replicated.
+    tables: Optional[FilterConfig] Include/exclude filters that determine which tables to replicate. If not provided all tables in the named schema will be replicated.
+    sequences: Optional[FilterConfig] Include/exclude filters that determine which sequences to replicate. If not provided all sequences in the named schema will be replicated.
     schema_name: Optional[str] The schema to operate on. Defaults to "public".
     """
@@ -124,8 +149,8 @@ class DbupgradeConfig(BaseModel):
     db: str
     dc: str
     src: Optional[DbConfig] = None
     dst: Optional[DbConfig] = None
-    tables: Optional[list[str]] = None
-    sequences: Optional[list[str]] = None
+    tables: Optional[FilterConfig] = None
+    sequences: Optional[FilterConfig] = None
     schema_name: Optional[str] = "public"
 
     _not_empty = field_validator("db", "dc")(not_empty)

From 0e14fcb184221a77511eb6d1673252a4323c7097 Mon Sep 17 00:00:00 2001
From: almogeldabach
Date: Wed, 26 Feb 2025 16:21:06 +0200
Subject: [PATCH 2/2] Update example config and tests for the new filters

---
 configs/datacenter-name/database-name/config.json | 10 ++++++++--
 pgbelt/config/models.py | 1 +
 tests/integration/conftest.py | 12 ++++++------
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/configs/datacenter-name/database-name/config.json b/configs/datacenter-name/database-name/config.json
index dba3fde..f5aaeae 100644
--- a/configs/datacenter-name/database-name/config.json
+++ b/configs/datacenter-name/database-name/config.json
@@ -39,6 +39,12 @@
     },
     "other_users": null
   },
-  "tables": null,
-  "sequences": null
+  "tables": {
+    "include": [],
+    "exclude": []
+  },
+  "sequences": {
+    "include": [],
+    "exclude": []
+  }
 }
diff --git a/pgbelt/config/models.py b/pgbelt/config/models.py
index 7cf01c6..a298a56 100644
--- a/pgbelt/config/models.py
+++ b/pgbelt/config/models.py
@@ -123,6 +123,7 @@ class FilterConfig(BaseModel):
 
     def apply(self, list_of_items: list[str]) -> list[str]:
         filtered = list(list_of_items)
+        # Filter based on the include list if provided
         if self.include:
             filtered = [item for item in filtered if item in self.include]
 
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 5ac3bc9..36001cd 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -53,12 +53,12 @@ async def _create_dbupgradeconfigs() -> dict[str, DbupgradeConfig]:
         db_upgrade_config_kwargs["schema_name"] = (
             "non_public_schema" if "nonpublic" in s else "public"
         )
-        db_upgrade_config_kwargs["tables"] = (
-            ["UsersCapital", "existingSomethingIds"] if "exodus" in s else None
-        )
-        db_upgrade_config_kwargs["sequences"] = (
-            ["userS_id_seq"] if "exodus" in s else None
-        )
+        db_upgrade_config_kwargs["tables"] = {
+            "include": ["UsersCapital", "existingSomethingIds"] if "exodus" in s else None
+        }
+        db_upgrade_config_kwargs["sequences"] = {
+            "include": ["userS_id_seq"] if "exodus" in s else None
+        }
         config = DbupgradeConfig(**db_upgrade_config_kwargs)
 
         # The IP addresses are set in the docker-compose file, so we can pull them out of the environment. They follow the following pattern:
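
As a rough usage sketch (not part of the patch), assuming the FilterConfig.apply() shown in the first patch, the include/exclude filters are meant to behave as follows; the table names below, including "audit_log", are made up for illustration:

    from pgbelt.config.models import FilterConfig

    tables = ["UsersCapital", "existingSomethingIds", "audit_log"]

    # Keep only the listed tables.
    print(FilterConfig(include=["UsersCapital"]).apply(tables))
    # -> ['UsersCapital']

    # Keep everything except the listed tables.
    print(FilterConfig(exclude=["audit_log"]).apply(tables))
    # -> ['UsersCapital', 'existingSomethingIds']

    # Empty or missing lists leave the input untouched, matching the example config above.
    print(FilterConfig(include=[], exclude=[]).apply(tables))
    # -> ['UsersCapital', 'existingSomethingIds', 'audit_log']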