feat(bigquery): excluding projects without any datasets from ingestion (#8535)

Co-authored-by: Upendra Vedullapalli <[email protected]>
Co-authored-by: Andrew Sikowitz <[email protected]>
3 people authored Oct 4, 2023
1 parent e3780c2 commit 13508a9
Showing 4 changed files with 72 additions and 7 deletions.
19 changes: 14 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

@@ -600,9 +600,6 @@ def _process_project(
         db_views: Dict[str, List[BigqueryView]] = {}
 
         project_id = bigquery_project.id
-
-        yield from self.gen_project_id_containers(project_id)
-
         try:
             bigquery_project.datasets = (
                 self.bigquery_data_dictionary.get_datasets_for_project_id(project_id)
@@ -619,11 +616,23 @@ def _process_project(
             return None
 
         if len(bigquery_project.datasets) == 0:
-            logger.warning(
-                f"No dataset found in {project_id}. Either there are no datasets in this project or missing bigquery.datasets.get permission. You can assign predefined roles/bigquery.metadataViewer role to your service account."
+            more_info = (
+                "Either there are no datasets in this project or missing bigquery.datasets.get permission. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account."
             )
+            if self.config.exclude_empty_projects:
+                self.report.report_dropped(project_id)
+                warning_message = f"Excluded project '{project_id}' since no datasets were found. {more_info}"
+            else:
+                yield from self.gen_project_id_containers(project_id)
+                warning_message = (
+                    f"No datasets found in project '{project_id}'. {more_info}"
+                )
+            logger.warning(warning_message)
             return
 
+        yield from self.gen_project_id_containers(project_id)
+
         self.report.num_project_datasets_to_scan[project_id] = len(
             bigquery_project.datasets
         )
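Taken together, the two hunks move the project-container emission from the top of _process_project to after the dataset listing, and add an opt-out for projects with no datasets. A minimal, self-contained sketch of the resulting control flow (process_project below is an illustrative stand-in, not DataHub's actual API; the labels stand in for emitted workunits):

from typing import List

def process_project(
    project_id: str, datasets: List[str], exclude_empty_projects: bool
) -> List[str]:
    # Returns labels standing in for the workunits a project would emit.
    emitted: List[str] = []
    if not datasets:
        if exclude_empty_projects:
            # New opt-in behavior: nothing is emitted; the drop is reported.
            print(f"Excluded project '{project_id}' since no datasets were found.")
        else:
            # Default behavior (unchanged): the empty project keeps its container.
            emitted.append(f"container:{project_id}")
            print(f"No datasets found in project '{project_id}'.")
        return emitted
    # The container is now emitted only once datasets are confirmed to exist.
    emitted.append(f"container:{project_id}")
    emitted += [f"dataset:{project_id}.{d}" for d in datasets]
    return emitted

assert process_project("p1", [], exclude_empty_projects=True) == []
assert process_project("p1", [], exclude_empty_projects=False) == ["container:p1"]
assert process_project("p2", ["ds"], True) == ["container:p2", "dataset:p2.ds"]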
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py

@@ -265,6 +265,11 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
         description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
     )
 
+    exclude_empty_projects: bool = Field(
+        default=False,
+        description="Option to exclude empty projects from being ingested.",
+    )
+
     @root_validator(pre=False)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
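The new flag goes through the normal pydantic config path, so it can be set from a recipe's source config like any other option. A quick usage sketch ("my-project" is a placeholder id; the parse_obj usage mirrors the tests below):

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

# The flag is opt-in: by default, empty projects are still ingested.
config = BigQueryV2Config.parse_obj({"project_ids": ["my-project"]})
assert config.exclude_empty_projects is False

# Enabling it, e.g. via `exclude_empty_projects: true` in a recipe.
config = BigQueryV2Config.parse_obj(
    {"project_ids": ["my-project"], "exclude_empty_projects": True}
)
assert config.exclude_empty_projects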
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py

@@ -122,6 +122,8 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR
 
     usage_state_size: Optional[str] = None
 
+    exclude_empty_projects: Optional[bool] = None
+
     schema_api_perf: BigQuerySchemaApiPerfReport = field(
         default_factory=BigQuerySchemaApiPerfReport
     )
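The report-side field mirrors the config flag so a run's report can show whether empty-project filtering was active. The hunk that copies the config value onto the report is not among those shown here, so treat that wiring as assumed. Since DataHub report classes are default-constructible dataclasses, the field behaves like any other report attribute:

from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report

report = BigQueryV2Report()
assert report.exclude_empty_projects is None  # unset until the source populates it (assumed wiring, not shown above)
report.exclude_empty_projects = True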
53 changes: 51 additions & 2 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -3,23 +3,28 @@
 import os
 from datetime import datetime, timedelta, timezone
 from types import SimpleNamespace
-from typing import Any, Dict, Optional, cast
+from typing import Any, Dict, List, Optional, cast
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
 from google.api_core.exceptions import GoogleAPICallError
 from google.cloud.bigquery.table import Row, TableListItem
 
+from datahub.configuration.common import AllowDenyPattern
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     _BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
     BigqueryTableIdentifier,
     BigQueryTableRef,
 )
-from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_config import (
+    BigQueryConnectionConfig,
+    BigQueryV2Config,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigqueryDataset,
     BigqueryProject,
     BigQuerySchemaApi,
     BigqueryView,
@@ -854,3 +859,47 @@ def test_get_table_name(full_table_name: str, datahub_full_table_name: str) -> N
         BigqueryTableIdentifier.from_string_name(full_table_name).get_table_name()
         == datahub_full_table_name
     )
+
+
+def test_default_config_for_excluding_projects_and_datasets():
+    config = BigQueryV2Config.parse_obj({})
+    assert config.exclude_empty_projects is False
+    config = BigQueryV2Config.parse_obj({"exclude_empty_projects": True})
+    assert config.exclude_empty_projects
+
+
+@patch.object(BigQueryConnectionConfig, "get_bigquery_client", new=lambda self: None)
+@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id")
+def test_excluding_empty_projects_from_ingestion(
+    get_datasets_for_project_id_mock,
+):
+    project_id_with_datasets = "project-id-with-datasets"
+    project_id_without_datasets = "project-id-without-datasets"
+
+    def get_datasets_for_project_id_side_effect(
+        project_id: str,
+    ) -> List[BigqueryDataset]:
+        return (
+            []
+            if project_id == project_id_without_datasets
+            else [BigqueryDataset("some-dataset")]
+        )
+
+    get_datasets_for_project_id_mock.side_effect = (
+        get_datasets_for_project_id_side_effect
+    )
+
+    base_config = {
+        "project_ids": [project_id_with_datasets, project_id_without_datasets],
+        "schema_pattern": AllowDenyPattern(deny=[".*"]),
+        "include_usage_statistics": False,
+        "include_table_lineage": False,
+    }
+
+    config = BigQueryV2Config.parse_obj(base_config)
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-1"))
+    assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 2  # type: ignore
+
+    config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2"))
+    assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1  # type: ignore
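A note on the fixture choices in the second test: the deny-all schema_pattern filters out every dataset, and usage and lineage extraction are disabled, so the only workunits left are the project containers. That is what makes the distinct-URN counts (2 with the default config, 1 with exclude_empty_projects) a clean signal for whether the empty project was kept. A quick check of the deny-all pattern, assuming the AllowDenyPattern.allowed helper from datahub.configuration.common:

from datahub.configuration.common import AllowDenyPattern

pattern = AllowDenyPattern(deny=[".*"])
assert not pattern.allowed("some-dataset")  # every name is denied
assert AllowDenyPattern().allowed("some-dataset")  # the default pattern allows everything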
