-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat(ingest/ldap)fix list index out of range error #8525
Changes from 5 commits
6829e99
74f3283
810eed4
5780b23
9f6525c
b786939
4eae919
3690008
bb51fd2
03c5371
1ad0456
867baa5
2901f7d
048f660
189b0d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,11 @@ | |
|
||
import ldap | ||
from ldap.controls import SimplePagedResultsControl | ||
from pydantic.class_validators import root_validator | ||
from pydantic.fields import Field | ||
|
||
from datahub.configuration.common import ConfigurationError | ||
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated | ||
from datahub.configuration.source_common import DatasetSourceConfigMixin | ||
from datahub.ingestion.api.common import PipelineContext | ||
from datahub.ingestion.api.decorators import ( | ||
|
@@ -16,7 +18,7 @@ | |
support_status, | ||
) | ||
from datahub.ingestion.api.source import MetadataWorkUnitProcessor | ||
from datahub.ingestion.api.workunit import MetadataWorkUnit | ||
from datahub.ingestion.api.workunit import MetadataWorkUnit, logger | ||
from datahub.ingestion.source.state.stale_entity_removal_handler import ( | ||
StaleEntityRemovalHandler, | ||
StaleEntityRemovalSourceReport, | ||
|
@@ -135,13 +137,41 @@ class LDAPSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): | |
|
||
manager_pagination_enabled: bool = Field( | ||
default=True, | ||
description="Use pagination while search for managers (enabled by default).", | ||
description="[deprecated] Use pagination_enabled ", | ||
) | ||
_deprecate_manager_pagination_enabled = pydantic_field_deprecated( | ||
"manager_pagination_enabled" | ||
) | ||
pagination_enabled: bool = Field( | ||
default=True, | ||
description="Use pagination while do search query (enabled by default).", | ||
) | ||
|
||
# default mapping for attrs | ||
user_attrs_map: Dict[str, Any] = {} | ||
group_attrs_map: Dict[str, Any] = {} | ||
|
||
# pre = True because we want to take some decision before pydantic initialize the configuration to default values | ||
@root_validator(pre=True) | ||
def pagination_backward_compatibility(cls, values: Dict) -> Dict: | ||
manager_pagination_enabled = values.get("manager_pagination_enabled") | ||
pagination_enabled = values.get("pagination_enabled") | ||
if pagination_enabled is None and manager_pagination_enabled: | ||
logger.warning( | ||
"pagination_enabled is not set but manager_pagination_enabled is set. manager_pagination_enabled is " | ||
"deprecated, please use pagination_enabled instead." | ||
) | ||
logger.info( | ||
"Initializing pagination_enabled from manager_pagination_enabled" | ||
) | ||
values["pagination_enabled"] = manager_pagination_enabled | ||
elif manager_pagination_enabled and pagination_enabled: | ||
raise ValueError( | ||
"manager_pagination_enabled is deprecated. Please use pagination_enabled only." | ||
) | ||
|
||
return values | ||
|
||
|
||
@dataclasses.dataclass | ||
class LDAPSourceReport(StaleEntityRemovalSourceReport): | ||
|
@@ -218,7 +248,10 @@ def __init__(self, ctx: PipelineContext, config: LDAPSourceConfig): | |
except ldap.LDAPError as e: | ||
raise ConfigurationError("LDAP connection failed") from e | ||
|
||
self.lc = create_controls(self.config.page_size) | ||
if self.config.pagination_enabled: | ||
self.lc = create_controls(self.config.page_size) | ||
else: | ||
self.lc = None | ||
|
||
@classmethod | ||
def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "LDAPSource": | ||
|
@@ -244,7 +277,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: | |
ldap.SCOPE_SUBTREE, | ||
self.config.filter, | ||
self.config.attrs_list, | ||
serverctrls=[self.lc], | ||
serverctrls=[self.lc] if self.lc else [], | ||
) | ||
_rtype, rdata, _rmsgid, serverctrls = self.ldap_client.result3(msgid) | ||
except ldap.LDAPError as e: | ||
|
@@ -278,15 +311,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: | |
else: | ||
self.report.report_dropped(dn) | ||
|
||
pctrls = get_pctrls(serverctrls) | ||
if not pctrls: | ||
self.report.report_failure( | ||
"ldap-control", "Server ignores RFC 2696 control." | ||
) | ||
if serverctrls: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the source report failure if serverctrls is not truthy ? When is serverctls not truthy ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, only if enabled
In some scenarios, clients might want to check if the server supports specific controls without actually using them. To do this, they can send an empty server control in the request and observe the server's response. If the server supports the control, it will be included in the response. Changed on |
||
pctrls = get_pctrls(serverctrls) | ||
if not pctrls: | ||
self.report.report_failure( | ||
"ldap-control", "Server ignores RFC 2696 control." | ||
) | ||
break | ||
cookie = set_cookie(self.lc, pctrls) | ||
else: | ||
break | ||
|
||
cookie = set_cookie(self.lc, pctrls) | ||
|
||
def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]: | ||
""" | ||
Handle a DN and attributes by adding manager info and constructing a | ||
|
@@ -300,15 +335,11 @@ def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUn | |
manager_filter = self.config.filter | ||
else: | ||
manager_filter = None | ||
if self.config.manager_pagination_enabled: | ||
ctrls = [self.lc] | ||
else: | ||
ctrls = None | ||
manager_msgid = self.ldap_client.search_ext( | ||
m_cn, | ||
ldap.SCOPE_BASE, | ||
manager_filter, | ||
serverctrls=ctrls, | ||
serverctrls=[self.lc], | ||
alplatonov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
result = self.ldap_client.result3(manager_msgid) | ||
if result[1]: | ||
|
@@ -351,36 +382,39 @@ def build_corp_user_mce( | |
last_name = attrs[self.config.user_attrs_map["lastName"]][0].decode() | ||
groups = parse_groups(attrs, self.config.user_attrs_map["memberOf"]) | ||
|
||
email = ( | ||
(attrs[self.config.user_attrs_map["email"]][0]).decode() | ||
if self.config.user_attrs_map["email"] in attrs | ||
else ldap_user | ||
) | ||
display_name = ( | ||
(attrs[self.config.user_attrs_map["displayName"]][0]).decode() | ||
if self.config.user_attrs_map["displayName"] in attrs | ||
else full_name | ||
) | ||
department_id = ( | ||
int(attrs[self.config.user_attrs_map["departmentId"]][0].decode()) | ||
if self.config.user_attrs_map["departmentId"] in attrs | ||
else None | ||
) | ||
department_name = ( | ||
(attrs[self.config.user_attrs_map["departmentName"]][0]).decode() | ||
if self.config.user_attrs_map["departmentName"] in attrs | ||
else None | ||
) | ||
country_code = ( | ||
(attrs[self.config.user_attrs_map["countryCode"]][0]).decode() | ||
if self.config.user_attrs_map["countryCode"] in attrs | ||
else None | ||
) | ||
title = ( | ||
attrs[self.config.user_attrs_map["title"]][0].decode() | ||
if self.config.user_attrs_map["title"] in attrs | ||
else None | ||
) | ||
if attrs.get(self.config.user_attrs_map["email"]): | ||
email = (attrs[self.config.user_attrs_map["email"]][0]).decode() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this logic be extracted into a method and use the method - say -
also is it assumed that |
||
else: | ||
email = ldap_user | ||
if attrs.get(self.config.user_attrs_map["displayName"]): | ||
display_name = ( | ||
attrs[self.config.user_attrs_map["displayName"]][0] | ||
).decode() | ||
else: | ||
display_name = full_name | ||
if attrs.get(self.config.user_attrs_map["departmentId"]): | ||
department_id = ( | ||
attrs[self.config.user_attrs_map["departmentId"]][0] | ||
).decode() | ||
else: | ||
department_id = None | ||
if attrs.get(self.config.user_attrs_map["departmentName"]): | ||
department_name = ( | ||
attrs[self.config.user_attrs_map["departmentName"]][0] | ||
).decode() | ||
else: | ||
department_name = None | ||
if attrs.get(self.config.user_attrs_map["countryCode"]): | ||
country_code = ( | ||
attrs[self.config.user_attrs_map["countryCode"]][0] | ||
).decode() | ||
else: | ||
country_code = None | ||
if attrs.get(self.config.user_attrs_map["title"]): | ||
title = (attrs[self.config.user_attrs_map["title"]][0]).decode() | ||
else: | ||
title = None | ||
|
||
custom_props_map = {} | ||
if self.config.custom_props_list: | ||
for prop in self.config.custom_props_list: | ||
|
@@ -420,21 +454,24 @@ def build_corp_group_mce(self, attrs: dict) -> Optional[MetadataChangeEvent]: | |
full_name = cn[0].decode() | ||
admins = parse_users(attrs, self.config.group_attrs_map["admins"]) | ||
members = parse_users(attrs, self.config.group_attrs_map["members"]) | ||
email = ( | ||
attrs[self.config.group_attrs_map["email"]][0].decode() | ||
if self.config.group_attrs_map["email"] in attrs | ||
else full_name | ||
) | ||
description = ( | ||
attrs[self.config.group_attrs_map["description"]][0].decode() | ||
if self.config.group_attrs_map["description"] in attrs | ||
else None | ||
) | ||
displayName = ( | ||
attrs[self.config.group_attrs_map["displayName"]][0].decode() | ||
if self.config.group_attrs_map["displayName"] in attrs | ||
else None | ||
) | ||
|
||
if attrs.get(self.config.group_attrs_map["email"]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as above |
||
email = attrs[self.config.group_attrs_map["email"]][0].decode() | ||
else: | ||
email = full_name | ||
if attrs.get(self.config.group_attrs_map["description"]): | ||
description = attrs[self.config.group_attrs_map["description"]][ | ||
0 | ||
].decode() | ||
else: | ||
description = None | ||
if attrs.get(self.config.group_attrs_map["displayName"]): | ||
displayName = attrs[self.config.group_attrs_map["displayName"]][ | ||
0 | ||
].decode() | ||
else: | ||
displayName = None | ||
|
||
group_snapshot = CorpGroupSnapshotClass( | ||
urn=f"urn:li:corpGroup:{full_name}", | ||
aspects=[ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this required ? There is also manager_filter_enabled. Does it also need renaming ?
Suggestion : Use pydantic_renamed_field instead. No need to write your own validator in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LDAP service like database, we can get error "SIZELIMIT_EXCEEDED: {'desc': 'Size limit exceeded'}"
This error tells us that we reached the limit of search result entries (1000 by default). There are 2 ways of resolving this error:
Also, LDAP DATABASE sometimes doesn't support pagination at all. For example, google workspace LDAP doesn't support it -> so we need the option to turn off pagination completely, or we get an error.
Sometimes finding a person manager provides unexpected data and can be reduced with pagination.
No, this option is responsible for additional manager discovery. We want that person's head will be founded or not.
Thx, I change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. My question was around - why this rename is required - manager_pagination_enabled->pagination_enabled. Does this rename also warrant a rename in manager_filter_enabled->filter_enabled ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the old code/way, we always setup page control with line
self.lc = create_controls(self.config.page_size)
.Sometimes LDAP provider doesn't support pagination. That's why i need an option – enable pagination or not in new code.
Now I have a decision:
pagination_enabled
andsave manager_pagination_enabled
manager_pagination_enabled
topagination_enabled
I choose option 2) because I don't see a situation where we need pagination for manager discovery, and at the same time, we need a global user/group pagination. If we have a problem and pagination is needed for the user/group or head, we can enable or disable it globally. The user will need clarification, as I am, with the name manager_pagination_enabled and global enable/disable pagination.