Skip to content

Commit

Permalink
AIP-72: Remove DAG pickling (apache#43667)
Browse files Browse the repository at this point in the history
This was a less used part of Airflow and does not make sense to keep it since we are removing DB access as part of AIP-72, I am removing it here.
This was missed in Airflow 2.0!

* Handle executors in the providers for Airflow <3 support
  • Loading branch information
kaxil authored Nov 5, 2024
1 parent 90b9847 commit b9b0614
Show file tree
Hide file tree
Showing 56 changed files with 1,867 additions and 2,352 deletions.
9 changes: 0 additions & 9 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2911,15 +2911,6 @@ components:
description: |
The last time the DAG was parsed.
*New in version 2.3.0*
last_pickled:
type: string
format: date-time
readOnly: true
nullable: true
description: |
The last time the DAG was pickled.
*New in version 2.3.0*
last_expired:
type: string
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class Meta:
is_paused = auto_field()
is_active = auto_field(dump_only=True)
last_parsed_time = auto_field(dump_only=True)
last_pickled = auto_field(dump_only=True)
last_expired = auto_field(dump_only=True)
default_view = auto_field(dump_only=True)
fileloc = auto_field(dump_only=True)
Expand Down
42 changes: 0 additions & 42 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2370,24 +2370,12 @@ components:
format: date-time
- type: 'null'
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Expired
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Pickle Id
default_view:
anyOf:
- type: string
Expand Down Expand Up @@ -2541,9 +2529,7 @@ components:
- is_paused
- is_active
- last_parsed_time
- last_pickled
- last_expired
- pickle_id
- default_view
- fileloc
- description
Expand Down Expand Up @@ -2606,24 +2592,12 @@ components:
format: date-time
- type: 'null'
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Expired
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Pickle Id
default_view:
anyOf:
- type: string
Expand Down Expand Up @@ -2710,9 +2684,7 @@ components:
- is_paused
- is_active
- last_parsed_time
- last_pickled
- last_expired
- pickle_id
- default_view
- fileloc
- description
Expand Down Expand Up @@ -2976,24 +2948,12 @@ components:
format: date-time
- type: 'null'
title: Last Parsed Time
last_pickled:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Pickled
last_expired:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Expired
pickle_id:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Pickle Id
default_view:
anyOf:
- type: string
Expand Down Expand Up @@ -3085,9 +3045,7 @@ components:
- is_paused
- is_active
- last_parsed_time
- last_pickled
- last_expired
- pickle_id
- default_view
- fileloc
- description
Expand Down
2 changes: 0 additions & 2 deletions airflow/api_fastapi/core_api/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class DAGResponse(BaseModel):
is_paused: bool
is_active: bool
last_parsed_time: datetime | None
last_pickled: datetime | None
last_expired: datetime | None
pickle_id: datetime | None
default_view: str | None
fileloc: str
description: str | None
Expand Down
18 changes: 0 additions & 18 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,6 @@ def string_lower_type(val):
choices={"check", "ignore", "wait"},
default="check",
)
ARG_SHIP_DAG = Arg(
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")
Expand Down Expand Up @@ -795,16 +791,6 @@ def string_lower_type(val):
type=int,
help="Set the number of runs to execute before exiting",
)
ARG_DO_PICKLE = Arg(
("-p", "--do-pickle"),
default=False,
help=(
"Attempt to pickle the DAG object to send over "
"to the workers, instead of letting workers run their version "
"of the code"
),
action="store_true",
)

ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
Expand Down Expand Up @@ -1351,8 +1337,6 @@ class GroupCommand(NamedTuple):
ARG_IGNORE_ALL_DEPENDENCIES,
ARG_IGNORE_DEPENDENCIES,
ARG_DEPENDS_ON_PAST,
ARG_SHIP_DAG,
ARG_PICKLE,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
ARG_MAP_INDEX,
Expand Down Expand Up @@ -1968,7 +1952,6 @@ class GroupCommand(NamedTuple):
args=(
ARG_SUBDIR,
ARG_NUM_RUNS,
ARG_DO_PICKLE,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
Expand Down Expand Up @@ -2010,7 +1993,6 @@ class GroupCommand(NamedTuple):
ARG_DAEMON,
ARG_SUBDIR,
ARG_NUM_RUNS,
ARG_DO_PICKLE,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"is_paused": dag.get_is_paused(),
"is_active": dag.get_is_active(),
"last_parsed_time": None,
"last_pickled": None,
"last_expired": None,
"default_view": dag.default_view,
"fileloc": dag.fileloc,
Expand Down
1 change: 0 additions & 1 deletion airflow/cli/commands/dag_processor_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
dag_directory=args.subdir,
max_runs=args.num_runs,
dag_ids=[],
pickle_dags=args.do_pickle,
),
)

Expand Down
4 changes: 1 addition & 3 deletions airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@


