Skip to content

Commit

Permalink
Feat(ingest/ldap)fix list index out of range error (#8525)
Browse files Browse the repository at this point in the history
Co-authored-by: Mayuri Nehate <[email protected]>
  • Loading branch information
2 people authored and yoonhyejin committed Aug 24, 2023
1 parent 2a79b1e commit 580329a
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 57 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Potential Downtime

### Deprecations
- #8525: In the LDAP ingestion source, the `manager_pagination_enabled` option has been renamed to the more general `pagination_enabled`.

### Other Notable Changes

Expand Down
106 changes: 53 additions & 53 deletions metadata-ingestion/src/datahub/ingestion/source/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -135,7 +136,14 @@ class LDAPSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):

manager_pagination_enabled: bool = Field(
default=True,
description="Use pagination while search for managers (enabled by default).",
description="[deprecated] Use pagination_enabled ",
)
_deprecate_manager_pagination_enabled = pydantic_renamed_field(
"manager_pagination_enabled", "pagination_enabled"
)
pagination_enabled: bool = Field(
default=True,
description="Use pagination while do search query (enabled by default).",
)

# default mapping for attrs
Expand Down Expand Up @@ -218,7 +226,10 @@ def __init__(self, ctx: PipelineContext, config: LDAPSourceConfig):
except ldap.LDAPError as e:
raise ConfigurationError("LDAP connection failed") from e

self.lc = create_controls(self.config.page_size)
if self.config.pagination_enabled:
self.lc = create_controls(self.config.page_size)
else:
self.lc = None

@classmethod
def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "LDAPSource":
Expand All @@ -244,7 +255,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
ldap.SCOPE_SUBTREE,
self.config.filter,
self.config.attrs_list,
serverctrls=[self.lc],
serverctrls=[self.lc] if self.lc else [],
)
_rtype, rdata, _rmsgid, serverctrls = self.ldap_client.result3(msgid)
except ldap.LDAPError as e:
Expand Down Expand Up @@ -278,15 +289,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
self.report.report_dropped(dn)

pctrls = get_pctrls(serverctrls)
if not pctrls:
self.report.report_failure(
"ldap-control", "Server ignores RFC 2696 control."
)
if self.lc:
pctrls = get_pctrls(serverctrls)
if not pctrls:
self.report.report_failure(
"ldap-control", "Server ignores RFC 2696 control."
)
break
cookie = set_cookie(self.lc, pctrls)
else:
break

cookie = set_cookie(self.lc, pctrls)

