Skip to content

Commit

Permalink
Reduces scheduler logging to warnings and fixes a warning-raising issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
MattTriano committed Sep 28, 2024
1 parent faa8c45 commit 3fe4f52
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
4 changes: 2 additions & 2 deletions airflow/dags/tasks/census_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def check_freshness(
return freshness_check


@task.branch(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def fresher_source_data_available(
freshness_check: CensusDatasetFreshnessCheck, task_logger: Logger
) -> str:
Expand Down Expand Up @@ -418,7 +418,7 @@ def request_and_ingest_dataset(
raise Exception(f"No rows ingested")


@task(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str:
ti = kwargs["ti"]
freshness_check = ti.xcom_pull(
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/tasks/socrata_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def check_table_metadata(
return metadata_3


@task.branch(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def fresher_source_data_available(
socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger
) -> str:
Expand Down Expand Up @@ -180,7 +180,7 @@ def download_fresh_data(task_logger: Logger, **kwargs) -> SocrataTableMetadata:
return socrata_metadata


@task.branch(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def file_ext_branch_router(socrata_metadata: SocrataTableMetadata) -> str:
dl_format = socrata_metadata.download_format
if dl_format.lower() == "geojson":
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/tasks/tiger_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def check_freshness(
return freshness_check


@task.branch(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def fresher_source_data_available(task_logger: Logger, **kwargs) -> str:
ti = kwargs["ti"]
freshness_check = ti.xcom_pull(
Expand Down Expand Up @@ -236,7 +236,7 @@ def request_and_ingest_fresh_data(
return "ingested"


@task(trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str:
ti = kwargs["ti"]
freshness_check = ti.xcom_pull(
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ x-airflow-common: &airflow-common
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__LOGGING__LOGGING_LEVEL: 'WARNING'
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 30
AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME: 5
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
GE_JUPYTER_CMD: "jupyter lab --allow-root --ip 0.0.0.0 --port 18888"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
Expand Down

0 comments on commit 3fe4f52

Please sign in to comment.