Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Octet-stream, cluster last-modified #42

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions deker_server_adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def hash_ring(self) -> HashRing:
"""Return HashRing instance."""
hash_ring = self.ctx.extra.get("hash_ring")
if not hash_ring:
raise AttributeError("Attempt to use cluster logic in single server mode")
msg = "Attempt to use cluster logic in single server mode"
raise AttributeError(msg)
return hash_ring # type: ignore[attr-defined]

@property
Expand Down Expand Up @@ -173,7 +174,7 @@ def read_meta(self, array: "BaseArray") -> ArrayMeta:

# Only Varray can be located on different nodes yet.
if self.type == ArrayType.varray and self.client.cluster_mode:
response = make_request(url=url, nodes=self.nodes_urls, client=self.client)
response = make_request(url=url, nodes=self.nodes_urls, client=self.client, retry_on_hash_failure=True)
elif self.client.cluster_mode:
response = request_in_cluster(url, array, self.ctx, True)
else:
Expand Down Expand Up @@ -278,17 +279,19 @@ def update(self, array: "BaseArray", bounds: Slice, data: Numeric) -> None:
"""
bounds = slice_converter[bounds]
url = f"{self.path_stripped}/{self.type.name}/by-id/{array.id}/subset/{bounds}/data"
request_kwargs = {"json": data}
if hasattr(data, "tolist"):
request_kwargs["json"] = data.tolist()
request_kwargs = {"headers": {"Content-Type": "application/octet-stream"}}
if hasattr(data, "tobytes"):
request_kwargs["data"] = data.tobytes()
else:
request_kwargs["data"] = bytes(data) # type: ignore
# We write (v)array through the node it belongs in cluster
try:
if self.client.cluster_mode:
response = request_in_cluster(
url, array, self.ctx, should_check_status=True, method="PUT", request_kwargs=request_kwargs
)
else:
response = self.client.put(f"{self.collection_host}{url}", **request_kwargs)
response = self.client.put(f"{self.collection_host}{url}", **request_kwargs) # type: ignore
except TimeoutException:
raise DekerTimeoutServer(
message=f"Timeout on {self.type.name} update {array}",
Expand Down Expand Up @@ -373,12 +376,12 @@ def get_by_primary_attributes(
return None
return self.__create_array_from_response(
response,
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
)

def get_by_id(
Expand Down Expand Up @@ -425,12 +428,12 @@ def get_by_id(

return self.__create_array_from_response(
response,
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
)

def __iter__(self) -> Generator["ArrayMeta", None, None]:
Expand Down
66 changes: 34 additions & 32 deletions deker_server_adapters/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from deker.ctx import CTX
from deker.uri import Uri

from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.consts import LAST_MODIFIED_HEADER, STATUS_OK
from deker_server_adapters.errors import DekerClusterError, DekerServerError
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.utils.requests import make_request
from deker_server_adapters.utils.version import get_api_version


CLUSTER_MODE = "cluster"


Expand Down Expand Up @@ -47,7 +48,7 @@ class ClusterConfig:
leader: Node
current: List[Node]
target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode

cluster_status: Optional[str] = ""
__hash_ring: HashRing = field(init=False)
__hash_ring_target: HashRing = field(init=False)

Expand Down Expand Up @@ -75,35 +76,14 @@ def process_nodes(nodes: List[dict]) -> List[Node]:
current = process_nodes(cluster_config_dict["current"])
target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None

return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target)


def request_config(ctx: CTX) -> dict: # type: ignore[return-value]
"""Request config from server and apply it on context.

:param ctx: App context
"""
httpx_client = ctx.extra["httpx_client"]
url = f"{get_api_version()}/ping"

# If we do healthcheck in cluster
nodes = [*ctx.uri.servers] if ctx.uri.servers else [ctx.uri.raw_url]
response = make_request(url=url, nodes=nodes, client=httpx_client)

if not response or response.status_code != STATUS_OK:
httpx_client.close()
raise DekerServerError(
response,
"Healthcheck failed. Deker client will be closed.",
return cls(
mode=cluster_config_dict["mode"],
leader=leader,
current=current,
target=target,
cluster_status=cluster_config_dict.get("cluster_status"),
)

try:
config = response.json() # type: ignore[union-attr]
return config
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")


def is_config_in_cluster_mode(config: Optional[dict], ctx: CTX) -> bool:
"""Check if mode from config is set to cluster.
Expand Down Expand Up @@ -151,6 +131,28 @@ def request_and_apply_config(ctx: CTX) -> None:

