Skip to content

Commit

Permalink
feat(ingest/aerospike): add aerospike ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
dotan-mor committed Nov 12, 2024
1 parent 9025a01 commit 163241c
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/aerospike.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView, Set

import aerospike
import aerospike_helpers
Expand Down Expand Up @@ -70,7 +70,7 @@

logger = logging.getLogger(__name__)

DENY_NAMESPACE_LIST: set[str] = set([])
DENY_NAMESPACE_LIST: Set[str] = set([])


class AuthMode(Enum):
Expand All @@ -83,7 +83,7 @@ class AerospikeConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
# See the Aerospike authentication docs for details and examples.
hosts: list[tuple] = Field(
hosts: List[tuple] = Field(
default=[("localhost", 3000)], description="Aerospike hosts list."
)
username: Optional[str] = Field(default=None, description="Aerospike username.")
Expand Down Expand Up @@ -354,11 +354,10 @@ def get_field_type(
return SchemaFieldDataType(type=TypeClass())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
sets_info: str = (
self.aerospike_client.info_random_node("sets")
.removeprefix("sets\t")
.removesuffix(";\n")
)
sets_info: str = self.aerospike_client.info_random_node("sets")
sets_info = sets_info[len("sets\t"):] if sets_info.startswith("sets\t") else sets_info
sets_info = sets_info[:-len(";\n")] if sets_info.endswith(";\n") else sets_info

all_sets: List[AerospikeSet] = [
AerospikeSet(item) for item in sets_info.split(";") if item
]
Expand Down

0 comments on commit 163241c

Please sign in to comment.