Skip to content

Commit

Permalink
feat: use pydantic for ff providers
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Apr 8, 2024
1 parent 559f158 commit 3e93ea8
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 168 deletions.
122 changes: 28 additions & 94 deletions src/cdf/core/feature_flag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,112 +2,46 @@

import typing as t

import fsspec
import pydantic
from dlt.common.configuration import with_config

import cdf.core.logger as logger
from cdf.core.feature_flag.file import create_file_provider
from cdf.core.feature_flag.harness import create_harness_provider
from cdf.core.feature_flag.launchdarkly import create_launchdarkly_provider
from cdf.core.feature_flag.file import FileFlagProvider
from cdf.core.feature_flag.harness import HarnessFlagProvider
from cdf.core.feature_flag.launchdarkly import LaunchDarklyFlagProvider
from cdf.core.feature_flag.noop import NoopFlagProvider

if t.TYPE_CHECKING:
from dlt.sources import DltSource


# The general interface for a feature flag provider
# TODO: we should decouple the protocol from dlt sources
class SupportsFFs(t.Protocol):
def __call__(self, source: "DltSource") -> "DltSource": ...


def create_noop_provider() -> SupportsFFs:
def _processor(source: "DltSource") -> "DltSource":
return source

return _processor


class NoopProviderOptions(t.TypedDict): ...


@t.overload
def load_feature_flag_provider(
provider: t.Literal["noop"] = "noop",
options: t.Optional[NoopProviderOptions] = None,
) -> SupportsFFs: ...


class FileProviderOptions(t.TypedDict):
path: str
fs: fsspec.AbstractFileSystem


@t.overload
def load_feature_flag_provider(
provider: t.Literal["file"] = "file",
options: t.Optional[FileProviderOptions] = None,
) -> SupportsFFs: ...


class HarnessProviderOptions(t.TypedDict):
api_key: str
sdk_key: str
account: str
organization: str
project: str


@t.overload
def load_feature_flag_provider(
provider: t.Literal["harness"] = "harness",
options: t.Optional[HarnessProviderOptions] = None,
) -> SupportsFFs: ...


class LaunchDarklyProviderOptions(t.TypedDict):
sdk_key: str
FlagProvider = t.Union[
FileFlagProvider,
HarnessFlagProvider,
LaunchDarklyFlagProvider,
NoopFlagProvider,
]


@t.overload
def load_feature_flag_provider(
provider: t.Literal["launchdarkly"] = "launchdarkly",
options: t.Optional[LaunchDarklyProviderOptions] = None,
) -> SupportsFFs: ...
def _ensure_dict(o: t.Any) -> t.Dict[str, t.Any]:
"""Unwraps dynaconf config objects to dict."""
if isinstance(o, dict):
return o
return o.to_dict()


@with_config(sections=("feature_flags",))
def load_feature_flag_provider(
provider: t.Literal["file", "harness", "launchdarkly", "noop"] = "noop",
options: t.Optional[
t.Union[
NoopProviderOptions,
FileProviderOptions,
HarnessProviderOptions,
LaunchDarklyProviderOptions,
]
] = None,
) -> SupportsFFs:
opts = t.cast(dict, options or {})
if provider == "file":
logger.info("Using file-based feature flags")
return create_file_provider(**opts)
if provider == "harness":
logger.info("Using Harness feature flags")
return create_harness_provider(**opts)
if provider == "launchdarkly":
logger.info("Using LaunchDarkly feature flags")
return create_launchdarkly_provider(**opts)
if provider is None or provider == "noop":
logger.info("No feature flag provider configured")
return create_noop_provider(**opts)
raise ValueError(f"Unknown feature flag provider: {provider}")
options: t.Optional[t.Dict[str, t.Any]] = None,
) -> FlagProvider:
options = _ensure_dict(options or {})
options["provider"] = provider
return t.cast(
FlagProvider, pydantic.TypeAdapter(FlagProvider).validate_python(options)
)


__all__ = [
"SupportsFFs",
"create_noop_provider",
"create_file_provider",
"create_harness_provider",
"create_launchdarkly_provider",
"load_feature_flag_provider",
"FlagProvider",
"FileFlagProvider",
"HarnessFlagProvider",
"LaunchDarklyFlagProvider",
"NoopFlagProvider",
]
15 changes: 15 additions & 0 deletions src/cdf/core/feature_flag/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
import typing as t

import pydantic

if t.TYPE_CHECKING:
from dlt.sources import DltSource


class BaseFlagProvider(pydantic.BaseModel, abc.ABC):
provider: str

@abc.abstractmethod
def apply_source(self, source: "DltSource") -> "DltSource":
"""Apply the feature flags to a dlt source."""
41 changes: 21 additions & 20 deletions src/cdf/core/feature_flag/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,43 @@
import typing as t
from threading import Lock

import dlt
import fsspec
from dlt.common.configuration import with_config
import pydantic

import cdf.core.logger as logger
from cdf.core.feature_flag.base import BaseFlagProvider

if t.TYPE_CHECKING:
from dlt.sources import DltSource

from cdf.core.feature_flag import SupportsFFs

WLock = Lock()
class FileFlagProvider(BaseFlagProvider, extra="allow", arbitrary_types_allowed=True):
path: str = pydantic.Field(
description="The path to the file where the feature flags are stored in the configured filesystem."
)
storage: fsspec.AbstractFileSystem = pydantic.Field(
default=fsspec.filesystem("file")
)

provider: t.Literal["file"] = "file"

_lock: Lock = pydantic.PrivateAttr(default_factory=Lock)

@with_config(sections=("feature_flags", "options"))
def create_file_provider(
path: str = dlt.config.value,
fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"),
**_: t.Any,
) -> "SupportsFFs":
def _processor(source: "DltSource") -> "DltSource":
if not fs.exists(path):
def apply_source(self, source: "DltSource") -> "DltSource":
"""Apply the feature flags to a dlt source."""
logger.info("Reading feature flags from %s", self.path)
if not self.storage.exists(self.path):
flags = {}
else:
with fs.open(path) as file:
with self.storage.open(self.path) as file:
flags = json.load(file)

source_name = source.name
for resource_name, resource in source.selected_resources.items():
key = f"{source_name}.{resource_name}"
resource.selected = flags.setdefault(key, False)

with WLock, fs.open(path, "w") as file:
with self._lock, self.storage.open(self.path, "w") as file:
json.dump(flags, file, indent=2)

return source

return _processor


__all__ = ["create_file_provider"]
__all__ = ["FileFlagProvider"]
Loading

0 comments on commit 3e93ea8

Please sign in to comment.