Commit
Merge branch 'master' into feat-improve-mongodb
TonyOuyangGit authored Oct 27, 2023
2 parents c20f4c5 + e02b909 commit 09b45f5
Showing 32 changed files with 3,608 additions and 1,629 deletions.
@@ -190,7 +190,7 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps)
<Input
data-testid="cli-version-input"
className="cli-version-input"
placeholder="(e.g. 0.10.5)"
placeholder="(e.g. 0.12.0)"
value={state.config?.version || ''}
onChange={(event) => setVersion(event.target.value)}
/>
12 changes: 3 additions & 9 deletions datahub-web-react/src/app/search/useGetSearchQueryInputs.ts
@@ -3,7 +3,7 @@ import { useLocation, useParams } from 'react-router';
import { useMemo } from 'react';
import { FacetFilterInput, EntityType } from '../../types.generated';
import { useEntityRegistry } from '../useEntityRegistry';
- import { ENTITY_FILTER_NAME, FILTER_DELIMITER, UnionType } from './utils/constants';
+ import { ENTITY_FILTER_NAME, UnionType } from './utils/constants';
import { useUserContext } from '../context/useUserContext';
import useFilters from './utils/useFilters';
import { generateOrFilters } from './utils/generateOrFilters';
@@ -27,12 +27,6 @@ export default function useGetSearchQueryInputs(excludedFilterFields?: Array<str
const sortInput = useSortInput();

const filters: Array<FacetFilterInput> = useFilters(params);
-    const nonNestedFilters = filters.filter(
-        (f) => !f.field.includes(FILTER_DELIMITER) && !excludedFilterFields?.includes(f.field),
-    );
-    const nestedFilters = filters.filter(
-        (f) => f.field.includes(FILTER_DELIMITER) && !excludedFilterFields?.includes(f.field),
-    );
const entityFilters: Array<EntityType> = useMemo(
() =>
filters
@@ -43,8 +37,8 @@ export default function useGetSearchQueryInputs(excludedFilterFields?: Array<str
);

const orFilters = useMemo(
-        () => generateOrFilters(unionType, nonNestedFilters, nestedFilters),
-        [nonNestedFilters, nestedFilters, unionType],
+        () => generateOrFilters(unionType, filters, excludedFilterFields),
+        [filters, excludedFilterFields, unionType],
);

return { entityFilters, query, unionType, filters, orFilters, viewUrn, page, activeType, sortInput };
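Note: with this change, useGetSearchQueryInputs no longer pre-splits filters into nested and non-nested lists; it hands the raw filter list plus the excluded fields straight to generateOrFilters. A minimal sketch of the new call shape, grounded in the new unit test below (the specific filter values are illustrative only):

import { generateOrFilters } from './utils/generateOrFilters';
import { ENTITY_FILTER_NAME, TAGS_FILTER_NAME, UnionType } from './utils/constants';

// Illustrative inputs, mirroring the exclusion test added in this commit.
const filters = [
    { field: ENTITY_FILTER_NAME, values: ['DATASET'] },
    { field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
];

// The nested/non-nested split now happens inside generateOrFilters;
// callers only name the fields they want excluded.
const orFilters = generateOrFilters(UnionType.AND, filters, [ENTITY_FILTER_NAME]);
// => [{ and: [{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] }] }]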
@@ -1,7 +1,7 @@
import {
DOMAINS_FILTER_NAME,
ENTITY_SUB_TYPE_FILTER_NAME,
-    ENTITY_TYPE_FILTER_NAME,
+    ENTITY_FILTER_NAME,
TAGS_FILTER_NAME,
UnionType,
} from '../constants';
@@ -10,7 +10,7 @@ import { generateOrFilters } from '../generateOrFilters';
describe('generateOrFilters', () => {
it('should generate orFilters with UnionType.AND', () => {
const filters = [
-            { field: ENTITY_TYPE_FILTER_NAME, values: ['DATASET', 'CONTAINER'] },
+            { field: ENTITY_FILTER_NAME, values: ['DATASET', 'CONTAINER'] },
{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
];
const orFilters = generateOrFilters(UnionType.AND, filters);
@@ -24,7 +24,7 @@

it('should generate orFilters with UnionType.OR', () => {
const filters = [
-            { field: ENTITY_TYPE_FILTER_NAME, values: ['DATASET', 'CONTAINER'] },
+            { field: ENTITY_FILTER_NAME, values: ['DATASET', 'CONTAINER'] },
{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
];
const orFilters = generateOrFilters(UnionType.OR, filters);
@@ -43,17 +43,23 @@
const filters = [
{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
{ field: DOMAINS_FILTER_NAME, values: ['urn:li:domains:domain1'] },
+            { field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] },
        ];
-        const nestedFilters = [{ field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] }];
-        const orFilters = generateOrFilters(UnionType.AND, filters, nestedFilters);
+        // const nestedFilters = [{ field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] }];
+        const orFilters = generateOrFilters(UnionType.AND, filters);

expect(orFilters).toMatchObject([
{
-                and: [...filters, { field: '_entityType', values: ['CONTAINER'] }],
+                and: [
+                    { field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
+                    { field: DOMAINS_FILTER_NAME, values: ['urn:li:domains:domain1'] },
+                    { field: '_entityType', values: ['CONTAINER'] },
+                ],
},
{
and: [
-                    ...filters,
+                    { field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
+                    { field: DOMAINS_FILTER_NAME, values: ['urn:li:domains:domain1'] },
{ field: '_entityType', values: ['DATASET'] },
{ field: 'typeNames', values: ['table'] },
],
@@ -65,9 +71,9 @@ describe('generateOrFilters', () => {
const filters = [
{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
{ field: DOMAINS_FILTER_NAME, values: ['urn:li:domains:domain1'] },
+            { field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] },
        ];
-        const nestedFilters = [{ field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] }];
-        const orFilters = generateOrFilters(UnionType.OR, filters, nestedFilters);
+        const orFilters = generateOrFilters(UnionType.OR, filters);

expect(orFilters).toMatchObject([
{
@@ -87,4 +93,18 @@ describe('generateOrFilters', () => {
},
]);
});

+    it('should generate orFilters and exclude filters with a provided exclude field', () => {
+        const filters = [
+            { field: ENTITY_FILTER_NAME, values: ['DATASET', 'CONTAINER'] },
+            { field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
+        ];
+        const orFilters = generateOrFilters(UnionType.AND, filters, [ENTITY_FILTER_NAME]);
+
+        expect(orFilters).toMatchObject([
+            {
+                and: [{ field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] }],
+            },
+        ]);
+    });
});
16 changes: 11 additions & 5 deletions datahub-web-react/src/app/search/utils/generateOrFilters.ts
@@ -26,28 +26,34 @@ function generateInputWithNestedFilters(filters: FacetFilterInput[], nestedFilte
export function generateOrFilters(
unionType: UnionType,
filters: FacetFilterInput[],
-    nestedFilters: FacetFilterInput[] = [],
+    excludedFilterFields: string[] = [],
): AndFilterInput[] {
-    if ((filters?.length || 0) === 0 && nestedFilters.length === 0) {
+    if ((filters?.length || 0) === 0) {
return [];
}
+    const nonNestedFilters = filters.filter(
+        (f) => !f.field.includes(FILTER_DELIMITER) && !excludedFilterFields?.includes(f.field),
+    );
+    const nestedFilters = filters.filter(
+        (f) => f.field.includes(FILTER_DELIMITER) && !excludedFilterFields?.includes(f.field),
+    );

if (unionType === UnionType.OR) {
const orFiltersWithNestedFilters = generateInputWithNestedFilters([], nestedFilters);
-        const orFilters = filters.map((filter) => ({
+        const orFilters = nonNestedFilters.map((filter) => ({
and: [filter],
}));
return [...orFilters, ...orFiltersWithNestedFilters];
}
-    const andFiltersWithNestedFilters = generateInputWithNestedFilters(filters, nestedFilters);
+    const andFiltersWithNestedFilters = generateInputWithNestedFilters(nonNestedFilters, nestedFilters);

if (andFiltersWithNestedFilters.length) {
return andFiltersWithNestedFilters;
}

return [
{
-            and: filters,
+            and: nonNestedFilters,
},
];
}
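For context, a hedged sketch of the consolidated nested-filter handling, mirroring the UnionType.AND test case above (import paths abbreviated, not part of the commit): a nested value such as 'DATASET␞table' expands into one OR branch per entity type, carrying the sub-type as a typeNames filter.

import { generateOrFilters } from './generateOrFilters';
import { ENTITY_SUB_TYPE_FILTER_NAME, TAGS_FILTER_NAME, UnionType } from './constants';

// Inputs mirror the UnionType.AND nested-filter test.
const orFilters = generateOrFilters(UnionType.AND, [
    { field: TAGS_FILTER_NAME, values: ['urn:li:tag:tag1'] },
    { field: ENTITY_SUB_TYPE_FILTER_NAME, values: ['CONTAINER', 'DATASET␞table'] },
]);
// Expected shape, per the tests:
// [
//   { and: [{ field: TAGS_FILTER_NAME, ... }, { field: '_entityType', values: ['CONTAINER'] }] },
//   { and: [{ field: TAGS_FILTER_NAME, ... },
//           { field: '_entityType', values: ['DATASET'] },
//           { field: 'typeNames', values: ['table'] }] },
// ]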
29 changes: 23 additions & 6 deletions docs-website/sidebars.js
@@ -442,11 +442,29 @@ module.exports = {
},
"docs/act-on-metadata/impact-analysis",
{
-      Observability: [
-        "docs/managed-datahub/observe/freshness-assertions",
-        "docs/managed-datahub/observe/volume-assertions",
-        "docs/managed-datahub/observe/custom-sql-assertions",
-        "docs/managed-datahub/observe/column-assertions",
+      label: "Observability",
+      type: "category",
+      items: [
+        {
+          type: "doc",
+          id: "docs/managed-datahub/observe/freshness-assertions",
+          className: "saasOnly",
+        },
+        {
+          type: "doc",
+          id: "docs/managed-datahub/observe/volume-assertions",
+          className: "saasOnly",
+        },
+        {
+          type: "doc",
+          id: "docs/managed-datahub/observe/custom-sql-assertions",
+          className: "saasOnly",
+        },
+        {
+          type: "doc",
+          id: "docs/managed-datahub/observe/column-assertions",
+          className: "saasOnly",
+        },
],
},
{
@@ -606,7 +624,6 @@ module.exports = {
{
type: "doc",
id: "docs/managed-datahub/chrome-extension",
className: "saasOnly",
},
{
"Managed DataHub Release History": [
1 change: 1 addition & 0 deletions docs-website/versions.json
@@ -1,4 +1,5 @@
[
"0.12.0",
"0.11.0",
"0.10.5"
]
2 changes: 0 additions & 2 deletions docs/managed-datahub/chrome-extension.md
@@ -1,10 +1,8 @@
---
description: Learn how to upload and use the Acryl DataHub Chrome extension (beta) locally before it's available on the Chrome store.
---
-import FeatureAvailability from '@site/src/components/FeatureAvailability';

# Acryl DataHub Chrome Extension
-<FeatureAvailability saasOnly />

## Installing the Extension

2 changes: 1 addition & 1 deletion gradle/versioning/versioning.gradle
@@ -21,7 +21,7 @@ Produces the following variables and supports token replacement
import org.apache.tools.ant.filters.ReplaceTokens

def detailedVersionString = "0.0.0-unknown-SNAPSHOT"
def cliMajorVersion = "0.10.5" // base default cli major version
def cliMajorVersion = "0.12.0" // base default cli major version
def snapshotVersion = false
if (project.hasProperty("releaseVersion")) {
version = releaseVersion
@@ -98,7 +98,7 @@ def _get_dependencies(
# It is possible to tie an external sensor to DAG if external_task_id is omitted but currently we can't tie
# jobflow to anothet jobflow.
external_task_upstreams = []
if task.task_type == "ExternalTaskSensor":
if isinstance(task, ExternalTaskSensor):
task = cast(ExternalTaskSensor, task)
if hasattr(task, "external_task_id") and task.external_task_id is not None:
external_task_upstreams = [
@@ -155,6 +155,8 @@ def generate_dataflow(
"_concurrency",
# "_default_view",
"catchup",
"description",
"doc_md",
"fileloc",
"is_paused_upon_creation",
"start_date",
@@ -431,6 +433,9 @@ def run_datajob(
job_property_bag["operator"] = str(ti.operator)
job_property_bag["priority_weight"] = str(ti.priority_weight)
job_property_bag["log_url"] = ti.log_url
job_property_bag["orchestrator"] = "airflow"
job_property_bag["dag_id"] = str(dag.dag_id)
job_property_bag["task_id"] = str(ti.task_id)
dpi.properties.update(job_property_bag)
dpi.url = ti.log_url

@@ -23,6 +23,7 @@

TASK_ON_FAILURE_CALLBACK = "on_failure_callback"
TASK_ON_SUCCESS_CALLBACK = "on_success_callback"
+TASK_ON_RETRY_CALLBACK = "on_retry_callback"


def get_task_inlets_advanced(task: BaseOperator, context: Any) -> Iterable[Any]:
@@ -259,6 +260,28 @@ def custom_on_success_callback(context):
return custom_on_success_callback


+def _wrap_on_retry_callback(on_retry_callback):
+    def custom_on_retry_callback(context):
+        config = get_lineage_config()
+        if config.enabled:
+            context["_datahub_config"] = config
+            try:
+                datahub_task_status_callback(
+                    context, status=InstanceRunResult.UP_FOR_RETRY
+                )
+            except Exception as e:
+                if not config.graceful_exceptions:
+                    raise e
+                else:
+                    print(f"Exception: {traceback.format_exc()}")
+
+        # Call original policy
+        if on_retry_callback:
+            on_retry_callback(context)
+
+    return custom_on_retry_callback
+
+
def task_policy(task: Union[BaseOperator, MappedOperator]) -> None:
task.log.debug(f"Setting task policy for Dag: {task.dag_id} Task: {task.task_id}")
# task.add_inlets(["auto"])
@@ -274,7 +297,14 @@ def task_policy(task: Union[BaseOperator, MappedOperator]) -> None:
on_success_callback_prop: property = getattr(
MappedOperator, TASK_ON_SUCCESS_CALLBACK
)
-        if not on_failure_callback_prop.fset or not on_success_callback_prop.fset:
+        on_retry_callback_prop: property = getattr(
+            MappedOperator, TASK_ON_RETRY_CALLBACK
+        )
+        if (
+            not on_failure_callback_prop.fset
+            or not on_success_callback_prop.fset
+            or not on_retry_callback_prop.fset
+        ):
task.log.debug(
"Using MappedOperator's partial_kwargs instead of callback properties"
)
@@ -284,10 +314,14 @@ def task_policy(task: Union[BaseOperator, MappedOperator]) -> None:
task.partial_kwargs[TASK_ON_SUCCESS_CALLBACK] = _wrap_on_success_callback(
task.on_success_callback
)
+        task.partial_kwargs[TASK_ON_RETRY_CALLBACK] = _wrap_on_retry_callback(
+            task.on_retry_callback
+        )
return

task.on_failure_callback = _wrap_on_failure_callback(task.on_failure_callback) # type: ignore
task.on_success_callback = _wrap_on_success_callback(task.on_success_callback) # type: ignore
+    task.on_retry_callback = _wrap_on_retry_callback(task.on_retry_callback)  # type: ignore
# task.pre_execute = _wrap_pre_execution(task.pre_execute)


@@ -9,6 +9,8 @@
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
@@ -373,7 +375,10 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets"
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
"orchestrator": "airflow",
"dag_id": "basic_iolets",
"task_id": "run_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
"name": "basic_iolets_run_data_task_manual_run_test",
@@ -9,6 +9,8 @@
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
@@ -302,7 +304,10 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "2",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag"
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "task_1"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag",
"name": "simple_dag_task_1_manual_run_test",
@@ -433,6 +438,8 @@
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
@@ -654,7 +661,10 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag"
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "run_another_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag",
"name": "simple_dag_run_another_data_task_manual_run_test",