Skip to content

Commit

Permalink
address_pr_review rucio#6059
Browse files Browse the repository at this point in the history
  • Loading branch information
panta-123 committed Nov 25, 2024
1 parent 528faff commit 32c50da
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 47 deletions.
53 changes: 25 additions & 28 deletions lib/rucio/core/did_meta_plugins/elasticsearch_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

#
# Author(s):
# - Anil Panta <[email protected]>, 2023

'''
Elasticsearch based metadata plugin.
'''

import datetime
import operator
from typing import TYPE_CHECKING, Any, Optional, Union
from typing import TYPE_CHECKING, Any, Literal, Optional, Union

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as elastic_exceptions
Expand All @@ -48,27 +45,27 @@
class ElasticDidMeta(DidMetaPlugin):
def __init__(
self,
hosts: "Optional[list[str]]" = None,
user: "Optional[str]" = None,
password: "Optional[str]" = None,
index: "Optional[str]" = None,
archive_index: "Optional[str]" = None,
use_ssl: "Optional[bool]" = False,
hosts: Optional[list[str]] = None,
user: Optional[str] = None,
password: Optional[str] = None,
index: Optional[str] = None,
archive_index: Optional[str] = None,
use_ssl: Optional[bool] = False,
verify_certs: bool = True,
ca_certs: "Optional[str]" = None,
client_cert: "Optional[str]" = None,
client_key: "Optional[str]" = None,
ca_certs: Optional[str] = None,
client_cert: Optional[str] = None,
client_key: Optional[str] = None,
request_timeout: int = 100,
max_retries: int = 3,
retry_on_timeout: bool = False
) -> None:
super(ElasticDidMeta, self).__init__()
hosts = hosts or [config.config_get('metadata', 'elastic_service_host')]
hosts = hosts or [config.config_get('metadata', 'elastic_service_hosts')]
user = user or config.config_get('metadata', 'elastic_user', False, None)
password = password or config.config_get('metadata', 'elastic_password', False, None)
self.index = index or config.config_get('metadata', 'meta_index', False, 'rucio_did_meta')
self.archive_index = archive_index or config.config_get('metadata', 'archive_index', False, 'archive_meta')
use_ssl = use_ssl or bool(config.config_get('metadata', 'use_ssl', False, False))
use_ssl = use_ssl or config.config_get_bool('metadata', 'use_ssl', False, False)
ca_certs = ca_certs or config.config_get('metadata', 'ca_certs', False, None)
client_cert = client_cert or config.config_get('metadata', 'client_cert', False, None)
client_key = client_key or config.config_get('metadata', 'client_key', False, None)
Expand Down Expand Up @@ -177,8 +174,8 @@ def set_metadata_bulk(
except exception.DataIdentifierNotFound:
existing_meta = {
'scope': str(scope.external),
'name': str(name),
'vo': str(scope.vo)
'name': name,
'vo': scope.vo
}
for key, value in meta.items():
if key not in IMMUTABLE_KEYS:
Expand Down Expand Up @@ -230,15 +227,15 @@ def list_dids(
self,
scope: "InternalScope",
filters: Union[list[dict[str, Any]], dict[str, Any]],
did_type: str = 'collection',
did_type: Literal['all', 'collection', 'dataset', 'container', 'file'] = 'collection',
ignore_case: bool = False,
limit: "Optional[int]" = None,
offset: "Optional[int]" = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
long: bool = False,
recursive: bool = False,
ignore_dids: "Optional[list]" = None,
ignore_dids: Optional[list] = None,
*,
session: "Optional[Session]" = None
session: Optional[Session] = None
) -> "Iterator[dict[str, Any]]":
"""
List DIDs (Data Identifier).
Expand All @@ -261,7 +258,7 @@ def list_dids(
if not ignore_dids:
ignore_dids = []

# backwards compatability for filters as single {}.
# backwards compatibility for filters as single {}.
if isinstance(filters, dict):
filters = [filters]

Expand All @@ -270,12 +267,12 @@ def list_dids(
elastic_query_str = fe.create_elastic_query(
additional_filters=[
('scope', operator.eq, str(scope.external)),
('vo', operator.eq, str(scope.vo))
('vo', operator.eq, scope.vo)
]
)
pit = self.client.open_point_in_time(index=self.index, keep_alive="2m")
pit_id = pit["id"]
# Base query with point in time(pit) paramter.
# Base query with point in time (pit) paramter.
# sort is needed for search_after, so we use scope sort (random choice)
query = {
"query": elastic_query_str,
Expand Down Expand Up @@ -336,7 +333,7 @@ def on_delete(
scope: "InternalScope",
name: str,
archive: bool = False,
session: "Optional[Session]" = None
session: Optional[Session] = None
) -> None:
"""
Delete a document and optionally archive it.
Expand Down Expand Up @@ -368,7 +365,7 @@ def get_metadata_archived(
self,
scope: "InternalScope",
name: str,
session: "Optional[Session]" = None
session: Optional[Session] = None
) -> None:
"""
Retrieve archived metadata for a given scope and name.
Expand All @@ -393,7 +390,7 @@ def manages_key(
self,
key: str,
*,
session: "Optional[Session]" = None
session: Optional[Session] = None
) -> bool:
return True

Expand Down
12 changes: 6 additions & 6 deletions lib/rucio/core/did_meta_plugins/filter_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,18 +362,18 @@ def create_elastic_query(
additional_filters: Optional["Iterable[FilterTuple]"] = None
) -> dict[str, Any]:
"""
Returns a single elastic query describing the filters expression.
Returns an single elastic query dictionary describing the filters expression.
:param additional_filters: additional filters to be applied to all clauses.
:returns: a elastic query string describing the filters expression.
:returns: a elastic query dictionary describing the filters expression.
"""

additional_filters = additional_filters or []
for or_group in self._filters:
for _filter in additional_filters:
or_group.append(list(_filter)) # type: ignore

should = []
should_clauses = []
for or_group in self._filters:
bool_query = {
"must": [],
Expand Down Expand Up @@ -402,10 +402,10 @@ def create_elastic_query(
elif oper == operator.ne:
bool_query["must_not"].append({"term": {key: value}})

should.append({"bool": bool_query})
should_clauses.append({"bool": bool_query})

q = {"bool": {"should": should}}
return q
query_expression = {"bool": {"should": should_clauses}}
return query_expression

def create_postgres_query(
self,
Expand Down
17 changes: 4 additions & 13 deletions tests/test_did_meta_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,26 +168,17 @@ def test_list_did_meta(self, mock_scope, root_account):
expected = sorted([tmp_dsn1, tmp_dsn4])
assert expected == results

dids = list_dids(mock_scope, {meta_key1: meta_value2})
results = []
for d in dids:
results.append(d)
results = list(list_dids(mock_scope, {meta_key1: meta_value2}))
assert len(results) == 1
# assert [{'scope': (tmp_scope), 'name': str(tmp_dsn2)}] == results
assert [tmp_dsn2] == results

dids = list_dids(mock_scope, {meta_key2: meta_value1})
results = []
for d in dids:
results.append(d)
results = list(list_dids(mock_scope, {meta_key2: meta_value1}))
assert len(results) == 1
# assert [{'scope': (tmp_scope), 'name': tmp_dsn3}] == results
assert [tmp_dsn3] == results

dids = list_dids(mock_scope, {meta_key1: meta_value1, meta_key2: meta_value2})
results = []
for d in dids:
results.append(d)
results = list(list_dids(mock_scope, {meta_key2: meta_value1}))
assert len(results) == 1
# assert [{'scope': (tmp_scope), 'name': tmp_dsn4}] == results
assert [tmp_dsn4] == results
Expand Down Expand Up @@ -285,7 +276,7 @@ def test_list_did_meta(self, mock_scope, root_account, mongo_meta):
# assert [{'scope': (tmp_scope), 'name': tmp_dsn4}] == results
assert [tmp_dsn4] == results

@pytest.fixture()
@pytest.fixture
def elastic_meta():
return ElasticDidMeta(
hosts=['http://elasticsearch_meta:9200'],
Expand Down

0 comments on commit 32c50da

Please sign in to comment.