From ed9080ae6a17d7b2478652b676579f162462bb70 Mon Sep 17 00:00:00 2001 From: shohamy7 <46799583+shohamy7@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:53:44 +0200 Subject: [PATCH] change spark connection form and add spark connections docs (#36419) * change spark connection form and add spark connections docs * make SQL letter uppercase in spark-sql connection header * rename from spark to spark-submit and add default values in connection form --- .../apache/spark/hooks/spark_jdbc.py | 2 +- .../providers/apache/spark/hooks/spark_sql.py | 25 +++++++++ .../apache/spark/hooks/spark_submit.py | 45 ++++++++++++++-- .../apache/spark/operators/spark_jdbc.py | 2 +- .../apache/spark/operators/spark_submit.py | 2 +- .../connections/index.rst | 28 ++++++++++ .../connections/spark-connect.rst | 54 +++++++++++++++++++ .../connections/spark-sql.rst | 48 +++++++++++++++++ .../{spark.rst => spark-submit.rst} | 31 +++++------ .../index.rst | 2 +- .../operators.rst | 4 +- 11 files changed, 214 insertions(+), 29 deletions(-) create mode 100644 docs/apache-airflow-providers-apache-spark/connections/index.rst create mode 100644 docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst create mode 100644 docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst rename docs/apache-airflow-providers-apache-spark/connections/{spark.rst => spark-submit.rst} (58%) diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py index 8c8d02f1ecc4..b904ca4260e6 100644 --- a/airflow/providers/apache/spark/hooks/spark_jdbc.py +++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py @@ -29,7 +29,7 @@ class SparkJDBCHook(SparkSubmitHook): Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark. 
:param spark_app_name: Name of the job (default airflow-spark-jdbc) - :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` + :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured in Airflow administration :param spark_conf: Any additional Spark configuration properties :param spark_py_files: Additional python files used (.zip, .egg, or .py) diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py index 41dc741ccdd3..46eec49f3013 100644 --- a/airflow/providers/apache/spark/hooks/spark_sql.py +++ b/airflow/providers/apache/spark/hooks/spark_sql.py @@ -54,6 +54,31 @@ class SparkSqlHook(BaseHook): conn_type = "spark_sql" hook_name = "Spark SQL" + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom field behaviour.""" + return { + "hidden_fields": ["schema", "login", "password", "extra"], + "relabeling": {}, + } + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Returns connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + from wtforms.validators import Optional + + return { + "queue": StringField( + lazy_gettext("YARN queue"), + widget=BS3TextFieldWidget(), + description="Default YARN queue to use", + validators=[Optional()], + ) + } + def __init__( self, sql: str, diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index b495cafeb608..b96d992bba95 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -33,7 +33,8 @@ with contextlib.suppress(ImportError, NameError): from airflow.providers.cncf.kubernetes import kube_client -ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"] +DEFAULT_SPARK_BINARY = "spark-submit" +ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"] class SparkSubmitHook(BaseHook, LoggingMixin): @@ -41,7 +42,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH. :param conf: Arbitrary Spark configuration properties - :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured + :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured in Airflow administration. When an invalid connection_id is supplied, it will default to yarn. 
:param files: Upload additional files to the executor running the job, separated by a @@ -98,10 +99,44 @@ class SparkSubmitHook(BaseHook, LoggingMixin): def get_ui_field_behaviour(cls) -> dict[str, Any]: """Return custom field behaviour.""" return { - "hidden_fields": ["schema", "login", "password"], + "hidden_fields": ["schema", "login", "password", "extra"], "relabeling": {}, } + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Returns connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + from wtforms.validators import Optional, any_of + + return { + "queue": StringField( + lazy_gettext("YARN queue"), + widget=BS3TextFieldWidget(), + description="Default YARN queue to use", + validators=[Optional()], + ), + "deploy-mode": StringField( + lazy_gettext("Deploy mode"), + widget=BS3TextFieldWidget(), + description="Must be client or cluster", + validators=[any_of(["client", "cluster"])], + default="client", + ), + "spark-binary": StringField( + lazy_gettext("Spark binary"), + widget=BS3TextFieldWidget(), + description=f"Must be one of: {', '.join(ALLOWED_SPARK_BINARIES)}", + validators=[any_of(ALLOWED_SPARK_BINARIES)], + default=DEFAULT_SPARK_BINARY, + ), + "namespace": StringField( + lazy_gettext("Kubernetes namespace"), widget=BS3TextFieldWidget(), validators=[Optional()] + ), + } + def __init__( self, conf: dict[str, Any] | None = None, @@ -198,7 +233,7 @@ def _resolve_connection(self) -> dict[str, Any]: "master": "yarn", "queue": None, "deploy_mode": None, - "spark_binary": self.spark_binary or "spark-submit", + "spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY, "namespace": None, } @@ -216,7 +251,7 @@ def _resolve_connection(self) -> dict[str, Any]: conn_data["queue"] = self._queue if self._queue else extra.get("queue") conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode") if not self.spark_binary: - self.spark_binary = extra.get("spark-binary", "spark-submit") + self.spark_binary = extra.get("spark-binary", DEFAULT_SPARK_BINARY) if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES: raise RuntimeError( f"The spark-binary extra can be one of {ALLOWED_SPARK_BINARIES} and it" diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py index 4b4dd648a67c..e5ff5f9c65a4 100644 --- a/airflow/providers/apache/spark/operators/spark_jdbc.py +++ b/airflow/providers/apache/spark/operators/spark_jdbc.py @@ -37,7 +37,7 @@ class SparkJDBCOperator(SparkSubmitOperator): :ref:`howto/operator:SparkJDBCOperator` :param spark_app_name: Name of the job (default airflow-spark-jdbc) - :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` + :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured in Airflow administration :param spark_conf: Any additional Spark configuration properties :param spark_py_files: Additional python files used (.zip, .egg, or .py) diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index be2f2d0ac5a4..bd8480b8151f 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -37,7 +37,7 @@ class SparkSubmitOperator(BaseOperator): :param application: The application that is submitted as a job, either jar or py file. 
(templated) :param conf: Arbitrary Spark configuration properties (templated) - :param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured + :param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured in Airflow administration. When an invalid connection_id is supplied, it will default to yarn. :param files: Upload additional files to the executor running the job, separated by a comma. Files will be placed in the working directory of each executor. diff --git a/docs/apache-airflow-providers-apache-spark/connections/index.rst b/docs/apache-airflow-providers-apache-spark/connections/index.rst new file mode 100644 index 000000000000..71716ec9d69f --- /dev/null +++ b/docs/apache-airflow-providers-apache-spark/connections/index.rst @@ -0,0 +1,28 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Apache Spark Connections +======================== + + +.. toctree:: + :maxdepth: 1 + :glob: + + * diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst new file mode 100644 index 000000000000..aa5ef071578b --- /dev/null +++ b/docs/apache-airflow-providers-apache-spark/connections/spark-connect.rst @@ -0,0 +1,54 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:spark-connect: + +Apache Spark Connect Connection +=============================== + +The Apache Spark Connect connection type enables connection to Apache Spark via the Spark Connect interface. + +Default Connection IDs +---------------------- + +The Spark Connect hook uses ``spark_connect_default`` by default. + +Configuring the Connection +-------------------------- +Host (required) + The host to connect to; it should be a valid hostname. + +Port (optional) + Specify the port if the host is a URL. + +User ID (optional) + The user ID to authenticate with the proxy. + +Token (optional) + The token to authenticate with the proxy. 
+ +Use SSL (optional) + Whether to use SSL when connecting. + +.. warning:: + + Make sure you trust your users with the ability to configure the host settings as it may enable the connection to + establish communication with external servers. It's crucial to understand that directing the connection towards a + malicious server can lead to significant security vulnerabilities, including the risk of encountering + Remote Code Execution (RCE) attacks. diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst new file mode 100644 index 000000000000..c4e4c606de18 --- /dev/null +++ b/docs/apache-airflow-providers-apache-spark/connections/spark-sql.rst @@ -0,0 +1,48 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:spark-sql: + +Apache Spark SQL Connection +=========================== + +The Apache Spark SQL connection type enables connection to Apache Spark via the ``spark-sql`` command. + +Default Connection IDs +---------------------- + +SparkSqlHook uses ``spark_sql_default`` by default. + +Configuring the Connection +-------------------------- +Host (required) + The host to connect to; it can be ``local``, ``yarn``, or a URL. + +Port (optional) + Specify the port if the host is a URL. + +YARN Queue + The name of the YARN queue to which the application is submitted. + +.. warning:: + + Make sure you trust your users with the ability to configure the host settings as it may enable the connection to + establish communication with external servers. It's crucial to understand that directing the connection towards a + malicious server can lead to significant security vulnerabilities, including the risk of encountering + Remote Code Execution (RCE) attacks. diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst similarity index 58% rename from docs/apache-airflow-providers-apache-spark/connections/spark.rst rename to docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst index 05b92ce75cd7..f38a2908ba40 100644 --- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst +++ b/docs/apache-airflow-providers-apache-spark/connections/spark-submit.rst @@ -17,17 +17,17 @@ -.. _howto/connection:spark: +.. _howto/connection:spark-submit: -Apache Spark Connection -======================= +Apache Spark Submit Connection +============================== -The Apache Spark connection type enables connection to Apache Spark. +The Apache Spark Submit connection type enables connection to Apache Spark via the ``spark-submit`` command. 
Default Connection IDs ---------------------- -Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default. +Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Configuring the Connection -------------------------- @@ -37,22 +37,17 @@ Host (required) Port (optional) Specify the port if the host is a URL. -Extra (optional) - Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported: +YARN Queue (optional, only applies to Spark on YARN applications) + The name of the YARN queue to which the application is submitted. - * ``queue`` - The name of the YARN queue to which the application is submitted. - * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client). - * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value. - * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota). +Deploy mode (optional) + Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client). -User ID (optional, only applies to Spark Connect) - The user ID to authenticate with the proxy. +Spark binary (optional) + The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as values. -Token (optional, only applies to Spark Connect) - The token to authenticate with the proxy. - -Use SSL (optional, only applies to Spark Connect) - Whether to use SSL when connecting. +Kubernetes namespace (optional, only applies to Spark on Kubernetes applications) + Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota). When specifying the connection as an environment variable, you should specify it using URI syntax. diff --git a/docs/apache-airflow-providers-apache-spark/index.rst b/docs/apache-airflow-providers-apache-spark/index.rst index 7bc8959b63c1..fa5698d61d9b 100644 --- a/docs/apache-airflow-providers-apache-spark/index.rst +++ b/docs/apache-airflow-providers-apache-spark/index.rst @@ -33,7 +33,7 @@ :maxdepth: 1 :caption: Guides - Connection types <connections/spark> + Connection types <connections/index> Decorators Operators diff --git a/docs/apache-airflow-providers-apache-spark/operators.rst b/docs/apache-airflow-providers-apache-spark/operators.rst index 30d23f47cd65..f6c20985f24c 100644 --- a/docs/apache-airflow-providers-apache-spark/operators.rst +++ b/docs/apache-airflow-providers-apache-spark/operators.rst @@ -23,9 +23,9 @@ Prerequisite ------------ * To use :class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator` - you must configure :doc:`Spark Connection <connections/spark>`. + you must configure :doc:`Spark Connection <connections/spark-submit>`. * To use :class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator` - you must configure both :doc:`Spark Connection <connections/spark>` + you must configure both :doc:`Spark Connection <connections/spark-submit>` and :doc:`JDBC connection <apache-airflow-providers-jdbc:connections/jdbc>`. 
* :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator` gets all the configurations from operator parameters.
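For readers wiring this up, here is a minimal sketch (not part of the patch) of how the new form fields map onto an Airflow ``Connection``. It assumes the Spark Submit hook's ``spark`` connection type; the host, queue, and binary values are illustrative placeholders, not recommended defaults::

    import json

    from airflow.models.connection import Connection

    # Hypothetical connection mirroring the widgets added to the form above;
    # the individual fields are stored in the connection's JSON "extra", which
    # is where SparkSubmitHook._resolve_connection() reads them back.
    conn = Connection(
        conn_id="spark_default",
        conn_type="spark",
        host="yarn",
        extra=json.dumps(
            {
                "queue": "root.default",         # "YARN queue" field
                "deploy-mode": "client",         # "Deploy mode" field (client or cluster)
                "spark-binary": "spark-submit",  # "Spark binary" field
            }
        ),
    )

    # URI form of the same connection, usable as an AIRFLOW_CONN_SPARK_DEFAULT
    # environment variable per the URI syntax note in spark-submit.rst.
    print(conn.get_uri())

Note that ``deploy-mode`` and ``spark-binary`` keep their hyphenated extra keys even though the form relabels them, which is why ``_resolve_connection`` looks them up with ``extra.get("deploy-mode")`` and ``extra.get("spark-binary")``.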