Skip to content

Commit

Permalink
feat(airflow): make RUN_IN_THREAD configurable (#9226)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Nov 14, 2023
1 parent cfeecd7 commit ec13847
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ In order to use this example, you must first configure the Datahub hook. Like in
If you're not seeing lineage in DataHub, check the following:

- Validate that the plugin is loaded in Airflow. Go to Admin -> Plugins and check that the DataHub plugin is listed.
- With the v2 plugin, it should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin v2 using DataHubRestEmitter: configured to talk to <datahub_url>` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled.
- If using the v2 plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator.
- If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import functools
import logging
import os
import threading
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast

Expand Down Expand Up @@ -55,7 +56,10 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811

_airflow_listener_initialized = False
_airflow_listener: Optional["DataHubListener"] = None
_RUN_IN_THREAD = True
_RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower() in (
"true",
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30


Expand Down Expand Up @@ -133,7 +137,7 @@ def __init__(self, config: DatahubLineageConfig):

self._emitter = config.make_emitter_hook().make_emitter()
self._graph: Optional[DataHubGraph] = None
logger.info(f"DataHub plugin using {repr(self._emitter)}")
logger.info(f"DataHub plugin v2 using {repr(self._emitter)}")

# See discussion here https://github.com/OpenLineage/OpenLineage/pull/508 for
# why we need to keep track of tasks ourselves.
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ def quickstart( # noqa: C901
logger.debug("docker compose up timed out, sending SIGTERM")
up_process.terminate()
try:
up_process.wait(timeout=3)
up_process.wait(timeout=8)
except subprocess.TimeoutExpired:
logger.debug("docker compose up still running, sending SIGKILL")
up_process.kill()
Expand Down

0 comments on commit ec13847

Please sign in to comment.