def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(
job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
)
job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
Expand Down
27 changes: 3 additions & 24 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from airflow.jobs.job import Job, run_job
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagPickle, TaskInstance
from airflow.models import TaskInstance
from airflow.models.dag import DAG, _run_inline_trigger
from airflow.models.dagrun import DagRun
from airflow.models.param import ParamsDict
Expand All @@ -56,7 +56,6 @@
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
get_dag_by_pickle,
get_dags,
should_ignore_depends_on_past,
suppress_logs_and_warning,
Expand Down Expand Up @@ -266,20 +265,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
This can result in the task being started by another host if the executor implementation does.
"""
pickle_id = None
if args.ship_dag:
try:
# Running remotely, so pickling the DAG
with create_session() as session:
pickle = DagPickle(dag)
session.add(pickle)
pickle_id = pickle.id
# TODO: This should be written to a log
print(f"Pickled dag {dag} as pickle_id: {pickle_id}")
except Exception as e:
print("Could not pickle the DAG")
print(e)
raise e
if ti.executor:
executor = ExecutorLoader.load_executor(ti.executor)
else:
Expand All @@ -290,7 +275,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
executor.queue_task_instance(
ti,
mark_success=args.mark_success,
pickle_id=pickle_id,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=should_ignore_depends_on_past(args),
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
Expand All @@ -311,7 +295,6 @@ def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) -
job=Job(dag_id=ti.dag_id),
task_instance=ti,
mark_success=args.mark_success,
pickle_id=args.pickle,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=should_ignore_depends_on_past(args),
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
Expand Down Expand Up @@ -435,8 +418,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
f"You provided the option {unsupported_flags}. "
"Delete it to execute the command."
)
if dag and args.pickle:
raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")

if args.cfg_path:
with open(args.cfg_path) as conf_file:
conf_dict = json.load(conf_file)
Expand All @@ -451,10 +433,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:

get_listener_manager().hook.on_starting(component=TaskCommandMarker())

if args.pickle:
print(f"Loading pickle id: {args.pickle}")
_dag = get_dag_by_pickle(args.pickle)
elif not dag:
if not dag:
_dag = get_dag(args.subdir, args.dag_id, args.read_from_db)
else:
_dag = dag
Expand Down
13 changes: 1 addition & 12 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
for unlimited.
:param processor_timeout: How long to wait before timing out a DAG file processor
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param pickle_dags: whether to pickle DAGs.
:param async_mode: Whether to start agent in async mode
"""

Expand All @@ -127,15 +126,13 @@ def __init__(
max_runs: int,
processor_timeout: timedelta,
dag_ids: list[str] | None,
pickle_dags: bool,
async_mode: bool,
):
super().__init__()
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
self._pickle_dags = pickle_dags
self._async_mode = async_mode
# Map from file path to the processor
self._processors: dict[str, DagFileProcessorProcess] = {}
Expand Down Expand Up @@ -163,7 +160,6 @@ def start(self) -> None:
self._processor_timeout,
child_signal_conn,
self._dag_ids,
self._pickle_dags,
self._async_mode,
),
)
Expand Down Expand Up @@ -223,7 +219,6 @@ def _run_processor_manager(
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: list[str] | None,
pickle_dags: bool,
async_mode: bool,
) -> None:
# Make this process start as a new process group - that makes it easy
Expand All @@ -240,7 +235,6 @@ def _run_processor_manager(
max_runs=max_runs,
processor_timeout=processor_timeout,
dag_ids=dag_ids,
pickle_dags=pickle_dags,
signal_conn=signal_conn,
async_mode=async_mode,
)
Expand Down Expand Up @@ -353,7 +347,6 @@ class DagFileProcessorManager(LoggingMixin):
:param processor_timeout: How long to wait before timing out a DAG file processor
:param signal_conn: connection to communicate signal with processor agent.
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param pickle_dags: whether to pickle DAGs.
:param async_mode: whether to start the manager in async mode
"""

Expand All @@ -372,7 +365,6 @@ def __init__(
max_runs: int,
processor_timeout: timedelta,
dag_ids: list[str] | None,
pickle_dags: bool,
signal_conn: MultiprocessingConnection | None = None,
async_mode: bool = True,
):
Expand All @@ -383,7 +375,6 @@ def __init__(
self._max_runs = max_runs
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
self._pickle_dags = pickle_dags
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: float | None = None
Expand Down Expand Up @@ -1191,11 +1182,10 @@ def collect_results(self) -> None:
self.log.debug("%s file paths queued for processing", len(self._file_path_queue))

@staticmethod
def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
def _create_process(file_path, dag_ids, dag_directory, callback_requests):
"""Create DagFileProcessorProcess instance."""
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_ids=dag_ids,
dag_directory=dag_directory,
callback_requests=callback_requests,
Expand All @@ -1217,7 +1207,6 @@ def start_new_processes(self):
callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._create_process(
file_path,
self._pickle_dags,
self._dag_ids,
self.get_dag_directory(),
callback_to_execute_for_file,
Expand Down
Loading

0 comments on commit b9b0614

Please sign in to comment.