:param ctx: Application context
"""
config_dict = request_config(ctx)
if is_config_in_cluster_mode(config_dict, ctx):
apply_config(config_dict, ctx)
httpx_client = ctx.extra["httpx_client"]
url = f"{get_api_version()}/ping"

# If we do healthcheck in cluster
nodes = [*ctx.uri.servers] if ctx.uri.servers else [ctx.uri.raw_url]
response = make_request(url=url, nodes=nodes, client=httpx_client)

if not response or response.status_code != STATUS_OK:
httpx_client.close()
raise DekerServerError(
response,
"Healthcheck failed. Deker client will be closed.",
)

try:
config = response.json() # type: ignore[union-attr]
if is_config_in_cluster_mode(config, ctx):
# Set hash of config
httpx_client.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
apply_config(config, ctx)
except KeyError:
raise DekerClusterError(response, f"No {LAST_MODIFIED_HEADER} header found in response.")
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")
4 changes: 4 additions & 0 deletions deker_server_adapters/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CONTENT_TOO_LARGE = 413
TOO_MANY_REQUESTS = 429
NON_LEADER_WRITE = 421
CONFLICT_HASH = 409

# Name of the parameter in the server response
COLLECTION_NAME_PARAM = "collection_name"
Expand All @@ -19,6 +20,9 @@
TOO_LARGE_ERROR_MESSAGE = "Requested object is too large, use smaller subset"

EXCEPTION_CLASS_PARAM_NAME = "class"
LAST_MODIFIED_HEADER = "last-modified"
REBALANCING_STATUS = "rebalancing"
NORMAL_STATUS = "normal"


class ArrayType(Enum):
Expand Down
4 changes: 4 additions & 0 deletions deker_server_adapters/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ class FilteringByIdInClusterIsForbidden(DekerBaseApplicationError):

class HashRingError(DekerBaseApplicationError):
"""If there is a problem with HashRing."""


class InvalidConfigHash(DekerBaseApplicationError):
"""If config was updated."""
3 changes: 2 additions & 1 deletion deker_server_adapters/hash_ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def get_node(self, string_key: str) -> T: # type: ignore[type-var]
"""
pos = self.get_node_pos(string_key)
if pos is None:
raise HashRingError(f"Couldn't find a position in {self.ring}")
msg = f"Couldn't find a position in {self.ring}"
raise HashRingError(msg)
return self.ring[self._sorted_keys[pos]]

def get_node_pos(self, string_key: str) -> Optional[int]:
Expand Down
22 changes: 19 additions & 3 deletions deker_server_adapters/httpx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@

from deker_server_adapters.cluster_config import apply_config
from deker_server_adapters.consts import (
CONFLICT_HASH,
CONTENT_TOO_LARGE,
EXCEPTION_CLASS_PARAM_NAME,
LAST_MODIFIED_HEADER,
NON_LEADER_WRITE,
RATE_ERROR_MESSAGE,
TOO_LARGE_ERROR_MESSAGE,
TOO_MANY_REQUESTS,
)
from deker_server_adapters.errors import DekerBaseRateLimitError, DekerDataPointsLimitError, DekerRateLimitError
from deker_server_adapters.errors import (
DekerBaseRateLimitError,
DekerDataPointsLimitError,
DekerRateLimitError,
InvalidConfigHash,
)


def rate_limit_err(response: Response, message: str, class_: Type[DekerBaseRateLimitError]) -> None:
Expand Down Expand Up @@ -43,13 +50,23 @@ class HttpxClient(Client):
ctx: CTX
cluster_mode: bool = False

