From b9b06140491d55878954b1a490c76ce7593b6357 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 5 Nov 2024 09:06:04 +0000 Subject: [PATCH] AIP-72: Remove DAG pickling (#43667) This was a less used part of Airflow and does not make sense to keep it since we are removing DB access as part of AIP-72, I am removing it here. This was missed in Airflow 2.0! * Handle executors in the providers for Airflow <3 support --- airflow/api_connexion/openapi/v1.yaml | 9 - airflow/api_connexion/schemas/dag_schema.py | 1 - .../core_api/openapi/v1-generated.yaml | 42 - .../api_fastapi/core_api/serializers/dags.py | 2 - airflow/cli/cli_config.py | 18 - airflow/cli/commands/dag_command.py | 1 - airflow/cli/commands/dag_processor_command.py | 1 - airflow/cli/commands/scheduler_command.py | 4 +- airflow/cli/commands/task_command.py | 27 +- airflow/dag_processing/manager.py | 13 +- airflow/dag_processing/processor.py | 32 +- airflow/executors/base_executor.py | 3 - airflow/executors/debug_executor.py | 1 - airflow/executors/local_executor.py | 1 - airflow/executors/sequential_executor.py | 2 - airflow/jobs/local_task_job_runner.py | 2 - airflow/jobs/scheduler_job_runner.py | 10 - .../versions/0046_3_0_0_drop_dag_pickling.py | 66 + airflow/models/__init__.py | 3 - airflow/models/dag.py | 44 - airflow/models/dagpickle.py | 56 - airflow/models/taskinstance.py | 20 +- airflow/serialization/pydantic/dag.py | 2 - .../serialization/pydantic/taskinstance.py | 2 - airflow/serialization/serialized_objects.py | 4 - airflow/task/standard_task_runner.py | 1 - .../ui/openapi-gen/requests/schemas.gen.ts | 78 - airflow/ui/openapi-gen/requests/types.gen.ts | 6 - .../ui/src/pages/DagsList/DagCard.test.tsx | 2 - airflow/utils/cli.py | 16 - airflow/utils/db.py | 2 +- airflow/www/static/js/types/api-generated.ts | 7 - dev/perf/scheduler_dag_execution_timing.py | 4 +- dev/perf/sql_queries.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3492 ++++++++--------- docs/apache-airflow/migrations-ref.rst | 4 +- newsfragments/aip-72.significant.rst | 4 + .../executors/celery_kubernetes_executor.py | 10 +- .../executors/local_kubernetes_executor.py | 9 +- .../celery/executors/test_celery_executor.py | 3 - .../executors/test_kubernetes_executor.py | 3 - .../endpoints/test_dag_endpoint.py | 32 - .../api_connexion/schemas/test_dag_schema.py | 3 - .../core_api/routes/public/test_dags.py | 4 - tests/cli/commands/test_task_command.py | 30 - tests/dag_processing/test_job_runner.py | 88 +- tests/dag_processing/test_processor.py | 11 +- tests/executors/test_base_executor.py | 4 - tests/executors/test_local_executor.py | 3 - tests/executors/test_sequential_executor.py | 3 - .../test_dag_import_error_listener.py | 2 +- tests/models/test_dag.py | 7 - tests/utils/test_cli_util.py | 18 +- tests/utils/test_db_cleanup.py | 1 - tests/www/views/test_views_home.py | 2 +- 56 files changed, 1867 insertions(+), 2352 deletions(-) create mode 100644 airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py delete mode 100644 airflow/models/dagpickle.py diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index c884c1595411..41d469cd207e 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2911,15 +2911,6 @@ components: description: | The last time the DAG was parsed. - *New in version 2.3.0* - last_pickled: - type: string - format: date-time - readOnly: true - nullable: true - description: | - The last time the DAG was pickled. 
- *New in version 2.3.0* last_expired: type: string diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index f22812abd111..9f75f4dad52f 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -54,7 +54,6 @@ class Meta: is_paused = auto_field() is_active = auto_field(dump_only=True) last_parsed_time = auto_field(dump_only=True) - last_pickled = auto_field(dump_only=True) last_expired = auto_field(dump_only=True) default_view = auto_field(dump_only=True) fileloc = auto_field(dump_only=True) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index abd7f0baf5ed..3dcff4b2d066 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2370,24 +2370,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -2541,9 +2529,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description @@ -2606,24 +2592,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -2710,9 +2684,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description @@ -2976,24 +2948,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -3085,9 +3045,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description diff --git a/airflow/api_fastapi/core_api/serializers/dags.py b/airflow/api_fastapi/core_api/serializers/dags.py index 6e2c3933e176..27cc3ad47356 100644 --- a/airflow/api_fastapi/core_api/serializers/dags.py +++ b/airflow/api_fastapi/core_api/serializers/dags.py @@ -43,9 +43,7 @@ class DAGResponse(BaseModel): is_paused: bool is_active: bool last_parsed_time: datetime | None - last_pickled: datetime | None last_expired: datetime | None - pickle_id: datetime | None default_view: str | None fileloc: str description: str | None diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 06ac2f7bd817..e93d5e25c631 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -576,10 +576,6 @@ def string_lower_type(val): choices={"check", "ignore", "wait"}, default="check", ) -ARG_SHIP_DAG = Arg( - ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true" -) 
-ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)") ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg") ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index") ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true") @@ -795,16 +791,6 @@ def string_lower_type(val): type=int, help="Set the number of runs to execute before exiting", ) -ARG_DO_PICKLE = Arg( - ("-p", "--do-pickle"), - default=False, - help=( - "Attempt to pickle the DAG object to send over " - "to the workers, instead of letting workers run their version " - "of the code" - ), - action="store_true", -) ARG_WITHOUT_MINGLE = Arg( ("--without-mingle",), @@ -1351,8 +1337,6 @@ class GroupCommand(NamedTuple): ARG_IGNORE_ALL_DEPENDENCIES, ARG_IGNORE_DEPENDENCIES, ARG_DEPENDS_ON_PAST, - ARG_SHIP_DAG, - ARG_PICKLE, ARG_INTERACTIVE, ARG_SHUT_DOWN_LOGGING, ARG_MAP_INDEX, @@ -1968,7 +1952,6 @@ class GroupCommand(NamedTuple): args=( ARG_SUBDIR, ARG_NUM_RUNS, - ARG_DO_PICKLE, ARG_PID, ARG_DAEMON, ARG_STDOUT, @@ -2010,7 +1993,6 @@ class GroupCommand(NamedTuple): ARG_DAEMON, ARG_SUBDIR, ARG_NUM_RUNS, - ARG_DO_PICKLE, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 92d1825dc627..dfff75ee2d6c 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -225,7 +225,6 @@ def _get_dagbag_dag_details(dag: DAG) -> dict: "is_paused": dag.get_is_paused(), "is_active": dag.get_is_active(), "last_parsed_time": None, - "last_pickled": None, "last_expired": None, "default_view": dag.default_view, "fileloc": dag.fileloc, diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index 8ec173ba5202..eea1c0db20dc 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -48,7 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner: dag_directory=args.subdir, max_runs=args.num_runs, dag_ids=[], - pickle_dags=args.do_pickle, ), ) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index 96cfe1e2852f..4c4b751e2c50 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -39,9 +39,7 @@ def _run_scheduler_job(args) -> None: - job_runner = SchedulerJobRunner( - job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle - ) + job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs) ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__) enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK") with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check): diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 03d2737072f3..e14c18399555 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -42,7 +42,7 @@ from airflow.jobs.job import Job, run_job from airflow.jobs.local_task_job_runner import LocalTaskJobRunner from airflow.listeners.listener import get_listener_manager -from airflow.models import DagPickle, TaskInstance +from airflow.models import TaskInstance from airflow.models.dag import DAG, _run_inline_trigger from 
airflow.models.dagrun import DagRun from airflow.models.param import ParamsDict @@ -56,7 +56,6 @@ from airflow.utils.cli import ( get_dag, get_dag_by_file_location, - get_dag_by_pickle, get_dags, should_ignore_depends_on_past, suppress_logs_and_warning, @@ -266,20 +265,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None: This can result in the task being started by another host if the executor implementation does. """ - pickle_id = None - if args.ship_dag: - try: - # Running remotely, so pickling the DAG - with create_session() as session: - pickle = DagPickle(dag) - session.add(pickle) - pickle_id = pickle.id - # TODO: This should be written to a log - print(f"Pickled dag {dag} as pickle_id: {pickle_id}") - except Exception as e: - print("Could not pickle the DAG") - print(e) - raise e if ti.executor: executor = ExecutorLoader.load_executor(ti.executor) else: @@ -290,7 +275,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None: executor.queue_task_instance( ti, mark_success=args.mark_success, - pickle_id=pickle_id, ignore_all_deps=args.ignore_all_dependencies, ignore_depends_on_past=should_ignore_depends_on_past(args), wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"), @@ -311,7 +295,6 @@ def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) - job=Job(dag_id=ti.dag_id), task_instance=ti, mark_success=args.mark_success, - pickle_id=args.pickle, ignore_all_deps=args.ignore_all_dependencies, ignore_depends_on_past=should_ignore_depends_on_past(args), wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"), @@ -435,8 +418,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None: f"You provided the option {unsupported_flags}. " "Delete it to execute the command." ) - if dag and args.pickle: - raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.") + if args.cfg_path: with open(args.cfg_path) as conf_file: conf_dict = json.load(conf_file) @@ -451,10 +433,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None: get_listener_manager().hook.on_starting(component=TaskCommandMarker()) - if args.pickle: - print(f"Loading pickle id: {args.pickle}") - _dag = get_dag_by_pickle(args.pickle) - elif not dag: + if not dag: _dag = get_dag(args.subdir, args.dag_id, args.read_from_db) else: _dag = dag diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 05fb72daee60..0f3441a5d4d1 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -117,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): for unlimited. :param processor_timeout: How long to wait before timing out a DAG file processor :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param pickle_dags: whether to pickle DAGs. 
:param async_mode: Whether to start agent in async mode """ @@ -127,7 +126,6 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - pickle_dags: bool, async_mode: bool, ): super().__init__() @@ -135,7 +133,6 @@ def __init__( self._max_runs = max_runs self._processor_timeout = processor_timeout self._dag_ids = dag_ids - self._pickle_dags = pickle_dags self._async_mode = async_mode # Map from file path to the processor self._processors: dict[str, DagFileProcessorProcess] = {} @@ -163,7 +160,6 @@ def start(self) -> None: self._processor_timeout, child_signal_conn, self._dag_ids, - self._pickle_dags, self._async_mode, ), ) @@ -223,7 +219,6 @@ def _run_processor_manager( processor_timeout: timedelta, signal_conn: MultiprocessingConnection, dag_ids: list[str] | None, - pickle_dags: bool, async_mode: bool, ) -> None: # Make this process start as a new process group - that makes it easy @@ -240,7 +235,6 @@ def _run_processor_manager( max_runs=max_runs, processor_timeout=processor_timeout, dag_ids=dag_ids, - pickle_dags=pickle_dags, signal_conn=signal_conn, async_mode=async_mode, ) @@ -353,7 +347,6 @@ class DagFileProcessorManager(LoggingMixin): :param processor_timeout: How long to wait before timing out a DAG file processor :param signal_conn: connection to communicate signal with processor agent. :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param pickle_dags: whether to pickle DAGs. :param async_mode: whether to start the manager in async mode """ @@ -372,7 +365,6 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - pickle_dags: bool, signal_conn: MultiprocessingConnection | None = None, async_mode: bool = True, ): @@ -383,7 +375,6 @@ def __init__( self._max_runs = max_runs # signal_conn is None for dag_processor_standalone mode. self._direct_scheduler_conn = signal_conn - self._pickle_dags = pickle_dags self._dag_ids = dag_ids self._async_mode = async_mode self._parsing_start_time: float | None = None @@ -1191,11 +1182,10 @@ def collect_results(self) -> None: self.log.debug("%s file paths queued for processing", len(self._file_path_queue)) @staticmethod - def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests): + def _create_process(file_path, dag_ids, dag_directory, callback_requests): """Create DagFileProcessorProcess instance.""" return DagFileProcessorProcess( file_path=file_path, - pickle_dags=pickle_dags, dag_ids=dag_ids, dag_directory=dag_directory, callback_requests=callback_requests, @@ -1217,7 +1207,6 @@ def start_new_processes(self): callback_to_execute_for_file = self._callback_to_execute[file_path] processor = self._create_process( file_path, - self._pickle_dags, self._dag_ids, self.get_dag_directory(), callback_to_execute_for_file, diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8694f5890ccd..394e09245127 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -91,7 +91,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin): Runs DAG processing in a separate process using DagFileProcessor. 
:param file_path: a Python file containing Airflow DAG definitions - :param pickle_dags: whether to serialize the DAG objects to the DB :param dag_ids: If specified, only look at these DAG ID's :param callback_requests: failure callback to execute """ @@ -102,14 +101,12 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin): def __init__( self, file_path: str, - pickle_dags: bool, dag_ids: list[str] | None, dag_directory: str, callback_requests: list[CallbackRequest], ): super().__init__() self._file_path = file_path - self._pickle_dags = pickle_dags self._dag_ids = dag_ids self._dag_directory = dag_directory self._callback_requests = callback_requests @@ -138,7 +135,6 @@ def _run_file_processor( result_channel: MultiprocessingConnection, parent_channel: MultiprocessingConnection, file_path: str, - pickle_dags: bool, dag_ids: list[str] | None, thread_name: str, dag_directory: str, @@ -150,8 +146,6 @@ def _run_file_processor( :param result_channel: the connection to use for passing back the result :param parent_channel: the parent end of the channel to close in the child :param file_path: the file to process - :param pickle_dags: whether to pickle the DAGs found in the file and - save them to the DB :param dag_ids: if specified, only examine DAG ID's that are in this list :param thread_name: the name to use for the process that is launched @@ -182,7 +176,6 @@ def _handle_dag_file_processing(): dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log) result: tuple[int, int, int] = dag_file_processor.process_file( file_path=file_path, - pickle_dags=pickle_dags, callback_requests=callback_requests, ) result_channel.send(result) @@ -245,7 +238,6 @@ def start(self) -> None: _child_channel, _parent_channel, self.file_path, - self._pickle_dags, self._dag_ids, f"DagFileProcessor{self._instance_id}", self._dag_directory, @@ -416,8 +408,7 @@ class DagFileProcessor(LoggingMixin): 1. Execute the file and look for DAG objects in the namespace. 2. Execute any Callbacks if passed to DagFileProcessor.process_file 3. Serialize the DAGs and save it to DB (or update existing record in the DB). - 4. Pickle the DAG and save it to the DB (if necessary). - 5. Record any errors importing the file into ORM + 4. Record any errors importing the file into ORM Returns a tuple of 'number of dags found' and 'the count of import errors' @@ -709,7 +700,6 @@ def process_file( self, file_path: str, callback_requests: list[CallbackRequest], - pickle_dags: bool = False, session: Session = NEW_SESSION, ) -> tuple[int, int, int]: """ @@ -720,14 +710,11 @@ def process_file( 1. Execute the file and look for DAG objects in the namespace. 2. Execute any Callbacks if passed to this method. 3. Serialize the DAGs and save it to DB (or update existing record in the DB). - 4. Pickle the DAG and save it to the DB (if necessary). - 5. Mark any DAGs which are no longer present as inactive - 6. Record any errors importing the file into ORM + 4. Mark any DAGs which are no longer present as inactive + 5. 
Record any errors importing the file into ORM :param file_path: the path to the Python file that should be executed :param callback_requests: failure callback to execute - :param pickle_dags: whether serialize the DAGs found in the file and - save them to the db :return: number of dags found, count of import errors, last number of db queries """ self.log.info("Processing file %s for tasks to queue", file_path) @@ -761,7 +748,6 @@ def process_file( serialize_errors = DagFileProcessor.save_dag_to_db( dags=dagbag.dags, dag_directory=self._dag_directory, - pickle_dags=pickle_dags, ) dagbag.import_errors.update(dict(serialize_errors)) @@ -795,20 +781,8 @@ def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = No def save_dag_to_db( dags: dict[str, DAG], dag_directory: str, - pickle_dags: bool = False, session=NEW_SESSION, ): import_errors = DagBag._sync_to_db(dags=dags, processor_subdir=dag_directory, session=session) session.commit() - - dag_ids = list(dags) - - if pickle_dags: - paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dag_ids) - - unpaused_dags: list[DAG] = [dag for dag_id, dag in dags.items() if dag_id not in paused_dag_ids] - - for dag in unpaused_dags: - dag.pickle(session) - return import_errors diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 87f496fb0540..fba6d96969a1 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -112,7 +112,6 @@ class BaseExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = False - supports_pickling: bool = True supports_sentry: bool = False is_local: bool = False @@ -172,7 +171,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -196,7 +194,6 @@ def queue_task_instance( ignore_task_deps=ignore_task_deps, ignore_ti_state=ignore_ti_state, pool=pool, - pickle_id=pickle_id, # cfg_path is needed to propagate the config values if using impersonation # (run_as_user), given that there are different code paths running tasks. # https://github.com/apache/airflow/pull/2991 diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index aead7e2b2c11..525c80791e37 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -97,7 +97,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index f28e525ec3ac..a39a206af507 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -223,7 +223,6 @@ class LocalExecutor(BaseExecutor): """ is_local: bool = True - supports_pickling: bool = False serve_logs: bool = True diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 0b4cbdea9dd4..1fca95acd3b0 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -48,8 +48,6 @@ class SequentialExecutor(BaseExecutor): SequentialExecutor alongside sqlite as you first install it. 
""" - supports_pickling: bool = False - is_local: bool = True is_single_threaded: bool = True is_production: bool = False diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index c900c88674e7..599493ea58c4 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -90,7 +90,6 @@ def __init__( ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, - pickle_id: int | None = None, pool: str | None = None, external_executor_id: str | None = None, ): @@ -103,7 +102,6 @@ def __init__( self.ignore_task_deps = ignore_task_deps self.ignore_ti_state = ignore_ti_state self.pool = pool - self.pickle_id = pickle_id self.mark_success = mark_success self.external_executor_id = external_executor_id # terminating state is used so that a job don't try to diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 39e4e35087bc..fb85a4a73cc3 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -156,8 +156,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): -1 for unlimited times. :param scheduler_idle_sleep_time: The number of seconds to wait between polls of running processors - :param do_pickle: once a DAG object is obtained by executing the Python - file, whether to serialize the DAG object to the DB :param log: override the default Logger """ @@ -170,7 +168,6 @@ def __init__( num_runs: int = conf.getint("scheduler", "num_runs"), num_times_parse_dags: int = -1, scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"), - do_pickle: bool = False, log: logging.Logger | None = None, ): super().__init__(job) @@ -187,8 +184,6 @@ def __init__( self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration") self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") - self.do_pickle = do_pickle - self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc") if self._enable_tracemalloc: import tracemalloc @@ -639,7 +634,6 @@ def _enqueue_task_instances_with_queued_state( continue command = ti.command_as_list( local=True, - pickle_id=ti.dag_model.pickle_id, ) priority = ti.priority_weight @@ -923,9 +917,6 @@ def _execute(self) -> int | None: executor_class, _ = ExecutorLoader.import_default_executor_cls() - # DAGs can be pickled for easier remote execution by some executors - pickle_dags = self.do_pickle and executor_class.supports_pickling - self.log.info("Processing each file at most %s times", self.num_times_parse_dags) # When using sqlite, we do not use async_mode @@ -940,7 +931,6 @@ def _execute(self) -> int | None: max_runs=self.num_times_parse_dags, processor_timeout=processor_timeout, dag_ids=[], - pickle_dags=pickle_dags, async_mode=async_mode, ) diff --git a/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py b/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py new file mode 100644 index 000000000000..599759fa9f86 --- /dev/null +++ b/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Drop DAG pickling. + +Revision ID: d03e4a635aa3 +Revises: d8cd3297971e +Create Date: 2024-11-04 22:07:51.329843 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import TIMESTAMP + +# revision identifiers, used by Alembic. +revision = "d03e4a635aa3" +down_revision = "d8cd3297971e" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Drop DAG pickling.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.drop_column("pickle_id") + batch_op.drop_column("last_pickled") + + op.drop_table("dag_pickle") + + +def downgrade(): + """Re-Add DAG pickling.""" + import dill + + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.add_column(sa.Column("last_pickled", sa.TIMESTAMP(), nullable=True)) + batch_op.add_column(sa.Column("pickle_id", sa.INTEGER(), nullable=True)) + + op.create_table( + "dag_pickle", + sa.Column("id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("pickle", sa.PickleType(pickler=dill), nullable=True), + sa.Column("created_dttm", TIMESTAMP(timezone=True), nullable=True), + sa.Column("pickle_hash", sa.BigInteger, nullable=True), + ) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 7e71dddc65df..1ab4e5584c97 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -30,7 +30,6 @@ "DagBag", "DagWarning", "DagModel", - "DagPickle", "DagRun", "DagTag", "DbCallbackRequest", @@ -90,7 +89,6 @@ def __getattr__(name): "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", - "DagPickle": "airflow.models.dagpickle", "DagRun": "airflow.models.dagrun", "DagTag": "airflow.models.dag", "DagWarning": "airflow.models.dagwarning", @@ -119,7 +117,6 @@ def __getattr__(name): from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, DagTag from airflow.models.dagbag import DagBag - from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 851d2a512934..337fc5c8163e 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -22,10 +22,8 @@ import functools import logging import pathlib -import pickle import sys import time -import traceback from collections import defaultdict from contextlib import ExitStack from datetime import datetime, timedelta @@ -88,7 +86,6 @@ from airflow.models.base import Base, StringID from airflow.models.baseoperator import BaseOperator from airflow.models.dagcode import DagCode -from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import RUN_ID_REGEX, DagRun from airflow.models.taskinstance import ( Context, @@ -739,14 +736,6 @@ def dag_id(self, value: str) -> None: def timetable_summary(self) -> str: return self.timetable.summary - @property - def pickle_id(self) -> int | None: - return self._pickle_id - - 
@pickle_id.setter - def pickle_id(self, value: int) -> None: - self._pickle_id = value - @property def relative_fileloc(self) -> pathlib.Path: """File location of the importable dag 'file' relative to the configured DAGs folder.""" @@ -1549,35 +1538,6 @@ def clear_dags( print("Cancelled, nothing was cleared.") return count - def pickle_info(self): - d = {} - d["is_picklable"] = True - try: - dttm = timezone.utcnow() - pickled = pickle.dumps(self) - d["pickle_len"] = len(pickled) - d["pickling_duration"] = str(timezone.utcnow() - dttm) - except Exception as e: - self.log.debug(e) - d["is_picklable"] = False - d["stacktrace"] = traceback.format_exc() - return d - - @provide_session - def pickle(self, session=NEW_SESSION) -> DagPickle: - dag = session.scalar(select(DagModel).where(DagModel.dag_id == self.dag_id).limit(1)) - dp = None - if dag and dag.pickle_id: - dp = session.scalar(select(DagPickle).where(DagPickle.id == dag.pickle_id).limit(1)) - if not dp or dp.pickle != self: - dp = DagPickle(dag=self) - session.add(dp) - self.last_pickled = timezone.utcnow() - session.commit() - self.pickle_id = dp.id - - return dp - def cli(self): """Exposes a CLI specific to this DAG.""" check_cycle(self) @@ -2041,13 +2001,9 @@ class DagModel(Base): is_active = Column(Boolean, default=False) # Last time the scheduler started last_parsed_time = Column(UtcDateTime) - # Last time this DAG was pickled - last_pickled = Column(UtcDateTime) # Time when the DAG last received a refresh signal # (e.g. the DAG's "refresh" button was clicked in the web UI) last_expired = Column(UtcDateTime) - # Foreign key to the latest pickle_id - pickle_id = Column(Integer) # The location of the file containing the DAG object # Note: Do not depend on fileloc pointing to a file; in the case of a # packaged DAG, it will point to the subpath of the DAG within the diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py deleted file mode 100644 index c06ef09709f1..000000000000 --- a/airflow/models/dagpickle.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from typing import TYPE_CHECKING - -import dill -from sqlalchemy import BigInteger, Column, Integer, PickleType - -from airflow.models.base import Base -from airflow.utils import timezone -from airflow.utils.sqlalchemy import UtcDateTime - -if TYPE_CHECKING: - from airflow.models.dag import DAG - - -class DagPickle(Base): - """ - Represents a version of a DAG and becomes a source of truth for an execution. - - Dags can originate from different places (user repos, main repo, ...) and also get executed - in different places (different executors). 
A pickle is a native python serialized object, - and in this case gets stored in the database for the duration of the job. - - The executors pick up the DagPickle id and read the dag definition from the database. - """ - - id = Column(Integer, primary_key=True) - pickle = Column(PickleType(pickler=dill)) - created_dttm = Column(UtcDateTime, default=timezone.utcnow) - pickle_hash = Column(BigInteger) - - __tablename__ = "dag_pickle" - - def __init__(self, dag: DAG) -> None: - self.dag_id = dag.dag_id - if hasattr(dag, "template_env"): - dag.template_env = None # type: ignore[attr-defined] - self.pickle_hash = hash(dag) - self.pickle = dag diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e86c47778246..dfd776e685a0 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2030,7 +2030,6 @@ def _command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -2047,14 +2046,11 @@ def _command_as_list( if dag is None: raise ValueError("DagModel is empty") - should_pass_filepath = not pickle_id and dag - path: PurePath | None = None - if should_pass_filepath: - path = dag.relative_fileloc + path = dag.relative_fileloc - if path: - if not path.is_absolute(): - path = "DAGS_FOLDER" / path + if path: + if not path.is_absolute(): + path = "DAGS_FOLDER" / path return TaskInstance.generate_command( ti.dag_id, @@ -2067,7 +2063,6 @@ def _command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, file_path=path, raw=raw, pool=pool, @@ -2084,7 +2079,6 @@ def command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -2103,7 +2097,6 @@ def command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, raw=raw, pool=pool, cfg_path=cfg_path, @@ -2121,7 +2114,6 @@ def generate_command( ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, file_path: PurePath | str | None = None, raw: bool = False, pool: str | None = None, @@ -2144,8 +2136,6 @@ def generate_command( and trigger rule :param ignore_ti_state: Ignore the task instance's previous failure/success :param local: Whether to run the task locally - :param pickle_id: If the DAG was serialized to the DB, the ID - associated with the pickled DAG :param file_path: path to the file containing the DAG definition :param raw: raw mode (needs more details) :param pool: the Airflow pool that the task should run in @@ -2155,8 +2145,6 @@ def generate_command( cmd = ["airflow", "tasks", "run", dag_id, task_id, run_id] if mark_success: cmd.extend(["--mark-success"]) - if pickle_id: - cmd.extend(["--pickle", str(pickle_id)]) if ignore_all_deps: cmd.extend(["--ignore-all-dependencies"]) if ignore_task_deps: diff --git a/airflow/serialization/pydantic/dag.py b/airflow/serialization/pydantic/dag.py index 4e37a633da05..83bbea760054 100644 --- a/airflow/serialization/pydantic/dag.py +++ b/airflow/serialization/pydantic/dag.py @@ -80,9 +80,7 @@ class DagModelPydantic(BaseModelPydantic): is_paused: bool = 
is_paused_at_creation is_active: Optional[bool] = False last_parsed_time: Optional[datetime] - last_pickled: Optional[datetime] last_expired: Optional[datetime] - pickle_id: Optional[int] fileloc: str processor_subdir: Optional[str] owners: Optional[str] diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index bf121353ca80..d5573922b839 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -486,7 +486,6 @@ def command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -505,7 +504,6 @@ def command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, raw=raw, pool=pool, cfg_path=cfg_path, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 79403860f5fa..52b0bcb1530a 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1577,10 +1577,6 @@ class SerializedDAG(DAG, BaseSerialization): A stringified DAG can only be used in the scope of scheduler and webserver, because fields that are not serializable, such as functions and customer defined classes, are casted to strings. - - Compared with SimpleDAG: SerializedDAG contains all information for webserver. - Compared with DagPickle: DagPickle contains all information for worker, but some DAGs are - not pickle-able. SerializedDAG works for all DAGs. """ _decorated_fields = {"default_args", "access_control"} diff --git a/airflow/task/standard_task_runner.py b/airflow/task/standard_task_runner.py index a5641002c961..bc846574f024 100644 --- a/airflow/task/standard_task_runner.py +++ b/airflow/task/standard_task_runner.py @@ -99,7 +99,6 @@ def __init__(self, job_runner: LocalTaskJobRunner): self._cfg_path = cfg_path self._command = popen_prepend + self._task_instance.command_as_list( raw=True, - pickle_id=self.job_runner.pickle_id, mark_success=self.job_runner.mark_success, pool=self.job_runner.pool, cfg_path=cfg_path, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index cf51451c98bd..906bb43df988 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -313,18 +313,6 @@ export const $DAGDetailsResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -337,18 +325,6 @@ export const $DAGDetailsResponse = { ], title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -631,9 +607,7 @@ export const $DAGDetailsResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", @@ -712,18 +686,6 @@ export const $DAGResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -736,18 +698,6 @@ export const $DAGResponse = { ], 
title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -899,9 +849,7 @@ export const $DAGResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", @@ -1267,18 +1215,6 @@ export const $DAGWithLatestDagRunsResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -1291,18 +1227,6 @@ export const $DAGWithLatestDagRunsResponse = { ], title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -1461,9 +1385,7 @@ export const $DAGWithLatestDagRunsResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 18d5bc296eb2..afe771e5a1f6 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -82,9 +82,7 @@ export type DAGDetailsResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; @@ -143,9 +141,7 @@ export type DAGResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; @@ -272,9 +268,7 @@ export type DAGWithLatestDagRunsResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; diff --git a/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 3e60146baa05..447058633434 100644 --- a/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -43,7 +43,6 @@ const mockDag = { is_paused: false, last_expired: null, last_parsed_time: "2024-08-22T13:50:10.372238+00:00", - last_pickled: null, latest_dag_runs: [], max_active_runs: 16, max_active_tasks: 16, @@ -53,7 +52,6 @@ const mockDag = { next_dagrun_data_interval_end: "2024-08-23T00:00:00+00:00", next_dagrun_data_interval_start: "2024-08-22T00:00:00+00:00", owners: ["airflow"], - pickle_id: null, tags: [], timetable_description: "", timetable_summary: "", diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index 1142c5ba0b62..81b09f9d1104 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -32,7 +32,6 @@ from typing import TYPE_CHECKING, Callable, TypeVar, cast import re2 -from sqlalchemy import select from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig @@ -41,13 +40,10 @@ from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler from airflow.utils.log.secrets_masker import should_hide_value_for_key from airflow.utils.platform import getuser, 
is_terminal_support_colors -from airflow.utils.session import NEW_SESSION, provide_session T = TypeVar("T", bound=Callable) if TYPE_CHECKING: - from sqlalchemy.orm import Session - from airflow.models.dag import DAG logger = logging.getLogger(__name__) @@ -274,18 +270,6 @@ def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False): return matched_dags -@provide_session -def get_dag_by_pickle(pickle_id: int, session: Session = NEW_SESSION) -> DAG: - """Fetch DAG from the database using pickling.""" - from airflow.models import DagPickle - - dag_pickle = session.scalar(select(DagPickle).where(DagPickle.id == pickle_id).limit(1)) - if not dag_pickle: - raise AirflowException(f"pickle_id could not be found in DagPickle.id list: {pickle_id}") - pickle_dag = dag_pickle.pickle - return pickle_dag - - def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): """Create logging paths.""" if not stderr: diff --git a/airflow/utils/db.py b/airflow/utils/db.py index dd3e8c5d2002..d23f54068b59 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "d8cd3297971e", + "3.0.0": "d03e4a635aa3", } diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 7526c340b29f..cd602384b846 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1007,13 +1007,6 @@ export interface components { * *New in version 2.3.0* */ last_parsed_time?: string | null; - /** - * Format: date-time - * @description The last time the DAG was pickled. - * - * *New in version 2.3.0* - */ - last_pickled?: string | null; /** * Format: date-time * @description Time when the DAG last received a refresh signal diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index d150eed41df0..cbc4ca6e8fc6 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -278,7 +278,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): executor = ShortCircuitExecutor(dag_ids_to_watch=dag_ids, num_runs=num_runs) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.job_runner = job_runner total_tasks = sum(len(dag.tasks) for dag in dags) @@ -301,7 +301,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): reset_dag(dag, session) executor.reset(dag_ids) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.scheduler_job = scheduler_job gc.disable() diff --git a/dev/perf/sql_queries.py b/dev/perf/sql_queries.py index 6303d5b6fcd3..60ca8f33f710 100644 --- a/dev/perf/sql_queries.py +++ b/dev/perf/sql_queries.py @@ -123,7 +123,7 @@ def run_scheduler_job(with_db_reset=False) -> None: if with_db_reset: reset_db() - job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, do_pickle=False, num_runs=3) + job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, num_runs=3) run_job(job=job_runner.job, execute_callable=job_runner._execute) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 
8adffd106eae..572ce439c231 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-1d781ee92cc59e7647d7f72ddc542b7f17e03fc8b822950db74415c38279d40f
\ No newline at end of file
+5ec1019b1b0f43b29fc83638c2a13c0bda90b7e4f0ff542aeab401bbfa9a83e4
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 1b0d5b346c95..ba935dd6c4be 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
[Regenerated ERD diagram (SVG, 3492 lines changed per the diffstat): the dag_pickle table is removed and the last_pickled and pickle_id columns are dropped from the dag table; the remaining changes are re-rendered diagram layout, not reproduced here.]
NULL + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 - + backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} - + dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 - + task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 - + rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + 
+rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 - + xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [BYTEA] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [BYTEA] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance_note - -task_instance_note - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - 
-queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 - + backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 - + trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} + + + +alembic_version + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] - - - -alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +session + +id + + [INTEGER] + NOT NULL + 
+data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] - + ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL - + ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + 
+alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index f133a67e08ef..61dde39958e2 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``d8cd3297971e`` (head) | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. | +| ``d03e4a635aa3`` (head) | ``d8cd3297971e`` | ``3.0.0`` | Drop DAG pickling. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``d8cd3297971e`` | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``5f57a45b8433`` | ``486ac7936b78`` | ``3.0.0`` | Drop task_fail table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/newsfragments/aip-72.significant.rst b/newsfragments/aip-72.significant.rst index 6c4467696170..2baafad7ab8b 100644 --- a/newsfragments/aip-72.significant.rst +++ b/newsfragments/aip-72.significant.rst @@ -13,3 +13,7 @@ As part of this change the following breaking changes have occurred: There were two build in options for this, Standard (the default) which used Fork or a new process as appropriate, and CGroupRunner to launch tasks in a new CGroup (not usable inside docker or Kubernetes). With the move of the execution time code into the TaskSDK we are using this opportunity to reduce complexity for seldom used features. + +- Shipping DAGs via pickle is no longer supported + + This was a feature that was not widely used and was a security risk. It has been removed. 
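For orientation: the ``d03e4a635aa3`` ("Drop DAG pickling") revision added by this patch lives in ``airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py``, whose contents are not shown in this excerpt. The sketch below is only an illustration of what a migration of that shape could look like using standard Alembic operations; the revision identifiers come from the table above, while the column types and the ``dag_pickle`` layout in ``downgrade()`` are assumptions rather than the file's actual contents.

```python
"""Illustrative sketch only; not the actual contents of 0046_3_0_0_drop_dag_pickling.py."""

import sqlalchemy as sa
from alembic import op

# Revision identifiers, taken from the migrations table above.
revision = "d03e4a635aa3"
down_revision = "d8cd3297971e"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
    """Drop the pickling-related columns from ``dag`` and remove the ``dag_pickle`` table."""
    with op.batch_alter_table("dag") as batch_op:
        batch_op.drop_column("last_pickled")
        batch_op.drop_column("pickle_id")
    op.drop_table("dag_pickle")


def downgrade():
    """Recreate what upgrade() removed (column types here are assumptions)."""
    with op.batch_alter_table("dag") as batch_op:
        batch_op.add_column(sa.Column("last_pickled", sa.TIMESTAMP(timezone=True), nullable=True))
        batch_op.add_column(sa.Column("pickle_id", sa.Integer(), nullable=True))
    op.create_table(
        "dag_pickle",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("pickle", sa.PickleType(), nullable=True),
        sa.Column("created_dttm", sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("pickle_hash", sa.BigInteger(), nullable=True),
    )
```

The ``batch_alter_table`` context manager is used here so the column drops also work on SQLite, which historically lacks native ``ALTER TABLE ... DROP COLUMN`` support; since both columns were nullable, dropping them requires no data migration.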
diff --git a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index acd1afcba995..a8c69871ab9c 100644 --- a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -56,6 +56,7 @@ class CeleryKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + # TODO: Remove this flag once providers depend on Airflow 3.0 supports_pickling: bool = True supports_sentry: bool = False @@ -159,7 +160,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -167,6 +167,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via celery or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -175,10 +176,14 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + # TODO: Remove this once providers depend on Airflow 3.0 + if not hasattr(task_instance, "pickle_id"): + del kwargs["pickle_id"] + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -186,6 +191,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 63755d3d11a1..d24a59a95d10 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -45,6 +45,7 @@ class LocalKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + # TODO: Remove this attribute once providers rely on Airflow >=3.0.0 supports_pickling: bool = False supports_sentry: bool = False @@ -146,7 +147,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -154,6 +154,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via local or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -162,10 +163,13 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + if not hasattr(task_instance, "pickle_id"): + del kwargs["pickle_id"] + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, 
wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -173,6 +177,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 71fae6691c6f..2fa72deab0aa 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -110,9 +110,6 @@ def teardown_method(self) -> None: db.clear_db_runs() db.clear_db_jobs() - def test_supports_pickling(self): - assert CeleryExecutor.supports_pickling - def test_supports_sentry(self): assert CeleryExecutor.supports_sentry diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 13ca0ed828c6..ea143edd8298 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1750,9 +1750,6 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat "Reading from k8s pod logs failed: error_fetching_pod_log", ] - def test_supports_pickling(self): - assert KubernetesExecutor.supports_pickling - def test_supports_sentry(self): assert not KubernetesExecutor.supports_sentry diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 5249944ea113..1cd014ccb0cf 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -185,7 +185,6 @@ def test_should_respond_200(self): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -224,7 +223,6 @@ def test_should_respond_200_with_schedule_none(self, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -311,7 +309,6 @@ def test_should_respond_200(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, @@ -372,7 +369,6 @@ def test_should_respond_200_with_asset_expression(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -428,7 +424,6 @@ def test_should_response_200_with_doc_md_none(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -477,7 +472,6 @@ def test_should_response_200_for_null_start_date(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -528,7 +522,6 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "is_paused_upon_creation": None, "last_expired": None, "last_parsed_time": None, - 
"last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -587,7 +580,6 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "is_paused_upon_creation": None, "last_expired": None, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -699,7 +691,6 @@ def test_should_respond_200(self, session, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -725,7 +716,6 @@ def test_should_respond_200(self, session, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -763,7 +753,6 @@ def test_only_active_true_returns_active_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -802,7 +791,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -828,7 +816,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -983,7 +970,6 @@ def test_paused_true_returns_paused_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1021,7 +1007,6 @@ def test_paused_false_returns_unpaused_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1059,7 +1044,6 @@ def test_paused_none_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1085,7 +1069,6 @@ def test_paused_none_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1173,7 +1156,6 @@ def test_should_respond_200_on_patch_is_paused(self, url_safe_serializer, sessio "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1270,7 +1252,6 @@ def test_should_respond_200_with_update_mask(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1363,7 +1344,6 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize "next_dagrun_create_after": None, "last_expired": 
None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1389,7 +1369,6 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1440,7 +1419,6 @@ def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1466,7 +1444,6 @@ def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1557,7 +1534,6 @@ def test_only_active_true_returns_active_dags(self, url_safe_serializer, session "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1604,7 +1580,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1630,7 +1605,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1827,7 +1801,6 @@ def test_should_respond_200_and_pause_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1853,7 +1826,6 @@ def test_should_respond_200_and_pause_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1900,7 +1872,6 @@ def test_should_respond_200_and_pause_dag_pattern(self, session, url_safe_serial "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1926,7 +1897,6 @@ def test_should_respond_200_and_pause_dag_pattern(self, session, url_safe_serial "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1975,7 +1945,6 @@ def test_should_respond_200_and_reverse_ordering(self, session, url_safe_seriali "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -2001,7 +1970,6 @@ def test_should_respond_200_and_reverse_ordering(self, session, url_safe_seriali "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, diff --git 
a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py index 4a6829a5c831..a14365f07c1e 100644 --- a/tests/api_connexion/schemas/test_dag_schema.py +++ b/tests/api_connexion/schemas/test_dag_schema.py @@ -67,7 +67,6 @@ def test_serialize_test_dag_schema(url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -102,7 +101,6 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer): "last_expired": None, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -128,7 +126,6 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer): "last_expired": None, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 0e9b7a408583..f913fd36e4bb 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -312,7 +312,6 @@ def test_dag_details( "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": last_parsed_time, - "last_pickled": None, "max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, @@ -329,7 +328,6 @@ def test_dag_details( "value": 1, } }, - "pickle_id": None, "render_template_as_native_obj": False, "timetable_summary": None, "start_date": start_date.replace(tzinfo=None).isoformat() + "Z", # pydantic datetime format @@ -381,12 +379,10 @@ def test_get_dag(self, test_client, query_params, dag_id, expected_status_code, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": "grid", "last_parsed_time": last_parsed_time, "timetable_description": "Never, external triggers only", "has_import_errors": False, - "pickle_id": None, } assert res_json == expected diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 5a4e0b279242..ed1a2c28754f 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -21,7 +21,6 @@ import json import logging import os -import re import shutil import sys from argparse import ArgumentParser @@ -288,7 +287,6 @@ def test_run_with_existing_dag_run_id(self, mock_local_job_runner): wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, - pickle_id=None, pool=None, external_executor_id=None, ) @@ -323,7 +321,6 @@ def test_run_with_read_from_db(self, mock_local_job_runner, caplog, from_db): wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, - pickle_id=None, pool=None, external_executor_id=None, ) @@ -606,31 +603,6 @@ def test_task_render_with_custom_timetable(self, mock_dagrun, mock_scalars, mock ) assert "data_interval" in mock_dagrun.call_args.kwargs - def test_cli_run_when_pickle_and_dag_cli_method_selected(self): - """ - tasks run should return an AirflowException when invalid pickle_id is passed - """ - pickle_id = "pickle_id" - - with pytest.raises( - AirflowException, - match=re.escape("You cannot use the --pickle option when using DAG.cli() method."), - ): - 
task_command.task_run( - self.parser.parse_args( - [ - "tasks", - "run", - "example_bash_operator", - "runme_0", - DEFAULT_DATE.isoformat(), - "--pickle", - pickle_id, - ] - ), - self.dag, - ) - def test_task_state(self): task_command.task_state( self.parser.parse_args( @@ -784,7 +756,6 @@ def test_external_executor_id_present_for_fork_run_task(self, mock_local_job): job=mock.ANY, task_instance=mock.ANY, mark_success=False, - pickle_id=None, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, @@ -806,7 +777,6 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) job=mock.ANY, task_instance=mock.ANY, mark_success=False, - pickle_id=None, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 891223e2cd67..192a12358e8d 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -81,8 +81,8 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess): # This fake processor will return the zombies it received in constructor # as its processing result w/o actually parsing anything. - def __init__(self, file_path, pickle_dags, dag_ids, dag_directory, callbacks): - super().__init__(file_path, pickle_dags, dag_ids, dag_directory, callbacks) + def __init__(self, file_path, dag_ids, dag_directory, callbacks): + super().__init__(file_path, dag_ids, dag_directory, callbacks) # We need a "real" selectable handle for waitable_handle to work readable, writable = multiprocessing.Pipe(duplex=False) writable.send("abc") @@ -110,10 +110,9 @@ def result(self): return self._result @staticmethod - def _create_process(file_path, callback_requests, dag_ids, dag_directory, pickle_dags): + def _create_process(file_path, callback_requests, dag_ids, dag_directory): return FakeDagFileProcessorRunner( file_path, - pickle_dags, dag_ids, dag_directory, callback_requests, @@ -179,7 +178,6 @@ def test_remove_file_clears_import_error(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -215,7 +213,6 @@ def test_max_runs_when_no_files(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -239,7 +236,6 @@ def test_start_new_processes_with_same_filepath(self, _): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -273,7 +269,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -298,7 +293,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -332,7 +326,6 @@ def test_file_paths_in_queue_sorted_alphabetically( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -364,7 +357,6 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -429,7 +421,6 @@ def test_file_paths_in_queue_sorted_by_modified_time( 
processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -469,7 +460,6 @@ def test_file_paths_in_queue_excludes_missing_file( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -506,7 +496,6 @@ def test_add_new_file_to_parsing_queue( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -554,7 +543,6 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -615,7 +603,6 @@ def test_file_paths_in_queue_sorted_by_priority( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -644,7 +631,6 @@ def test_scan_stale_dags(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -722,7 +708,6 @@ def test_scan_stale_dags_standalone_mode(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -779,14 +764,12 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=TEST_DAG_FOLDER, callback_requests=[], @@ -812,14 +795,12 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=str(TEST_DAG_FOLDER), callback_requests=[], @@ -854,7 +835,6 @@ def test_dag_with_system_exit(self): max_runs=1, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - pickle_dags=False, async_mode=True, ), ) @@ -901,7 +881,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - pickle_dags=False, async_mode=False, ), ) @@ -922,7 +901,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - pickle_dags=False, async_mode=True, ), ) @@ -992,7 +970,6 @@ def fake_processor_(*args, **kwargs): max_runs=100, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - pickle_dags=False, async_mode=True, ) @@ -1034,7 +1011,6 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -1068,7 +1044,6 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1098,7 +1073,6 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1144,7 +1118,6 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], 
- pickle_dags=False, async_mode=True, ), ) @@ -1194,7 +1167,6 @@ def test_fetch_callbacks_from_database(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1241,7 +1213,6 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1281,7 +1252,6 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1322,7 +1292,6 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1345,7 +1314,6 @@ def test_callback_queue(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1446,9 +1414,7 @@ class path, thus when reloading logging module the airflow.processor_manager os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent( - test_dag_path, 0, timedelta(days=365), [], False, async_mode - ) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1467,7 +1433,7 @@ def test_parse_once(self): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") - processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], False, async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1495,7 +1461,7 @@ def test_launch_process(self): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. 
- processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1505,21 +1471,21 @@ def test_launch_process(self): assert os.path.isfile(log_file_loc) def test_single_parsing_loop_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._process = Mock() processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.run_single_parsing_loop() def test_single_parsing_loop_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._process = None with pytest.raises(ValueError, match="Process not started"): processor_agent.run_single_parsing_loop() def test_single_parsing_loop_process_isnt_alive(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._process = Mock() processor_agent._parent_signal_conn = Mock() processor_agent._process.is_alive.return_value = False @@ -1527,7 +1493,7 @@ def test_single_parsing_loop_process_isnt_alive(self): assert not ret_val def test_single_parsing_loop_process_conn_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._process = Mock() processor_agent._parent_signal_conn = Mock() processor_agent._process.is_alive.return_value = True @@ -1536,25 +1502,25 @@ def test_single_parsing_loop_process_conn_error(self): assert not ret_val def test_get_callbacks_pipe(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() retval = processor_agent.get_callbacks_pipe() assert retval == processor_agent._parent_signal_conn def test_get_callbacks_pipe_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.get_callbacks_pipe() def test_wait_until_finished_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.wait_until_finished() def test_wait_until_finished_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ 
-1563,13 +1529,13 @@ def test_wait_until_finished_poll_eof_error(self): assert ret_val is None def test_heartbeat_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.heartbeat() def test_heartbeat_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1578,7 +1544,7 @@ def test_heartbeat_poll_eof_error(self): assert ret_val is None def test_heartbeat_poll_connection_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1587,7 +1553,7 @@ def test_heartbeat_poll_connection_error(self): assert ret_val is None def test_heartbeat_poll_process_message(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.side_effect = [True, False] processor_agent._parent_signal_conn.recv = Mock() @@ -1598,19 +1564,19 @@ def test_heartbeat_poll_process_message(self): def test_process_message_invalid_type(self): message = "xyz" - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) with pytest.raises(RuntimeError, match="Unexpected message received of type str"): processor_agent._process_message(message) def test_heartbeat_manager(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent._heartbeat_manager() @mock.patch("airflow.utils.process_utils.reap_process_group") def test_heartbeat_manager_process_restart(self, mock_pg): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._process = MagicMock() processor_agent.start = Mock() @@ -1624,7 +1590,7 @@ def test_heartbeat_manager_process_restart(self, mock_pg): @mock.patch("time.monotonic") @mock.patch("airflow.dag_processing.manager.reap_process_group") def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.pid = 12345 @@ -1645,7 +1611,7 @@ def test_heartbeat_manager_process_reap(self, mock_pg, 
mock_time_monotonic, mock processor_agent.start.assert_called() def test_heartbeat_manager_terminate(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True @@ -1655,7 +1621,7 @@ def test_heartbeat_manager_terminate(self): processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_terminate_conn_err(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True processor_agent._parent_signal_conn = Mock() @@ -1666,7 +1632,7 @@ def test_heartbeat_manager_terminate_conn_err(self): processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_end_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) processor_agent._process = Mock() processor_agent._process.__bool__ = Mock(return_value=False) processor_agent._process.side_effect = [None] @@ -1682,7 +1648,7 @@ def test_log_to_stdout(self, capfd): async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1701,7 +1667,7 @@ def test_not_log_to_stdout(self, capfd): async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. 
- processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 439c1123f995..f117b3ffe458 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -112,7 +112,7 @@ def _process_file(self, file_path, dag_directory, session): dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock() ) - dag_file_processor.process_file(file_path, [], False) + dag_file_processor.process_file(file_path, []) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode @patch.object(TaskInstance, "handle_failure") @@ -594,7 +594,6 @@ def test_import_error_tracebacks_zip_depth(self, tmp_path): def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file): processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=[], callback_requests=[], @@ -603,7 +602,6 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for result_channel=MagicMock(), parent_channel=MagicMock(), file_path="fake_file_path", - pickle_dags=False, dag_ids=[], thread_name="fake_thread_name", callback_requests=[], @@ -618,7 +616,6 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file): processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=[], callback_requests=[], @@ -627,7 +624,6 @@ def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_f result_channel=MagicMock(), parent_channel=MagicMock(), file_path="fake_file_path", - pickle_dags=False, dag_ids=[], thread_name="fake_thread_name", callback_requests=[], @@ -645,7 +641,6 @@ def test_no_valueerror_with_parseable_dag_in_zip(self, mock_context, tmp_path): processor = DagFileProcessorProcess( file_path=zip_filename, - pickle_dags=False, dag_ids=[], dag_directory=[], callback_requests=[], @@ -662,7 +657,6 @@ def test_nullbyte_exception_handling_when_preimporting_airflow(self, mock_contex processor = DagFileProcessorProcess( file_path=dag_filename, - pickle_dags=False, dag_ids=[], dag_directory=[], callback_requests=[], @@ -696,7 +690,6 @@ def test_error_when_waiting_in_async_mode(self, tmp_path): max_runs=1, processor_timeout=datetime.timedelta(1), dag_ids=[], - pickle_dags=False, async_mode=True, ) self.processor_agent.start() @@ -709,7 +702,6 @@ def test_default_multiprocessing_behaviour(self, tmp_path): max_runs=1, processor_timeout=datetime.timedelta(1), dag_ids=[], - pickle_dags=False, async_mode=False, ) self.processor_agent.start() @@ -723,7 +715,6 @@ def test_spawn_multiprocessing_behaviour(self, tmp_path): max_runs=1, processor_timeout=datetime.timedelta(1), dag_ids=[], - pickle_dags=False, async_mode=False, ) self.processor_agent.start() diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index da7422737ac4..be3ad517d70c 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -44,10 +44,6 @@ def test_supports_sentry(): assert not BaseExecutor.supports_sentry -def test_supports_pickling(): - assert BaseExecutor.supports_pickling - - def 
diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
index 5709ba19a8de..cae92af19844 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -99,7 +99,7 @@ def _process_file(self, file_path, dag_directory, session):
             dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
         )
-        dag_file_processor.process_file(file_path, [], False)
+        dag_file_processor.process_file(file_path, [])

     @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     def test_newly_added_import_error(self, tmp_path, session):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 218f635ad91d..e38beb2110ca 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1308,13 +1308,6 @@ def test_fractional_seconds(self):
         assert start_date == run.start_date, "dag run start_date loses precision "
         self._clean_up(dag_id)

-    def test_pickling(self):
-        test_dag_id = "test_pickling"
-        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
-        dag = DAG(test_dag_id, schedule=None, default_args=args)
-        dag_pickle = dag.pickle()
-        assert dag_pickle.pickle.dag_id == dag.dag_id
-
     def test_rich_comparison_ops(self):
         test_dag_id = "test_rich_comparison_ops"
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index ba018cdad36d..e60146b9558f 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -33,7 +33,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models.log import Log
 from airflow.utils import cli, cli_action_loggers, timezone
-from airflow.utils.cli import _search_for_dag_file, get_dag_by_pickle
+from airflow.utils.cli import _search_for_dag_file

 # Mark entire module as db_test because ``action_cli`` wrapper still could use DB on callbacks:
 # - ``cli_action_loggers.on_pre_execution``
@@ -169,22 +169,6 @@ def test_setup_locations_none_pid_path(self):
         pid, _, _, _ = cli.setup_locations(process=process_name)
         assert pid == default_pid_path

-    def test_get_dag_by_pickle(self, session, dag_maker):
-        from airflow.models.dagpickle import DagPickle
-
-        with dag_maker(dag_id="test_get_dag_by_pickle") as dag:
-            pass
-
-        dp = DagPickle(dag=dag)
-        session.add(dp)
-        session.commit()
-
-        dp_from_db = get_dag_by_pickle(pickle_id=dp.id, session=session)
-        assert dp_from_db.dag_id == "test_get_dag_by_pickle"
-
-        with pytest.raises(AirflowException, match="pickle_id could not be found .* -42"):
-            get_dag_by_pickle(pickle_id=-42, session=session)
-
     @pytest.mark.parametrize(
         ["given_command", "expected_masked_command"],
         [
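`DAG.pickle()`, the `DagPickle` model, and `get_dag_by_pickle` are deleted without a direct substitute in these tests; DAG serialization, which Airflow already uses to ship DAGs around, is untouched by this patch. A minimal sketch of that round trip, shown only for orientation and not taken from this change:

```python
from airflow.models.dag import DAG
from airflow.serialization.serialized_objects import SerializedDAG

# Round-trip a DAG through its JSON-serializable form instead of a pickle.
dag = DAG(dag_id="example_roundtrip", schedule=None)
data = SerializedDAG.to_dict(dag)
restored = SerializedDAG.from_dict(data)
assert restored.dag_id == dag.dag_id
```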
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index c05e0ceb5050..47e93c1616d6 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -338,7 +338,6 @@ def test_no_models_missing(self):
             "log_template",  # not a significant source of data; age not indicative of staleness
             "dag_tag",  # not a significant source of data; age not indicative of staleness,
             "dag_owner_attributes",  # not a significant source of data; age not indicative of staleness,
-            "dag_pickle",  # unsure of consequences
             "dag_code",  # self-maintaining
             "dag_warning",  # self-maintaining
             "connection",  # leave alone
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index 44684cdb9ca7..59a2a288241b 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -205,7 +205,7 @@ def client_single_dag_edit(app, user_single_dag_edit):

 def _process_file(file_path):
     dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory="/tmp", log=mock.MagicMock())
-    dag_file_processor.process_file(file_path, [], False)
+    dag_file_processor.process_file(file_path, [])


 @pytest.fixture
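Across the helpers touched here, `DagFileProcessor.process_file` now takes only the file path and the list of callback requests. A small sketch of the updated call shape, mirroring the tests above; the file path is a placeholder:

```python
from unittest import mock

from airflow.dag_processing.processor import DagFileProcessor

dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory="/tmp", log=mock.MagicMock())
# The trailing pickle_dags argument is no longer accepted.
dag_file_processor.process_file("/path/to/dag_file.py", [])
```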