def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
"""
Handle a DN and attributes by adding manager info and constructing a
Expand All @@ -300,15 +313,11 @@ def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUn
manager_filter = self.config.filter
else:
manager_filter = None
if self.config.manager_pagination_enabled:
ctrls = [self.lc]
else:
ctrls = None
manager_msgid = self.ldap_client.search_ext(
m_cn,
ldap.SCOPE_BASE,
manager_filter,
serverctrls=ctrls,
serverctrls=[self.lc] if self.lc else [],
)
result = self.ldap_client.result3(manager_msgid)
if result[1]:
Expand Down Expand Up @@ -351,36 +360,25 @@ def build_corp_user_mce(
last_name = attrs[self.config.user_attrs_map["lastName"]][0].decode()
groups = parse_groups(attrs, self.config.user_attrs_map["memberOf"])

email = (
(attrs[self.config.user_attrs_map["email"]][0]).decode()
if self.config.user_attrs_map["email"] in attrs
else ldap_user
email = get_attr_or_none(attrs, self.config.user_attrs_map["email"], ldap_user)
display_name = get_attr_or_none(
attrs, self.config.user_attrs_map["displayName"], full_name
)
display_name = (
(attrs[self.config.user_attrs_map["displayName"]][0]).decode()
if self.config.user_attrs_map["displayName"] in attrs
else full_name
title = get_attr_or_none(attrs, self.config.user_attrs_map["title"])
department_id_str = get_attr_or_none(
attrs, self.config.user_attrs_map["departmentId"]
)
department_id = (
int(attrs[self.config.user_attrs_map["departmentId"]][0].decode())
if self.config.user_attrs_map["departmentId"] in attrs
else None
department_name = get_attr_or_none(
attrs, self.config.user_attrs_map["departmentName"]
)
department_name = (
(attrs[self.config.user_attrs_map["departmentName"]][0]).decode()
if self.config.user_attrs_map["departmentName"] in attrs
else None
)
country_code = (
(attrs[self.config.user_attrs_map["countryCode"]][0]).decode()
if self.config.user_attrs_map["countryCode"] in attrs
else None
)
title = (
attrs[self.config.user_attrs_map["title"]][0].decode()
if self.config.user_attrs_map["title"] in attrs
else None
country_code = get_attr_or_none(
attrs, self.config.user_attrs_map["countryCode"]
)
if department_id_str:
department_id = int(department_id_str)
else:
department_id = None

custom_props_map = {}
if self.config.custom_props_list:
for prop in self.config.custom_props_list:
Expand Down Expand Up @@ -420,21 +418,17 @@ def build_corp_group_mce(self, attrs: dict) -> Optional[MetadataChangeEvent]:
full_name = cn[0].decode()
admins = parse_users(attrs, self.config.group_attrs_map["admins"])
members = parse_users(attrs, self.config.group_attrs_map["members"])
email = (
attrs[self.config.group_attrs_map["email"]][0].decode()
if self.config.group_attrs_map["email"] in attrs
else full_name

email = get_attr_or_none(
attrs, self.config.group_attrs_map["email"], full_name
)
description = (
attrs[self.config.group_attrs_map["description"]][0].decode()
if self.config.group_attrs_map["description"] in attrs
else None
description = get_attr_or_none(
attrs, self.config.group_attrs_map["description"]
)
displayName = (
attrs[self.config.group_attrs_map["displayName"]][0].decode()
if self.config.group_attrs_map["displayName"] in attrs
else None
displayName = get_attr_or_none(
attrs, self.config.group_attrs_map["displayName"]
)

group_snapshot = CorpGroupSnapshotClass(
urn=f"urn:li:corpGroup:{full_name}",
aspects=[
Expand Down Expand Up @@ -490,3 +484,9 @@ def parse_ldap_dn(input_clean: bytes) -> str:
return ldap.dn.str2dn(input_clean, flags=ldap.DN_FORMAT_LDAPV3)[0][0][1]
else:
return input_clean.decode()


def get_attr_or_none(
    attrs: Dict[str, Any], key: str, default: Optional[str] = None
) -> Optional[str]:
    """Return the first value of an attribute, decoded to ``str``.

    Args:
        attrs: Attribute mapping for a single entry. Each value is indexed
            with ``[0]`` and the result is ``.decode()``-d, so values are
            assumed to be sequences of bytes-like objects.
        key: Name of the attribute to look up.
        default: Value returned when the attribute is missing or has no
            values.

    Returns:
        The decoded first value of ``attrs[key]``, or ``default`` (``None``
        unless overridden) when the key is absent or its value is falsy.
    """
    values = attrs.get(key)
    # A falsy result covers both a missing key and an empty value list, so
    # the [0] index below can never raise IndexError.
    if not values:
        return default
    return values[0].decode()
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
Expand All @@ -83,8 +84,9 @@
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/ldap/test_ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_ldap_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this extra wait, the LDAP server may return empty results.
time.sleep(5)

pipeline = Pipeline.create(
Expand Down Expand Up @@ -63,6 +64,7 @@ def test_ldap_memberof_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this extra wait, the LDAP server may return empty results.
time.sleep(5)

pipeline = Pipeline.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def ldap_ingest_common(
# The openldap container loads the sample data after exposing the port publicly. As such,
# we must wait a little bit extra to ensure that the sample data is loaded.
wait_for_port(docker_services, "openldap", 389)
# Without this extra wait, the LDAP server may return empty results.
time.sleep(5)

with mock.patch(
Expand Down

0 comments on commit 580329a

Please sign in to comment.