diff --git a/src/cdf/core/feature_flag/__init__.py b/src/cdf/core/feature_flag/__init__.py index fd43aa4..fc8e521 100644 --- a/src/cdf/core/feature_flag/__init__.py +++ b/src/cdf/core/feature_flag/__init__.py @@ -2,112 +2,46 @@ import typing as t -import fsspec +import pydantic from dlt.common.configuration import with_config -import cdf.core.logger as logger -from cdf.core.feature_flag.file import create_file_provider -from cdf.core.feature_flag.harness import create_harness_provider -from cdf.core.feature_flag.launchdarkly import create_launchdarkly_provider +from cdf.core.feature_flag.file import FileFlagProvider +from cdf.core.feature_flag.harness import HarnessFlagProvider +from cdf.core.feature_flag.launchdarkly import LaunchDarklyFlagProvider +from cdf.core.feature_flag.noop import NoopFlagProvider -if t.TYPE_CHECKING: - from dlt.sources import DltSource - - -# The general interface for a feature flag provider -# TODO: we should decouple the protocol from dlt sources -class SupportsFFs(t.Protocol): - def __call__(self, source: "DltSource") -> "DltSource": ... - - -def create_noop_provider() -> SupportsFFs: - def _processor(source: "DltSource") -> "DltSource": - return source - - return _processor - - -class NoopProviderOptions(t.TypedDict): ... - - -@t.overload -def load_feature_flag_provider( - provider: t.Literal["noop"] = "noop", - options: t.Optional[NoopProviderOptions] = None, -) -> SupportsFFs: ... - - -class FileProviderOptions(t.TypedDict): - path: str - fs: fsspec.AbstractFileSystem - - -@t.overload -def load_feature_flag_provider( - provider: t.Literal["file"] = "file", - options: t.Optional[FileProviderOptions] = None, -) -> SupportsFFs: ... - - -class HarnessProviderOptions(t.TypedDict): - api_key: str - sdk_key: str - account: str - organization: str - project: str - - -@t.overload -def load_feature_flag_provider( - provider: t.Literal["harness"] = "harness", - options: t.Optional[HarnessProviderOptions] = None, -) -> SupportsFFs: ... - - -class LaunchDarklyProviderOptions(t.TypedDict): - sdk_key: str +FlagProvider = t.Union[ + FileFlagProvider, + HarnessFlagProvider, + LaunchDarklyFlagProvider, + NoopFlagProvider, +] -@t.overload -def load_feature_flag_provider( - provider: t.Literal["launchdarkly"] = "launchdarkly", - options: t.Optional[LaunchDarklyProviderOptions] = None, -) -> SupportsFFs: ... +def _ensure_dict(o: t.Any) -> t.Dict[str, t.Any]: + """Unwraps dynaconf config objects to dict.""" + if isinstance(o, dict): + return o + return o.to_dict() @with_config(sections=("feature_flags",)) def load_feature_flag_provider( provider: t.Literal["file", "harness", "launchdarkly", "noop"] = "noop", - options: t.Optional[ - t.Union[ - NoopProviderOptions, - FileProviderOptions, - HarnessProviderOptions, - LaunchDarklyProviderOptions, - ] - ] = None, -) -> SupportsFFs: - opts = t.cast(dict, options or {}) - if provider == "file": - logger.info("Using file-based feature flags") - return create_file_provider(**opts) - if provider == "harness": - logger.info("Using Harness feature flags") - return create_harness_provider(**opts) - if provider == "launchdarkly": - logger.info("Using LaunchDarkly feature flags") - return create_launchdarkly_provider(**opts) - if provider is None or provider == "noop": - logger.info("No feature flag provider configured") - return create_noop_provider(**opts) - raise ValueError(f"Unknown feature flag provider: {provider}") + options: t.Optional[t.Dict[str, t.Any]] = None, +) -> FlagProvider: + options = _ensure_dict(options or {}) + options["provider"] = provider + return t.cast( + FlagProvider, pydantic.TypeAdapter(FlagProvider).validate_python(options) + ) __all__ = [ - "SupportsFFs", - "create_noop_provider", - "create_file_provider", - "create_harness_provider", - "create_launchdarkly_provider", "load_feature_flag_provider", + "FlagProvider", + "FileFlagProvider", + "HarnessFlagProvider", + "LaunchDarklyFlagProvider", + "NoopFlagProvider", ] diff --git a/src/cdf/core/feature_flag/base.py b/src/cdf/core/feature_flag/base.py new file mode 100644 index 0000000..4a78ba0 --- /dev/null +++ b/src/cdf/core/feature_flag/base.py @@ -0,0 +1,15 @@ +import abc +import typing as t + +import pydantic + +if t.TYPE_CHECKING: + from dlt.sources import DltSource + + +class BaseFlagProvider(pydantic.BaseModel, abc.ABC): + provider: str + + @abc.abstractmethod + def apply_source(self, source: "DltSource") -> "DltSource": + """Apply the feature flags to a dlt source.""" diff --git a/src/cdf/core/feature_flag/file.py b/src/cdf/core/feature_flag/file.py index 219464d..c862f74 100644 --- a/src/cdf/core/feature_flag/file.py +++ b/src/cdf/core/feature_flag/file.py @@ -4,42 +4,43 @@ import typing as t from threading import Lock -import dlt import fsspec -from dlt.common.configuration import with_config +import pydantic + +import cdf.core.logger as logger +from cdf.core.feature_flag.base import BaseFlagProvider if t.TYPE_CHECKING: from dlt.sources import DltSource - from cdf.core.feature_flag import SupportsFFs -WLock = Lock() +class FileFlagProvider(BaseFlagProvider, extra="allow", arbitrary_types_allowed=True): + path: str = pydantic.Field( + description="The path to the file where the feature flags are stored in the configured filesystem." + ) + storage: fsspec.AbstractFileSystem = pydantic.Field( + default=fsspec.filesystem("file") + ) + + provider: t.Literal["file"] = "file" + _lock: Lock = pydantic.PrivateAttr(default_factory=Lock) -@with_config(sections=("feature_flags", "options")) -def create_file_provider( - path: str = dlt.config.value, - fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"), - **_: t.Any, -) -> "SupportsFFs": - def _processor(source: "DltSource") -> "DltSource": - if not fs.exists(path): + def apply_source(self, source: "DltSource") -> "DltSource": + """Apply the feature flags to a dlt source.""" + logger.info("Reading feature flags from %s", self.path) + if not self.storage.exists(self.path): flags = {} else: - with fs.open(path) as file: + with self.storage.open(self.path) as file: flags = json.load(file) - source_name = source.name for resource_name, resource in source.selected_resources.items(): key = f"{source_name}.{resource_name}" resource.selected = flags.setdefault(key, False) - - with WLock, fs.open(path, "w") as file: + with self._lock, self.storage.open(self.path, "w") as file: json.dump(flags, file, indent=2) - return source - return _processor - -__all__ = ["create_file_provider"] +__all__ = ["FileFlagProvider"] diff --git a/src/cdf/core/feature_flag/harness.py b/src/cdf/core/feature_flag/harness.py index cbfedc6..cb56d5f 100644 --- a/src/cdf/core/feature_flag/harness.py +++ b/src/cdf/core/feature_flag/harness.py @@ -8,8 +8,8 @@ import typing as t from concurrent.futures import ThreadPoolExecutor -import dlt -from dlt.common.configuration import with_config +import pydantic +from dlt.sources import DltSource from dlt.sources.helpers import requests from featureflags.client import CfClient, Config, Target from featureflags.evaluations.feature import FeatureConfigKind @@ -18,17 +18,15 @@ import cdf.core.context as context import cdf.core.logger as logger +from cdf.core.feature_flag.base import BaseFlagProvider from cdf.types.monads import Promise -if t.TYPE_CHECKING: - from dlt.sources import DltSource - - from cdf.core.feature_flag import SupportsFFs - # This exists because the default harness LRU implementation does not store >1000 flags # The interface is mostly satisfied by dict, so we subclass it and implement the missing methods class _HarnessCache(dict, Cache): + """A cache implementation for the harness feature flag provider.""" + def set(self, key: str, value: bool) -> None: self[key] = value @@ -39,50 +37,99 @@ def remove(self, key: str | t.List[str]) -> None: self.pop(k, None) -@with_config(sections=("feature_flags", "options")) -def create_harness_provider( - api_key: str = dlt.secrets.value, - sdk_key: str = dlt.secrets.value, - account: str = os.getenv("HARNESS_ACCOUNT_ID", dlt.config.value), - organization: str = os.getenv("HARNESS_ORG_ID", dlt.config.value), - project: str = os.getenv("HARNESS_PROJECT_ID", dlt.config.value), - **_: t.Any, -) -> "SupportsFFs": - _ff_logger.setLevel(logging.ERROR) - - def _get_client() -> CfClient: - client = CfClient( - sdk_key=sdk_key, +class HarnessFlagProvider(BaseFlagProvider, extra="allow"): + """Harness feature flag provider.""" + + api_key: str = pydantic.Field( + description="The harness API key. Get it from your user settings.", + pattern=r"^pat\.[a-zA-Z0-9_\-]+$", + ) + sdk_key: pydantic.UUID4 = pydantic.Field( + description="The harness SDK key. Get it from the environment management page of the FF module.", + ) + account: str = pydantic.Field( + description="The harness account ID.", + min_length=22, + max_length=22, + pattern=r"^[a-zA-Z0-9_\-]+$", + ) + organization: str = pydantic.Field( + description="The harness organization ID.", + ) + project: str = pydantic.Field( + description="The harness project ID.", + ) + + provider: t.Literal["harness"] = "harness" + + _client: t.Optional[CfClient] = None + + @pydantic.field_validator("account", mode="before") + @classmethod + def _validate_account(cls, value: str) -> str: + if not value: + value = os.environ["HARNESS_ACCOUNT_ID"] + return value + + @pydantic.field_validator("organization", mode="before") + @classmethod + def _validate_organization(cls, value: str) -> str: + if not value: + value = os.environ["HARNESS_ORG_ID"] + return value + + @pydantic.field_validator("project", mode="before") + @classmethod + def _validate_project(cls, value: str) -> str: + if not value: + value = os.environ["HARNESS_PROJECT_ID"] + return value + + @pydantic.model_validator(mode="after") + def _configure(self): + """Configure the harness FF logger to only show errors. Its too verbose otherwise.""" + _ff_logger.setLevel(logging.ERROR) + return self + + def _get_client(self) -> CfClient: + """Get the client and cache it in the instance.""" + if self._client is not None: + return self._client + self._client = CfClient( + sdk_key=str(self.sdk_key), config=Config( enable_stream=False, enable_analytics=False, cache=_HarnessCache() ), ) - client.wait_for_initialization() - return client - - client = Promise(lambda: asyncio.to_thread(_get_client)) + self._client.wait_for_initialization() + return self._client - def drop(ident: str) -> str: + def drop(self, ident: str) -> str: + """Drop a feature flag.""" logger.info(f"Deleting feature flag {ident}") requests.delete( f"https://app.harness.io/gateway/cf/admin/features/{ident}", - headers={"x-api-key": api_key}, + headers={"x-api-key": self.api_key}, params={ - "accountIdentifier": account, - "orgIdentifier": organization, - "projectIdentifier": project, + "accountIdentifier": self.account, + "orgIdentifier": self.organization, + "projectIdentifier": self.project, "forceDelete": True, }, ) return ident - def create(ident: str, name: str) -> str: + def create(self, ident: str, name: str) -> str: + """Create a feature flag.""" logger.info(f"Creating feature flag {ident}") try: requests.post( "https://app.harness.io/gateway/cf/admin/features", - params={"accountIdentifier": account, "orgIdentifier": organization}, - headers={"Content-Type": "application/json", "x-api-key": api_key}, + params={ + "accountIdentifier": self.account, + "orgIdentifier": self.organization, + }, + headers={"Content-Type": "application/json", "x-api-key": self.api_key}, json={ "defaultOnVariation": "on-variation", "defaultOffVariation": "off-variation", @@ -91,7 +138,7 @@ def create(ident: str, name: str) -> str: "name": name, "kind": FeatureConfigKind.BOOLEAN.value, "permanent": True, - "project": project, + "project": self.project, "variations": [ {"identifier": "on-variation", "value": "true"}, {"identifier": "off-variation", "value": "false"}, @@ -102,8 +149,9 @@ def create(ident: str, name: str) -> str: logger.exception(f"Failed to create feature flag {ident}") return ident - def _processor(source: "DltSource") -> "DltSource": - nonlocal client + def apply_source(self, source: DltSource) -> DltSource: + """Apply the feature flags to a dlt source.""" + client = Promise(lambda: asyncio.to_thread(self._get_client)) workspace = context.active_project.get() if isinstance(client, Promise): client = client.unwrap() @@ -111,12 +159,11 @@ def _processor(source: "DltSource") -> "DltSource": client._repository.cache.clear() client._polling_processor.retrieve_flags_and_segments() cache = client._repository.cache - ns = f"pipeline__{workspace.name}__{source.name}" - tpe = ThreadPoolExecutor(thread_name_prefix="harness-ff") + namespace = f"pipeline__{workspace.name}__{source.name}" def get_resource_id(r: str) -> str: - return f"{ns}__{r}" + return f"{namespace}__{r}" resource_lookup = { get_resource_id(key): resource for key, resource in source.resources.items() @@ -126,7 +173,7 @@ def get_resource_id(r: str) -> str: current_flags = set( filter( - lambda f: f.startswith(ns), + lambda f: f.startswith(namespace), map(lambda f: f.split("/", 1)[1], cache.keys()), ) ) @@ -135,9 +182,9 @@ def get_resource_id(r: str) -> str: added = selected_resources.difference(current_flags) if os.getenv("HARNESS_FF_AUTORECONCILE", "0") == "1": - list(tpe.map(drop, removed)) + list(tpe.map(self.drop, removed)) for f in tpe.map( - create, + self.create, added, [ f"Extract {source.name.title()} {resource_lookup[f].name.title()}" @@ -150,7 +197,5 @@ def get_resource_id(r: str) -> str: return source - return _processor - -__all__ = ["create_harness_provider"] +__all__ = ["HarnessFlagProvider"] diff --git a/src/cdf/core/feature_flag/launchdarkly.py b/src/cdf/core/feature_flag/launchdarkly.py index e855a60..ee4b93c 100644 --- a/src/cdf/core/feature_flag/launchdarkly.py +++ b/src/cdf/core/feature_flag/launchdarkly.py @@ -2,12 +2,20 @@ import typing as t -from dlt.common.configuration import with_config +from dlt.sources import DltSource +from cdf.core.feature_flag.base import BaseFlagProvider -@with_config(sections=("feature_flags", "options")) -def create_launchdarkly_provider(**_: t.Any): - raise NotImplementedError("LaunchDarkly feature flags are not yet supported") +class LaunchDarklyFlagProvider(BaseFlagProvider, extra="allow"): + """LaunchDarkly feature flag provider.""" -__all__ = ["create_launchdarkly_provider"] + sdk_key: str + + provider: t.Literal["launchdarkly"] = "launchdarkly" + + def apply_source(self, source: DltSource) -> DltSource: + raise NotImplementedError("LaunchDarkly feature flags are not yet supported") + + +__all__ = ["LaunchDarklyFlagProvider"] diff --git a/src/cdf/core/feature_flag/noop.py b/src/cdf/core/feature_flag/noop.py new file mode 100644 index 0000000..6294f50 --- /dev/null +++ b/src/cdf/core/feature_flag/noop.py @@ -0,0 +1,19 @@ +"""No-op feature flag provider.""" + +import typing as t + +from dlt.sources import DltSource + +from cdf.core.feature_flag.base import BaseFlagProvider + + +class NoopFlagProvider(BaseFlagProvider, extra="allow"): + """LaunchDarkly feature flag provider.""" + + provider: t.Literal["noop"] = "noop" + + def apply_source(self, source: DltSource) -> DltSource: + return source + + +__all__ = ["NoopFlagProvider"] diff --git a/src/cdf/core/project.py b/src/cdf/core/project.py index 3a0b0d5..6ef636e 100644 --- a/src/cdf/core/project.py +++ b/src/cdf/core/project.py @@ -13,7 +13,7 @@ import cdf.core.logger as logger from cdf.core.configuration import load_config -from cdf.core.feature_flag import SupportsFFs, load_feature_flag_provider +from cdf.core.feature_flag import FlagProvider, load_feature_flag_provider from cdf.core.filesystem import load_filesystem_provider from cdf.core.specification import ( CoreSpecification, @@ -95,7 +95,7 @@ def root(self) -> Path: return self.configuration.maps[0]._root_path @cached_property - def feature_flag_provider(self) -> SupportsFFs: + def feature_flag_provider(self) -> FlagProvider: """The feature flag provider.""" try: ff = self.configuration["feature_flags"] @@ -104,7 +104,7 @@ def feature_flag_provider(self) -> SupportsFFs: return load_feature_flag_provider("noop") options = ff.setdefault("options", {}) if ff.provider == "file": - options.fs = self.filesystem + options.storage = self.filesystem return load_feature_flag_provider(ff.provider, options=options.to_dict()) @cached_property diff --git a/src/cdf/core/runtime/pipeline.py b/src/cdf/core/runtime/pipeline.py index a6686de..048f094 100644 --- a/src/cdf/core/runtime/pipeline.py +++ b/src/cdf/core/runtime/pipeline.py @@ -167,7 +167,7 @@ def _apply_filters( else: active_project = context.active_project.get() for i, source in enumerate(sources): - sources[i] = active_project.feature_flag_provider(source) + sources[i] = active_project.feature_flag_provider.apply_source(source) if runtime_context.exclude: for i, source in enumerate(sources):