def request(self, *args: Any, **kwargs: Any) -> Response:
def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any) -> Response:
"""Override httpx method to handle rate errors.

:param args: arguments to request
:param retry_on_hash_failure: If we should retry on invalid hash
:param kwargs: keyword arguments to request
"""
response = super().request(*args, **kwargs)
if response.status_code == CONFLICT_HASH:
apply_config(response.json(), self.ctx)
if LAST_MODIFIED_HEADER in response.headers:
self.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
if retry_on_hash_failure:
response = super().request(*args, **kwargs)
else:
raise InvalidConfigHash

if response.status_code == TOO_MANY_REQUESTS:
rate_limit_err(response=response, message=RATE_ERROR_MESSAGE, class_=DekerRateLimitError)
elif (
Expand All @@ -60,7 +77,6 @@ def request(self, *args: Any, **kwargs: Any) -> Response:

if response.status_code == NON_LEADER_WRITE:
apply_config(response.json(), self.ctx)

return super().request(*args, **kwargs)

return response
49 changes: 33 additions & 16 deletions deker_server_adapters/utils/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from deker.ABC import BaseArray
from deker.ctx import CTX
from httpx import Client, Response
from httpx import Response

from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerServerError
from deker_server_adapters.consts import REBALANCING_STATUS, STATUS_OK
from deker_server_adapters.errors import DekerServerError, InvalidConfigHash
from deker_server_adapters.models import Status
from deker_server_adapters.utils.hashing import get_hash_key, get_id_and_primary_attributes
from deker_server_adapters.utils.version import get_api_version
Expand All @@ -22,11 +22,17 @@


def _request(
url: str, node: str, client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
url: str,
node: str,
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
) -> Optional[Response]:
"""Internal request func - Make GET request on given node.

:param url: What we request
:param retry_on_hash_failure: If we should retry on invalid hash
:param node: Node for requesting
:param client: Httpx Client
:param method: Http method
Expand All @@ -36,7 +42,9 @@ def _request(
request_url = f"{node.rstrip('/')}/{url.lstrip('/')}"
request_kwargs = request_kwargs or {}
try:
response = client.request(method, request_url, **request_kwargs)
response = client.request(method, request_url, **request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
except InvalidConfigHash:
raise
except Exception as e:
traceback.print_exc(-1)
logger.exception(f"Coudn't get response from {node}", exc_info=e) # noqa
Expand All @@ -45,10 +53,16 @@ def _request(


def make_request(
url: str, nodes: Union[List, Tuple, Set], client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
url: str,
nodes: Union[List, Tuple, Set],
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
) -> Optional[Response]:
"""Make GET request on random node, while response is not received.

:param retry_on_hash_failure: If we should retry on invalid hash
:param method: HTTP Method
:param url: What we request
:param request_kwargs: Kwargs for request
Expand All @@ -59,13 +73,13 @@ def make_request(
nodes = list(nodes)
if len(nodes) == 1:
node = nodes.pop(0)
response = _request(url, node, client, method, request_kwargs)
response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
else:
while nodes and (response is None or response.status_code != STATUS_OK):
index = randint(0, len(nodes) - 1)
node = nodes.pop(index)

response = _request(url, node, client, method, request_kwargs)
response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)

return response

Expand Down Expand Up @@ -106,21 +120,24 @@ def request_in_cluster(
:param method: Http method
:param request_kwargs: Extra data for request
"""
from deker_server_adapters.cluster_config import request_and_apply_config

# Retrieve fresh config
request_and_apply_config(ctx)
client = ctx.extra["httpx_client"]

node = ctx.extra["hash_ring"].get_node(get_hash_key(array))

# Check status of file
if should_check_status:
status = check_status(ctx, array)
if status == Status.MOVED:
node = ctx.extra["hash_ring_target"].get_node(get_hash_key(array))
def _check_status() -> None:
if should_check_status and ctx.extra["cluster_config"].cluster_status == REBALANCING_STATUS:
status = check_status(ctx, array)
if status == Status.MOVED:
ctx.extra["hash_ring_target"].get_node(get_hash_key(array))

_check_status()
# Acquire locks
# TODO: Lock acquiring logic if needed
# Make request
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
try:
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
except InvalidConfigHash:
_check_status()
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
Loading
Loading