diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 30c38720dd96c..7ca5ce49019ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -388,7 +388,7 @@ def _get_field_type( # if still not found, log and continue if type_class is None: - logger.info( + logger.debug( f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.", ) type_class = NullTypeClass diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py index 98d58c9fc9d87..e6ddea9a30489 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_config.py @@ -205,6 +205,10 @@ class LookerDashboardSourceConfig( False, description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.", ) + emit_used_explores_only: bool = Field( + True, + description="When enabled, only explores that are used by a Dashboard/Look will be ingested.", + ) @validator("external_base_url", pre=True, always=True) def external_url_defaults_to_api_config_base_url( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index b00f74b71e792..988caba1c0d74 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -59,6 +59,7 @@ class LookerAPIStats(BaseModel): lookml_model_calls: int = 0 all_dashboards_calls: int = 0 all_looks_calls: int = 0 + all_models_calls: int = 0 get_query_calls: int = 0 search_looks_calls: int = 0 search_dashboards_calls: int = 0 @@ -155,6 +156,12 @@ def dashboard(self, dashboard_id: str, fields: Union[str, List[str]]) -> Dashboa transport_options=self.transport_options, ) + def all_lookml_models(self) -> Sequence[LookmlModel]: + self.client_stats.all_models_calls += 1 + return self.client.all_lookml_models( + transport_options=self.transport_options, + ) + def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore: self.client_stats.explore_calls += 1 return self.client.lookml_model_explore( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index 09683d790c14c..4a98e8874bca0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -147,9 +147,12 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext): ) self.reporter._looker_explore_registry = self.explore_registry self.reporter._looker_api = self.looker_api + self.reachable_look_registry = set() - self.explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {} + # (model, explore) -> list of charts/looks/dashboards that reference this explore + # The list values are used purely for debugging purposes. + self.reachable_explores: Dict[Tuple[str, str], List[str]] = {} # Keep stat generators to generate entity stat aspect later stat_generator_config: looker_usage.StatGeneratorConfig = ( @@ -378,11 +381,11 @@ def _get_input_fields_from_query( return result - def add_explore_to_fetch(self, model: str, explore: str, via: str) -> None: - if (model, explore) not in self.explores_to_fetch_set: - self.explores_to_fetch_set[(model, explore)] = [] + def add_reachable_explore(self, model: str, explore: str, via: str) -> None: + if (model, explore) not in self.reachable_explores: + self.reachable_explores[(model, explore)] = [] - self.explores_to_fetch_set[(model, explore)].append(via) + self.reachable_explores[(model, explore)].append(via) def _get_looker_dashboard_element( # noqa: C901 self, element: DashboardElement @@ -403,7 +406,7 @@ def _get_looker_dashboard_element( # noqa: C901 f"Element {element.title}: Explores added via query: {explores}" ) for exp in explores: - self.add_explore_to_fetch( + self.add_reachable_explore( model=element.query.model, explore=exp, via=f"look:{element.look_id}:query:{element.dashboard_id}", @@ -439,7 +442,7 @@ def _get_looker_dashboard_element( # noqa: C901 explores = [element.look.query.view] logger.debug(f"Element {title}: Explores added via look: {explores}") for exp in explores: - self.add_explore_to_fetch( + self.add_reachable_explore( model=element.look.query.model, explore=exp, via=f"Look:{element.look_id}:query:{element.dashboard_id}", @@ -483,7 +486,7 @@ def _get_looker_dashboard_element( # noqa: C901 ) for exp in explores: - self.add_explore_to_fetch( + self.add_reachable_explore( model=element.result_maker.query.model, explore=exp, via=f"Look:{element.look_id}:resultmaker:query", @@ -495,7 +498,7 @@ def _get_looker_dashboard_element( # noqa: C901 if filterable.view is not None and filterable.model is not None: model = filterable.model explores.append(filterable.view) - self.add_explore_to_fetch( + self.add_reachable_explore( model=filterable.model, explore=filterable.view, via=f"Look:{element.look_id}:resultmaker:filterable", @@ -694,20 +697,26 @@ def _make_dashboard_metadata_events( def _make_explore_metadata_events( self, ) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]: + if self.source_config.emit_used_explores_only: + explores_to_fetch = list(self.reachable_explores.keys()) + else: + explores_to_fetch = list(self.list_all_explores()) + explores_to_fetch.sort() + with concurrent.futures.ThreadPoolExecutor( max_workers=self.source_config.max_threads ) as async_executor: - self.reporter.total_explores = len(self.explores_to_fetch_set) + self.reporter.total_explores = len(explores_to_fetch) explore_futures = { async_executor.submit(self.fetch_one_explore, model, explore): ( model, explore, ) - for (model, explore) in self.explores_to_fetch_set + for (model, explore) in explores_to_fetch } - for future in concurrent.futures.as_completed(explore_futures): + for future in concurrent.futures.wait(explore_futures).done: events, explore_id, start_time, end_time = future.result() del explore_futures[future] self.reporter.explores_scanned += 1 @@ -717,6 +726,17 @@ def _make_explore_metadata_events( f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}" ) + def list_all_explores(self) -> Iterable[Tuple[str, str]]: + # returns a list of (model, explore) tuples + + for model in self.looker_api.all_lookml_models(): + if model.name is None or model.explores is None: + continue + for explore in model.explores: + if explore.name is None: + continue + yield (model.name, explore.name) + def fetch_one_explore( self, model: str, explore: str ) -> Tuple[ @@ -954,7 +974,7 @@ def _input_fields_from_dashboard_element( ) if explore is not None: # add this to the list of explores to finally generate metadata for - self.add_explore_to_fetch( + self.add_reachable_explore( input_field.model, input_field.explore, entity_urn ) entity_urn = explore.get_explore_urn(self.source_config)