Skip to content

Commit

Permalink
Merge branch 'master' into cus2240-lookml-ingestion-fails
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Jul 27, 2024
2 parents dd02704 + d85da39 commit f6d97c0
Show file tree
Hide file tree
Showing 43 changed files with 636 additions and 306 deletions.
1 change: 1 addition & 0 deletions datahub-graphql-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation externalDependency.opentelemetryAnnotations

implementation externalDependency.slf4jApi
implementation externalDependency.springContext
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.support.CronExpression;

/** Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege. */
@Slf4j
Expand All @@ -46,55 +50,51 @@ public UpsertIngestionSourceResolver(final EntityClient entityClient) {
public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
if (IngestionAuthUtils.canManageIngestion(context)) {

final Optional<String> ingestionSourceUrn =
Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
if (!IngestionAuthUtils.canManageIngestion(context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
}
final Optional<String> ingestionSourceUrn = Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()),
INGESTION_INFO_ASPECT_NAME,
info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}
// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()), INGESTION_INFO_ASPECT_NAME, info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}

try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
},
this.getClass().getSimpleName(),
"get");
Expand Down Expand Up @@ -137,9 +137,38 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig

private DataHubIngestionSourceSchedule mapSchedule(
final UpdateIngestionSourceScheduleInput input) {

final String modifiedCronInterval = adjustCronInterval(input.getInterval());
try {
CronExpression.parse(modifiedCronInterval);
} catch (IllegalArgumentException e) {
throw new DataHubGraphQLException(
String.format("Invalid cron schedule `%s`: %s", input.getInterval(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
try {
ZoneId.of(input.getTimezone());
} catch (DateTimeException e) {
throw new DataHubGraphQLException(
String.format("Invalid timezone `%s`: %s", input.getTimezone(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}

final DataHubIngestionSourceSchedule result = new DataHubIngestionSourceSchedule();
result.setInterval(input.getInterval());
result.setTimezone(input.getTimezone());
return result;
}

// Copied from IngestionScheduler.java
private String adjustCronInterval(final String origCronInterval) {
Objects.requireNonNull(origCronInterval, "origCronInterval must not be null");
// Typically we support 5-character cron. Spring's lib only supports 6 character cron so we make
// an adjustment here.
final String[] originalCronParts = origCronInterval.split(" ");
if (originalCronParts.length == 5) {
return String.format("0 %s", origCronInterval);
}
return origCronInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
Expand All @@ -22,14 +23,17 @@

public class UpsertIngestionSourceResolverTest {

private static final UpdateIngestionSourceInput TEST_INPUT =
new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
private static final UpdateIngestionSourceInput TEST_INPUT = makeInput();

private static UpdateIngestionSourceInput makeInput() {
return new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
}

@Test
public void testGetSuccess() throws Exception {
Expand Down Expand Up @@ -104,4 +108,54 @@ public void testGetEntityClientException() throws Exception {

assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
}

@Test
public void testUpsertWithInvalidCron() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * 123", "UTC"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("null", "UTC"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}

@Test
public void testUpsertWithInvalidTimezone() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "Invalid"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "America/Los_Angel"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}
}
17 changes: 14 additions & 3 deletions datahub-web-react/src/app/ProtectedRoutes.tsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import React from 'react';
import { Switch, Route } from 'react-router-dom';
import React, { useEffect } from 'react';
import { Switch, Route, useLocation, useHistory } from 'react-router-dom';
import { Layout } from 'antd';
import { HomePage } from './home/HomePage';
import { SearchRoutes } from './SearchRoutes';
import EmbedRoutes from './EmbedRoutes';
import { PageRoutes } from '../conf/Global';
import { NEW_ROUTE_MAP, PageRoutes } from '../conf/Global';
import { getRedirectUrl } from '../conf/utils';

/**
* Container for all views behind an authentication wall.
*/
export const ProtectedRoutes = (): JSX.Element => {
const location = useLocation();
const history = useHistory();

useEffect(() => {
if (location.pathname.indexOf('/Validation') !== -1) {
history.replace(getRedirectUrl(NEW_ROUTE_MAP));
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [location]);

return (
<Layout>
<Switch>
Expand Down
1 change: 1 addition & 0 deletions datahub-web-react/src/app/context/UserContextProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const UserContextProvider = ({ children }: { children: React.ReactNode }) => {
return (
<UserContext.Provider
value={{
loaded: !!meData,
urn: meData?.me?.corpUser?.urn,
user: meData?.me?.corpUser as CorpUser,
platformPrivileges: meData?.me?.platformPrivileges as PlatformPrivileges,
Expand Down
2 changes: 2 additions & 0 deletions datahub-web-react/src/app/context/userContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type State = {
* Context about the currently-authenticated user.
*/
export type UserContextType = {
loaded: boolean;
urn?: string | null;
user?: CorpUser | null;
platformPrivileges?: PlatformPrivileges | null;
Expand All @@ -53,6 +54,7 @@ export const DEFAULT_STATE: State = {
};

export const DEFAULT_CONTEXT = {
loaded: false,
urn: undefined,
user: undefined,
state: DEFAULT_STATE,
Expand Down
17 changes: 13 additions & 4 deletions datahub-web-react/src/app/entity/dataset/DatasetEntity.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import AccessManagement from '../shared/tabs/Dataset/AccessManagement/AccessMana
import { matchedFieldPathsRenderer } from '../../search/matches/matchedFieldPathsRenderer';
import { getLastUpdatedMs } from './shared/utils';
import { IncidentTab } from '../shared/tabs/Incident/IncidentTab';
import { GovernanceTab } from '../shared/tabs/Dataset/Governance/GovernanceTab';

const SUBTYPES = {
VIEW: 'view',
Expand Down Expand Up @@ -166,14 +167,22 @@ export class DatasetEntity implements Entity<Dataset> {
},
},
{
name: 'Validation',
name: 'Quality',
component: ValidationsTab,
display: {
visible: (_, _1) => true,
enabled: (_, dataset: GetDatasetQuery) => {
return (
(dataset?.dataset?.assertions?.total || 0) > 0 || dataset?.dataset?.testResults !== null
);
return (dataset?.dataset?.assertions?.total || 0) > 0;
},
},
},
{
name: 'Governance',
component: GovernanceTab,
display: {
visible: (_, _1) => true,
enabled: (_, dataset: GetDatasetQuery) => {
return dataset?.dataset?.testResults !== null;
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const EntityHealth = ({ health, baseUrl, fontSize, tooltipPlacement }: Pr
return (
<>
{(unhealthy && (
<Link to={`${baseUrl}/Validation`}>
<Link to={`${baseUrl}/Quality`}>
<Container>
<EntityHealthPopover health={health} baseUrl={baseUrl} placement={tooltipPlacement}>
{icon}
Expand Down
Loading

0 comments on commit f6d97c0

Please sign in to comment.