Skip to content

Commit

Permalink
Merge branch 'master' into docs/ingestion-guide
Browse files Browse the repository at this point in the history
  • Loading branch information
yoonhyejin authored Nov 15, 2023
2 parents 2deb055 + 486e394 commit e10197b
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 15 deletions.
16 changes: 12 additions & 4 deletions datahub-frontend/app/auth/NativeAuthenticationConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@
public class NativeAuthenticationConfigs {

public static final String NATIVE_AUTHENTICATION_ENABLED_CONFIG_PATH = "auth.native.enabled";
public static final String NATIVE_AUTHENTICATION_ENFORCE_VALID_EMAIL_ENABLED_CONFIG_PATH = "auth.native.signUp.enforceValidEmail";

private Boolean _isEnabled = true;
private Boolean _isEnforceValidEmailEnabled = true;

public NativeAuthenticationConfigs(final com.typesafe.config.Config configs) {
if (configs.hasPath(NATIVE_AUTHENTICATION_ENABLED_CONFIG_PATH)
&& Boolean.FALSE.equals(
Boolean.parseBoolean(configs.getValue(NATIVE_AUTHENTICATION_ENABLED_CONFIG_PATH).toString()))) {
_isEnabled = false;
if (configs.hasPath(NATIVE_AUTHENTICATION_ENABLED_CONFIG_PATH)) {
_isEnabled = Boolean.parseBoolean(configs.getValue(NATIVE_AUTHENTICATION_ENABLED_CONFIG_PATH).toString());
}
if (configs.hasPath(NATIVE_AUTHENTICATION_ENFORCE_VALID_EMAIL_ENABLED_CONFIG_PATH)) {
_isEnforceValidEmailEnabled =
Boolean.parseBoolean(configs.getValue(NATIVE_AUTHENTICATION_ENFORCE_VALID_EMAIL_ENABLED_CONFIG_PATH).toString());
}
}

public boolean isNativeAuthenticationEnabled() {
return _isEnabled;
}

public boolean isEnforceValidEmailEnabled() {
return _isEnforceValidEmailEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.pac4j.play.store.PlaySessionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import play.data.validation.Constraints;
import play.libs.Json;
import play.mvc.Controller;
import play.mvc.Http;
Expand Down Expand Up @@ -203,6 +204,13 @@ public Result signUp(Http.Request request) {
JsonNode invalidCredsJson = Json.newObject().put("message", "Email must not be empty.");
return Results.badRequest(invalidCredsJson);
}
if (_nativeAuthenticationConfigs.isEnforceValidEmailEnabled()) {
Constraints.EmailValidator emailValidator = new Constraints.EmailValidator();
if (!emailValidator.isValid(email)) {
JsonNode invalidCredsJson = Json.newObject().put("message", "Email must not be empty.");
return Results.badRequest(invalidCredsJson);
}
}

if (StringUtils.isBlank(password)) {
JsonNode invalidCredsJson = Json.newObject().put("message", "Password must not be empty.");
Expand Down
4 changes: 4 additions & 0 deletions datahub-frontend/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ auth.oidc.preferredJwsAlgorithm = ${?AUTH_OIDC_PREFERRED_JWS_ALGORITHM} # Which
#
auth.jaas.enabled = ${?AUTH_JAAS_ENABLED}
auth.native.enabled = ${?AUTH_NATIVE_ENABLED}

# Enforces the usage of a valid email for user sign up
auth.native.signUp.enforceValidEmail = true
auth.native.signUp.enforceValidEmail = ${?ENFORCE_VALID_EMAIL}
#
# To disable all authentication to the app, and proxy all users through a master "datahub" account, make sure that,
# jaas, native and oidc auth are disabled:
Expand Down
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ In order to use this example, you must first configure the Datahub hook. Like in
If you're not seeing lineage in DataHub, check the following:

- Validate that the plugin is loaded in Airflow. Go to Admin -> Plugins and check that the DataHub plugin is listed.
- With the v2 plugin, it should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin v2 using DataHubRestEmitter: configured to talk to <datahub_url>` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled.
- If using the v2 plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator.
- If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import functools
import logging
import os
import threading
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast

Expand Down Expand Up @@ -55,7 +56,10 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811

_airflow_listener_initialized = False
_airflow_listener: Optional["DataHubListener"] = None
_RUN_IN_THREAD = True
_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in (
"true",
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30


Expand Down Expand Up @@ -133,7 +137,7 @@ def __init__(self, config: DatahubLineageConfig):

self._emitter = config.make_emitter_hook().make_emitter()
self._graph: Optional[DataHubGraph] = None
logger.info(f"DataHub plugin using {repr(self._emitter)}")
logger.info(f"DataHub plugin v2 using {repr(self._emitter)}")

# See discussion here https://github.com/OpenLineage/OpenLineage/pull/508 for
# why we need to keep track of tasks ourselves.
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ def quickstart( # noqa: C901
logger.debug("docker compose up timed out, sending SIGTERM")
up_process.terminate()
try:
up_process.wait(timeout=3)
up_process.wait(timeout=8)
except subprocess.TimeoutExpired:
logger.debug("docker compose up still running, sending SIGKILL")
up_process.kill()
Expand Down
38 changes: 33 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

import bson.timestamp
Expand Down Expand Up @@ -74,6 +75,12 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class HostingEnvironment(Enum):
SELF_HOSTED = "SELF_HOSTED"
ATLAS = "ATLAS"
AWS_DOCUMENTDB = "AWS_DOCUMENTDB"


class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
Expand Down Expand Up @@ -108,6 +115,11 @@ class MongoDBConfig(
# errors out with "16793600" as the maximum size supported.
maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="")

hostingEnvironment: Optional[HostingEnvironment] = Field(
default=HostingEnvironment.SELF_HOSTED,
description="Hosting environment of MongoDB, default is SELF_HOSTED, currently support `SELF_HOSTED`, `ATLAS`, `AWS_DOCUMENTDB`",
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for databases to filter in ingestion.",
Expand Down Expand Up @@ -176,7 +188,7 @@ def construct_schema_pymongo(
delimiter: str,
use_random_sampling: bool,
max_document_size: int,
is_version_gte_4_4: bool,
should_add_document_size_filter: bool,
sample_size: Optional[int] = None,
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
Expand All @@ -191,15 +203,19 @@ def construct_schema_pymongo(
the PyMongo collection
delimiter:
string to concatenate field names by
use_random_sampling:
boolean to indicate if random sampling should be added to aggregation
max_document_size:
maximum size of the document that will be considered for generating the schema.
should_add_document_size_filter:
boolean to indicate if document size filter should be added to aggregation
sample_size:
number of items in the collection to sample
(reads entire collection if not provided)
max_document_size:
maximum size of the document that will be considered for generating the schema.
"""

aggregations: List[Dict] = []
if is_version_gte_4_4:
if should_add_document_size_filter:
doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
Expand Down Expand Up @@ -381,7 +397,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
delimiter=".",
use_random_sampling=self.config.useRandomSampling,
max_document_size=self.config.maxDocumentSize,
is_version_gte_4_4=self.is_server_version_gte_4_4(),
should_add_document_size_filter=self.should_add_document_size_filter(),
sample_size=self.config.schemaSamplingSize,
)

Expand Down Expand Up @@ -475,6 +491,18 @@ def is_server_version_gte_4_4(self) -> bool:

return False

def is_hosted_on_aws_documentdb(self) -> bool:
return self.config.hostingEnvironment == HostingEnvironment.AWS_DOCUMENTDB

def should_add_document_size_filter(self) -> bool:
# the operation $bsonsize is only available in server version greater than 4.4
# and is not supported by AWS DocumentDB, we should only add this operation to
# aggregation for mongodb that doesn't run on AWS DocumentDB and version is greater than 4.4
# https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html
return (
self.is_server_version_gte_4_4() and not self.is_hosted_on_aws_documentdb()
)

def get_report(self) -> MongoDBSourceReport:
return self.report

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.datahub.authentication.user;

import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationConfiguration;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.events.metadata.ChangeType;
Expand Down Expand Up @@ -34,6 +36,7 @@ public class NativeUserService {
private final EntityService _entityService;
private final EntityClient _entityClient;
private final SecretService _secretService;
private final AuthenticationConfiguration _authConfig;

public void createNativeUser(@Nonnull String userUrnString, @Nonnull String fullName, @Nonnull String email,
@Nonnull String title, @Nonnull String password, @Nonnull Authentication authentication) throws Exception {
Expand All @@ -45,7 +48,12 @@ public void createNativeUser(@Nonnull String userUrnString, @Nonnull String full
Objects.requireNonNull(authentication, "authentication must not be null!");

final Urn userUrn = Urn.createFromString(userUrnString);
if (_entityService.exists(userUrn) || userUrn.toString().equals(SYSTEM_ACTOR)) {
if (_entityService.exists(userUrn)
// Should never fail these due to Controller level check, but just in case more usages get put in
|| userUrn.toString().equals(SYSTEM_ACTOR)
|| userUrn.toString().equals(new CorpuserUrn(_authConfig.getSystemClientId()).toString())
|| userUrn.toString().equals(DATAHUB_ACTOR)
|| userUrn.toString().equals(UNKNOWN_ACTOR)) {
throw new RuntimeException("This user already exists! Cannot create a new user.");
}
updateCorpUserInfo(userUrn, fullName, email, title, authentication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationConfiguration;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.client.EntityClient;
Expand Down Expand Up @@ -48,8 +49,10 @@ public void setupTest() throws Exception {
_entityService = mock(EntityService.class);
_entityClient = mock(EntityClient.class);
_secretService = mock(SecretService.class);
AuthenticationConfiguration authenticationConfiguration = new AuthenticationConfiguration();
authenticationConfiguration.setSystemClientId("someCustomId");

_nativeUserService = new NativeUserService(_entityService, _entityClient, _secretService);
_nativeUserService = new NativeUserService(_entityService, _entityClient, _secretService, authenticationConfiguration);
}

@Test
Expand All @@ -74,6 +77,16 @@ public void testCreateNativeUserUserAlreadyExists() throws Exception {
_nativeUserService.createNativeUser(USER_URN_STRING, FULL_NAME, EMAIL, TITLE, PASSWORD, SYSTEM_AUTHENTICATION);
}

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "This user already exists! Cannot create a new user.")
public void testCreateNativeUserUserDatahub() throws Exception {
_nativeUserService.createNativeUser(DATAHUB_ACTOR, FULL_NAME, EMAIL, TITLE, PASSWORD, SYSTEM_AUTHENTICATION);
}

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "This user already exists! Cannot create a new user.")
public void testCreateNativeUserUserSystemUser() throws Exception {
_nativeUserService.createNativeUser(SYSTEM_ACTOR, FULL_NAME, EMAIL, TITLE, PASSWORD, SYSTEM_AUTHENTICATION);
}

@Test
public void testCreateNativeUserPasses() throws Exception {
when(_entityService.exists(any())).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import java.util.concurrent.CompletableFuture;
Expand All @@ -28,6 +29,8 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.HttpClientErrorException;

import static com.linkedin.metadata.Constants.*;


@Slf4j
@RestController
Expand Down Expand Up @@ -177,6 +180,11 @@ CompletableFuture<ResponseEntity<String>> signUp(final HttpEntity<String> httpEn
}

String userUrnString = userUrn.asText();
String systemClientUser = new CorpuserUrn(_configProvider.getAuthentication().getSystemClientId()).toString();

if (userUrnString.equals(systemClientUser) || userUrnString.equals(DATAHUB_ACTOR) || userUrnString.equals(UNKNOWN_ACTOR)) {
return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
String fullNameString = fullName.asText();
String emailString = email.asText();
String titleString = title.asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.linkedin.gms.factory.auth;

import com.datahub.authentication.user.NativeUserService;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
Expand Down Expand Up @@ -31,10 +32,14 @@ public class NativeUserServiceFactory {
@Qualifier("dataHubSecretService")
private SecretService _secretService;

@Autowired
private ConfigurationProvider _configurationProvider;

@Bean(name = "nativeUserService")
@Scope("singleton")
@Nonnull
protected NativeUserService getInstance() throws Exception {
return new NativeUserService(this._entityService, this._javaEntityClient, this._secretService);
return new NativeUserService(_entityService, _javaEntityClient, _secretService,
_configurationProvider.getAuthentication());
}
}

0 comments on commit e10197b

Please sign in to comment.