Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(ingest/ldap): fix list index out of range error #8525

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Potential Downtime

### Deprecations
- #8525: In the LDAP ingestor, `manager_pagination_enabled` has been renamed to the more general `pagination_enabled`.

### Other Notable Changes

Expand Down
106 changes: 53 additions & 53 deletions metadata-ingestion/src/datahub/ingestion/source/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -135,7 +136,14 @@ class LDAPSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):

manager_pagination_enabled: bool = Field(
default=True,
description="Use pagination while search for managers (enabled by default).",
description="[deprecated] Use pagination_enabled ",
)
_deprecate_manager_pagination_enabled = pydantic_renamed_field(
"manager_pagination_enabled", "pagination_enabled"
)
pagination_enabled: bool = Field(
default=True,
description="Use pagination while do search query (enabled by default).",
)

# default mapping for attrs
Expand Down Expand Up @@ -218,7 +226,10 @@ def __init__(self, ctx: PipelineContext, config: LDAPSourceConfig):
except ldap.LDAPError as e:
raise ConfigurationError("LDAP connection failed") from e

self.lc = create_controls(self.config.page_size)
if self.config.pagination_enabled:
self.lc = create_controls(self.config.page_size)
else:
self.lc = None

@classmethod
def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "LDAPSource":
Expand All @@ -244,7 +255,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
ldap.SCOPE_SUBTREE,
self.config.filter,
self.config.attrs_list,
serverctrls=[self.lc],
serverctrls=[self.lc] if self.lc else [],
)
_rtype, rdata, _rmsgid, serverctrls = self.ldap_client.result3(msgid)
except ldap.LDAPError as e:
Expand Down Expand Up @@ -278,15 +289,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
self.report.report_dropped(dn)

pctrls = get_pctrls(serverctrls)
if not pctrls:
self.report.report_failure(
"ldap-control", "Server ignores RFC 2696 control."
)
if self.lc:
pctrls = get_pctrls(serverctrls)
if not pctrls:
self.report.report_failure(
"ldap-control", "Server ignores RFC 2696 control."
)
break
cookie = set_cookie(self.lc, pctrls)
else:
break

cookie = set_cookie(self.lc, pctrls)

def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
"""
Handle a DN and attributes by adding manager info and constructing a
Expand All @@ -300,15 +313,11 @@ def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUn
manager_filter = self.config.filter
else:
manager_filter = None
if self.config.manager_pagination_enabled:
ctrls = [self.lc]
else:
ctrls = None
manager_msgid = self.ldap_client.search_ext(
m_cn,
ldap.SCOPE_BASE,
manager_filter,
serverctrls=ctrls,
serverctrls=[self.lc] if self.lc else [],
)
result = self.ldap_client.result3(manager_msgid)
if result[1]:
Expand Down Expand Up @@ -351,36 +360,25 @@ def build_corp_user_mce(
last_name = attrs[self.config.user_attrs_map["lastName"]][0].decode()
groups = parse_groups(attrs, self.config.user_attrs_map["memberOf"])

email = (
(attrs[self.config.user_attrs_map["email"]][0]).decode()
if self.config.user_attrs_map["email"] in attrs
else ldap_user
email = get_attr_or_none(attrs, self.config.user_attrs_map["email"], ldap_user)
display_name = get_attr_or_none(
attrs, self.config.user_attrs_map["displayName"], full_name
)
display_name = (
(attrs[self.config.user_attrs_map["displayName"]][0]).decode()
if self.config.user_attrs_map["displayName"] in attrs
else full_name
title = get_attr_or_none(attrs, self.config.user_attrs_map["title"])
department_id_str = get_attr_or_none(
attrs, self.config.user_attrs_map["departmentId"]
)
department_id = (
int(attrs[self.config.user_attrs_map["departmentId"]][0].decode())
if self.config.user_attrs_map["departmentId"] in attrs
else None
department_name = get_attr_or_none(
attrs, self.config.user_attrs_map["departmentName"]
)
department_name = (
(attrs[self.config.user_attrs_map["departmentName"]][0]).decode()
if self.config.user_attrs_map["departmentName"] in attrs
else None
)
country_code = (
(attrs[self.config.user_attrs_map["countryCode"]][0]).decode()
if self.config.user_attrs_map["countryCode"] in attrs
else None
)
title = (
attrs[self.config.user_attrs_map["title"]][0].decode()
if self.config.user_attrs_map["title"] in attrs
else None
country_code = get_attr_or_none(
attrs, self.config.user_attrs_map["countryCode"]
)
if department_id_str:
department_id = int(department_id_str)
else:
department_id = None

custom_props_map = {}
if self.config.custom_props_list:
for prop in self.config.custom_props_list:
Expand Down Expand Up @@ -420,21 +418,17 @@ def build_corp_group_mce(self, attrs: dict) -> Optional[MetadataChangeEvent]:
full_name = cn[0].decode()
admins = parse_users(attrs, self.config.group_attrs_map["admins"])
members = parse_users(attrs, self.config.group_attrs_map["members"])
email = (
attrs[self.config.group_attrs_map["email"]][0].decode()
if self.config.group_attrs_map["email"] in attrs
else full_name

email = get_attr_or_none(
attrs, self.config.group_attrs_map["email"], full_name
)
description = (
attrs[self.config.group_attrs_map["description"]][0].decode()
if self.config.group_attrs_map["description"] in attrs
else None
description = get_attr_or_none(
attrs, self.config.group_attrs_map["description"]
)
displayName = (
attrs[self.config.group_attrs_map["displayName"]][0].decode()
if self.config.group_attrs_map["displayName"] in attrs
else None
displayName = get_attr_or_none(
attrs, self.config.group_attrs_map["displayName"]
)

group_snapshot = CorpGroupSnapshotClass(
urn=f"urn:li:corpGroup:{full_name}",
aspects=[
Expand Down Expand Up @@ -490,3 +484,9 @@ def parse_ldap_dn(input_clean: bytes) -> str:
return ldap.dn.str2dn(input_clean, flags=ldap.DN_FORMAT_LDAPV3)[0][0][1]
else:
return input_clean.decode()


def get_attr_or_none(
    attrs: Dict[str, Any], key: str, default: Optional[str] = None
) -> Optional[str]:
    """Return the first value of an LDAP attribute, decoded to ``str``.

    Args:
        attrs: Attribute mapping of an LDAP entry. Each value is expected to
            be a sequence whose items expose ``.decode()`` (e.g. ``bytes``).
        key: Name of the attribute to read.
        default: Value returned when the attribute is missing or has no
            values.

    Returns:
        The decoded first value of ``attrs[key]``, or ``default`` (``None``
        unless overridden) when the attribute is absent or empty.
    """
    values = attrs.get(key)
    # An attribute may be present with an empty value list; indexing [0] on
    # it would raise IndexError, so treat it the same as a missing attribute.
    if not values:
        return default
    return values[0].decode()
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
Expand All @@ -83,8 +84,9 @@
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/ldap/test_ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_ldap_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this delay, the LDAP server may return empty results.
time.sleep(5)

pipeline = Pipeline.create(
Expand Down Expand Up @@ -63,6 +64,7 @@ def test_ldap_memberof_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this delay, the LDAP server may return empty results.
time.sleep(5)

pipeline = Pipeline.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def ldap_ingest_common(
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this delay, the LDAP server may return empty results.
time.sleep(5)

with mock.patch(
Expand Down
Loading