Correct URL handling for Elasticsearch/OpenSearch #3577

Merged (7 commits) on Dec 4, 2023
22 changes: 18 additions & 4 deletions lib/pbench/common/__init__.py
@@ -17,33 +17,47 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


# List of supported URI schemas and their default ports
SUPPORTED_SCHEMAS = {
"http": 80,
"https": 443,
"postgresql": 5432,
}


def wait_for_uri(uri: str, timeout: int):
"""Wait for the given URI to become available.

While we encounter "connection refused", sleep one second, and then try
again.

Args:
uri : a URL referencing the service to be waited for
timeout : integer number of seconds to wait before giving up
attempts to connect to the URI

Raises:
BadConfig : when the URI does not contain either a host or port
BadConfig : when the URI is missing the host or when the scheme is
missing or unrecognized
ConnectionRefusedError : after the timeout period has been exhausted
"""
url = urlparse(uri)
if not url.hostname:
raise BadConfig("URI must contain a host name")
if not url.port:
raise BadConfig("URI must contain a port number")
if url.scheme not in SUPPORTED_SCHEMAS:
raise BadConfig(
f"URI scheme must be one of {list(SUPPORTED_SCHEMAS)}; found {url.scheme!r}"
)
port = url.port if url.port else SUPPORTED_SCHEMAS[url.scheme]

end = time() + timeout
while True:
try:
# The timeout argument to `create_connection()` does not play into
# the retry logic, see:
#
# https://docs.python.org/3.9/library/socket.html#socket.create_connection
with socket.create_connection((url.hostname, url.port), timeout=1):
with socket.create_connection((url.hostname, port), timeout=1):
break
except ConnectionRefusedError:
if time() > end:
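To illustrate the revised behavior, here is a minimal usage sketch: the port may now be omitted when the scheme carries a well-known default, while an unrecognized scheme is rejected up front. The hostnames and import locations are assumptions for illustration, not taken from the Pbench tree.

```python
# Hypothetical usage of the revised wait_for_uri(); hostnames and import
# paths are illustrative assumptions.
from pbench.common import BadConfig, wait_for_uri

# Port omitted: falls back to the scheme's default (443 for "https").
wait_for_uri("https://opensearch.example.com", timeout=120)

# An explicit port still takes precedence over the scheme default.
wait_for_uri("postgresql://db.example.com:5432", timeout=60)

# Unsupported scheme: rejected immediately instead of failing to connect.
try:
    wait_for_uri("ftp://files.example.com", timeout=10)
except BadConfig as exc:
    print(exc)
```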
2 changes: 1 addition & 1 deletion lib/pbench/server/api/resources/__init__.py
@@ -155,7 +155,7 @@ def __str__(self) -> str:
class SchemaError(APIAbort):
"""Generic base class for errors in processing a JSON schema."""

def __init__(self, http_status: int = HTTPStatus.BAD_REQUEST):
def __init__(self, http_status: int = HTTPStatus.BAD_REQUEST, **_kwargs):
Member:

What's the point of adding an unused kwargs??

Member Author:

It resolved a lint error, probably for ConversionError.__init__() (I don't remember which function specifically -- there are several with similar issues).

Member:

Ugh; this is ugly. OK, so ConversionError is designed with **kwargs to allow callers to pass an http_status through the ConversionError constructor to SchemaError ... but if anyone used other kwargs it wouldn't work, and the explicit declaration here is misleading because SchemaError really doesn't support kwargs. (Pass-through or otherwise.)

I'm curious where you're seeing a lint warning, because I'm not. 😦

Member:

FYI, the http_status is only specified through ConversionError in convert_username and test_exceptions.py ... I don't see any lint annotations. I'm curious exactly what you're seeing, and this workaround bugs me because it shouldn't be necessary.

Member Author:

Looking at the main branch, I'm getting Unexpected argument on the super().__init__() invocation in DatasetConversionError.__init__(), but, interestingly, I don't get the complaint for the one in ConversionError.__init__(). 🤷

Member Author (@webbnh, Dec 4, 2023):

I presume it's because the linter knows that (in the code on the main branch) Schema.__init__() accepts only one argument, which, in the case of ConversionError.__init__(), could be in **kwargs, whereas in the case of DatasetConversionError.__init__() it can't/shouldn't be, since the http_status argument has already been specified explicitly there.

Member:

Sure, I guess, because DatasetConversionError passes http_status explicitly and doesn't actually use kwargs. (In fact, it can't, aside from repeating the http_status, because it's not good for anything else.)

I think your linter is broken. But if you really want to quiet its ranting, a better "ugly workaround" would be to add an explicit http_status: Optional[int] = None to ConversionError, pass it on, and drop the kwargs entirely from both ConversionError and DatasetConversionError (which, so far as I can see, is never used anyway... maybe it was at one time).

But again, I don't think there's anything illegal or even unreasonable here, and I think this is just an argument against your linter. 😆
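A minimal sketch of that suggested alternative might look like the following. The ConversionError fields and the APIAbort stub are illustrative assumptions, not the actual Pbench signatures.

```python
# Sketch of the suggested alternative, not the actual Pbench code: pass
# http_status through explicitly and drop **kwargs from the chain.
from http import HTTPStatus
from typing import Optional


class APIAbort(Exception):
    """Stand-in for the real APIAbort base class."""

    def __init__(self, http_status: int):
        super().__init__(f"HTTP {http_status}")
        self.http_status = http_status


class SchemaError(APIAbort):
    """Generic base class for errors in processing a JSON schema."""

    def __init__(self, http_status: int = HTTPStatus.BAD_REQUEST):
        super().__init__(http_status=http_status)


class ConversionError(SchemaError):
    """A parameter value could not be converted to its expected type."""

    def __init__(self, value, expected_type: str, http_status: Optional[int] = None):
        # Forward http_status only when the caller overrides it; otherwise
        # let SchemaError supply its BAD_REQUEST default. No **kwargs anywhere.
        if http_status is None:
            super().__init__()
        else:
            super().__init__(http_status=http_status)
        self.value = value
        self.expected_type = expected_type
```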

Member Author:

I can't really argue against my linter -- it is more or less built-in to my IDE; and arguing with a linter is, as we know, largely pointless. 🙃

At this point, I'm not inclined to disturb the SchemaError (and children) code any further. Given that the Production deployment seems happy, shall I go ahead and merge this?

Member:

I really dislike adding kwargs here, because it makes no sense and is potentially misleading. But, whatever: it'll never be perfect.

super().__init__(http_status=http_status)

def __str__(self) -> str:
83 changes: 41 additions & 42 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -1,5 +1,5 @@
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from http import HTTPStatus
import json
@@ -77,19 +77,19 @@ def __init__(self, status: int, message: str, data: JSON = None):
class ElasticBase(ApiBase):
"""
A base class for Elasticsearch queries that allows subclasses to provide
custom pre- and post- processing.
custom pre- and post-processing.

This class extends the ApiBase class in order to connect the post
and get methods to Flask's URI routing algorithms. It implements a common
This class extends the ApiBase class in order to connect the POST
and GET methods to Flask's URI routing algorithms. It implements a common
mechanism for calling Elasticsearch and processing errors.

Hooks are defined for subclasses extending this class to "preprocess"
the query, to "assemble" the Elasticsearch request payload from Pbench
server data and the client's JSON payload, and to "postprocess" a
successful response payload from Elasticsearch.

Note that "preprocess" can provide context that's passed to the assemble
and postprocess methods.
Note that `preprocess` can provide context that's passed to the `assemble`
and `postprocess` methods.
"""

def __init__(
@@ -119,9 +119,11 @@ def __init__(
super().__init__(config, *schemas)
self.prefix = config.get("Indexing", "index_prefix")
self.es_url = config.get("Indexing", "uri")
self.ca_bundle = config.get("Indexing", "ca_bundle")

@staticmethod
def _build_elasticsearch_query(
self, user: Optional[str], access: Optional[str], terms: List[JSON]
user: Optional[str], access: Optional[str], terms: List[JSON]
) -> JSON:
"""
Generate the "query" parameter for an Elasticsearch _search request
@@ -154,7 +156,7 @@ def _build_elasticsearch_query(
ADMIN, AUTHENTICATED as drb: owner:drb
UNAUTHENTICATED (or non-drb): owner:drb AND access:public

{"user": "drb, "access": "private"}: private drb
{"user": "drb", "access": "private"}: private drb
All datasets owned by "drb" with "private" access

owner:drb AND access:private
@@ -198,7 +200,7 @@ def _build_elasticsearch_query(
authorized_id = str(authorized_user.id) if authorized_user else None
is_admin = authorized_user.is_admin() if authorized_user else False

filter = terms.copy()
filter_list = terms.copy()
current_app.logger.debug(
"QUERY auth ID {}, user {!r}, access {!r}, admin {}",
authorized_id,
@@ -259,13 +261,13 @@ def _build_elasticsearch_query(

# We control the order of terms here to allow stable unit testing.
if combo_term:
filter.append(combo_term)
filter_list.append(combo_term)
else:
if access_term:
filter.append(access_term)
filter_list.append(access_term)
if user_term:
filter.append(user_term)
return {"bool": {"filter": filter}}
filter_list.append(user_term)
return {"bool": {"filter": filter_list}}

def _gen_month_range(self, index: str, start: datetime, end: datetime) -> str:
"""
@@ -333,7 +335,7 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSON:
context: API context dictionary

Raises:
Any errors in the assemble method shall be reported by exceptions
Any errors in the `assemble` method shall be reported by exceptions
which will be logged and will terminate the operation.

Returns:
@@ -367,7 +369,7 @@ def postprocess(self, es_json: JSON, context: ApiContext) -> JSON:
"""
raise NotImplementedError()

def _call(self, method: Callable, params: ApiParams, context: ApiContext):
def _call(self, method: Callable, params: ApiParams, context: ApiContext) -> JSON:
"""
Perform the requested call to Elasticsearch, and handle any exceptions.

@@ -377,7 +379,7 @@ def _call(self, method: Callable, params: ApiParams, context: ApiContext):
context: API context dictionary

Returns:
Postprocessed JSON body to return to client
Post-processed JSON body to return to client
"""
klasname = self.__class__.__name__
try:
@@ -404,7 +406,7 @@ def _call(self, method: Callable, params: ApiParams, context: ApiContext):

try:
# perform the Elasticsearch query
es_response = method(url, **es_request["kwargs"])
es_response = method(url, **es_request["kwargs"], verify=self.ca_bundle)
current_app.logger.debug(
"ES query response {}:{}",
es_response.reason,
@@ -453,9 +455,7 @@ def _call(self, method: Callable, params: ApiParams, context: ApiContext):
except Exception as e:
raise APIInternalError(f"Unexpected backend exception '{e}'") from e

def _post(
self, params: ApiParams, request: Request, context: ApiContext
) -> Response:
def _post(self, params: ApiParams, request: Request, context: ApiContext) -> JSON:
"""Handle a Pbench server POST operation involving Elasticsearch

The assembly and post-processing of the Elasticsearch query are
@@ -464,37 +464,33 @@ def _post(
parameter validation and normalization.

Args:
method: The API HTTP method
params: The API HTTP method parameters
request: The flask Request object containing payload and headers
uri_params: URI encoded keyword-arg supplied by the Flask
framework
context: The API context dictionary
"""
context["request"] = request
return self._call(requests.post, params, context)

def _get(
self, params: ApiParams, request: Request, context: ApiContext
) -> Response:
def _get(self, params: ApiParams, request: Request, context: ApiContext) -> JSON:
"""Handle a GET operation involving a call to Elasticsearch

The post-processing of the Elasticsearch query is handled by the
subclasses through their postprocess() methods.

Args:
method: The API HTTP method
params: The API HTTP method parameters
request: The flask Request object containing payload and headers
uri_params: URI encoded keyword-arg supplied by the Flask
framework
context: The API context dictionary
"""
context["request"] = request
return self._call(requests.get, params, context)


@dataclass
class BulkResults:
errors: int
count: int
report: defaultdict
errors: int = 0
count: int = 0
report: defaultdict = field(default_factory=lambda: defaultdict(int))


class ElasticBulkBase(ApiBase):
@@ -543,6 +539,7 @@ def __init__(
api_name = self.__class__.__name__

self.elastic_uri = config.get("Indexing", "uri")
self.ca_bundle = config.get("Indexing", "ca_bundle")
self.config = config

# Look for a parameter of type DATASET. It may be defined in any of the
@@ -565,7 +562,8 @@ def __init__(
self.schemas[ApiMethod.POST].authorization == ApiAuthorizationType.DATASET
), f"API {self.__class__.__name__} authorization type must be DATASET"

def expect_index(self, dataset: Dataset) -> bool:
@staticmethod
def expect_index(dataset: Dataset) -> bool:
"""Are we waiting for an index map?

If a dataset doesn't have an index map, and we require one, we need to
@@ -610,7 +608,7 @@ def generate_actions(
self,
dataset: Dataset,
context: ApiContext,
map: Iterator[IndexStream],
doc_map: Iterator[IndexStream],
) -> Iterator[dict]:
"""Generate a series of Elasticsearch bulk operation actions

@@ -628,7 +626,7 @@ def generate_actions(
Args:
dataset: The associated Dataset object
context: The operation's ApiContext
map: Elasticsearch index document map generator
doc_map: Elasticsearch index document map generator

Returns:
Sequence of Elasticsearch bulk action dict objects
@@ -775,13 +773,13 @@ def _delete(
return self._bulk_dispatch(params, request, context)

def _bulk_dispatch(
self, params: ApiParams, request: Request, context: ApiContext
self, params: ApiParams, _request: Request, context: ApiContext
) -> Response:
"""Perform the requested operation, and handle any exceptions.

Args:
params: Type-normalized client parameters
request: Original incoming Request object (not used)
_request: Original incoming Request object (not used)
context: API context

Returns:
@@ -831,15 +829,15 @@ def _bulk_dispatch(
# indexed and we skip the Elasticsearch actions.
if IndexMap.exists(dataset):
# Build an Elasticsearch instance to manage the bulk update
elastic = Elasticsearch(self.elastic_uri)
map = IndexMap.stream(dataset=dataset)
elastic = Elasticsearch(self.elastic_uri, ca_certs=self.ca_bundle)
doc_map = IndexMap.stream(dataset=dataset)

# NOTE: because both generate_actions and streaming_bulk return
# generators, the entire sequence is inside a single try block.
try:
results = helpers.streaming_bulk(
elastic,
self.generate_actions(dataset, context, map),
self.generate_actions(dataset, context, doc_map),
raise_on_exception=False,
raise_on_error=False,
)
@@ -857,7 +855,7 @@ def _bulk_dispatch(
f"Operation unavailable: dataset {dataset.resource_id} is not indexed.",
)
else:
report = BulkResults(errors=0, count=0, report={})
report = BulkResults()

summary: JSONOBJECT = {
"ok": report.count - report.errors,
@@ -916,6 +914,7 @@ def _bulk_dispatch(
"message"
] = f"Unable to {context['attributes'].action} some indexed documents"
raise APIInternalError(
f"Failed to {context['attributes'].action} any of {report.count} Elasticsearch documents: {json.dumps(report.report)}"
f"Failed to {context['attributes'].action} any of {report.count} "
f"Elasticsearch documents: {json.dumps(report.report)}"
)
return jsonify(summary)
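Taken together, the query_apis changes thread a single CA-bundle setting from the [Indexing] configuration into both Elasticsearch call paths. A rough sketch of that data flow follows; the URI, file path, and endpoint are illustrative assumptions, and error handling is omitted.

```python
# Rough sketch of how the new ca_bundle value is used; the URI and bundle
# path are made-up values, not the Pbench defaults.
import requests
from elasticsearch import Elasticsearch

es_url = "https://opensearch.example.com:9200"   # config.get("Indexing", "uri")
ca_bundle = "/etc/pki/tls/certs/ca-bundle.crt"   # config.get("Indexing", "ca_bundle")

# ElasticBase._call(): the direct REST request now verifies the server
# certificate against the configured bundle.
response = requests.get(f"{es_url}/_search", params={"size": 1}, verify=ca_bundle)

# ElasticBulkBase._bulk_dispatch(): the client library is handed the same
# bundle via ca_certs.
elastic = Elasticsearch(es_url, ca_certs=ca_bundle)
```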
27 changes: 14 additions & 13 deletions lib/pbench/server/indexer.py
@@ -106,38 +106,39 @@ def _sleep_w_backoff(backoff):
_sleep(_calc_backoff_sleep(backoff))


def _get_es_hosts(config, logger):
"""
Return list of dicts (a single dict for now) - that's what ES is expecting.
def _get_es_hosts(config):
"""Return a list of strings (containing only a single string for now) which
are the network locations (i.e., URIs) for the configured Elasticsearch hosts.
"""
try:
uri = config.get("Indexing", "uri")
except (configparser.NoSectionError, configparser.NoOptionError):
raise BadConfig("Indexing URI missing")

url = urlparse(uri)
if not url.hostname:
raise BadConfig("Indexing URI must contain a host name")
if not url.port:
raise BadConfig("Indexing URI must contain a port number")
timeoutobj = Timeout(total=1200, connect=10, read=_read_timeout)
return [
dict(host=url.hostname, port=url.port, timeout=timeoutobj),
]

return [uri]

def get_es(config, logger):

def get_es(config):
"""Return an Elasticsearch() object derived from the given configuration.
If the configuration does not provide the necessary data, we return None
instead.
"""
hosts = _get_es_hosts(config, logger)
hosts = _get_es_hosts(config)
if hosts is None:
return None

# FIXME: we should just change these two loggers to write to a
# file instead of setting the logging level up so high.
logging.getLogger("urllib3").setLevel(logging.FATAL)
logging.getLogger("elasticsearch1").setLevel(logging.FATAL)
return Elasticsearch(hosts, max_retries=0)

timeoutobj = Timeout(total=1200, connect=10, read=_read_timeout)
ca_bundle = config.get("Indexing", "ca_bundle")
return Elasticsearch(hosts, max_retries=0, timeout=timeoutobj, ca_certs=ca_bundle)


# Always use "create" operations, as we also ensure each JSON document being
@@ -4100,7 +4101,7 @@ def __init__(self, options, name, config, logger, _dbg=0):
self.getuid = os.getuid
self.TS = self.config.TS

self.es = get_es(self.config, self.logger)
self.es = get_es(self.config)
self.templates = PbenchTemplates(
self.config.LIBDIR,
self.idx_prefix,
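For reference, the indexer now hands the configured URI string (scheme included) straight to the client rather than decomposing it into host/port parts, which is what preserves https endpoints. A hypothetical construction is sketched below; the URI, bundle path, and read timeout are made-up values.

```python
# Hypothetical illustration of the revised get_es() construction; values are
# illustrative, mirroring the kwargs shown in the diff above.
from elasticsearch import Elasticsearch
from urllib3 import Timeout

uri = "https://opensearch.example.com:9200"      # config.get("Indexing", "uri")
ca_bundle = "/etc/pki/tls/certs/ca-bundle.crt"   # config.get("Indexing", "ca_bundle")

# Old style: hosts=[{"host": ..., "port": ..., "timeout": ...}], which drops
# the scheme. New style: a list of URI strings plus explicit TLS settings.
es = Elasticsearch(
    [uri],
    max_retries=0,
    timeout=Timeout(total=1200, connect=10, read=120),
    ca_certs=ca_bundle,
)
```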
2 changes: 1 addition & 1 deletion lib/pbench/server/report.py
@@ -75,7 +75,7 @@ def __init__(
else:
if es is None:
try:
self.es = get_es(config, self.logger)
self.es = get_es(config)
except Exception:
self.logger.exception(
"Unexpected failure fetching" " Elasticsearch configuration"