change spark connection form and add spark connections docs (apache#36419)

* change spark connection form and add spark connections docs

* make SQL letter uppercase in spark-sql connection header

* rename from spark to spark-submit and add default values in connection form
shohamy7 authored Dec 29, 2023
1 parent 5551e14 commit ed9080a
Showing 11 changed files with 214 additions and 29 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -29,7 +29,7 @@ class SparkJDBCHook(SparkSubmitHook):
Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark.
:param spark_app_name: Name of the job (default airflow-spark-jdbc)
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
as configured in Airflow administration
:param spark_conf: Any additional Spark configuration properties
:param spark_py_files: Additional python files used (.zip, .egg, or .py)
25 changes: 25 additions & 0 deletions airflow/providers/apache/spark/hooks/spark_sql.py
@@ -54,6 +54,31 @@ class SparkSqlHook(BaseHook):
conn_type = "spark_sql"
hook_name = "Spark SQL"

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": ["schema", "login", "password", "extra"],
"relabeling": {},
}

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Returns connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
from wtforms.validators import Optional

return {
"queue": StringField(
lazy_gettext("YARN queue"),
widget=BS3TextFieldWidget(),
description="Default YARN queue to use",
validators=[Optional()],
)
}

def __init__(
self,
sql: str,
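For context (not part of this commit), here is a minimal sketch of a connection carrying the new ``queue`` field exposed by the form widget above; that the hook reads the queue from the connection extras at runtime is an assumption based on the new form field.

# Sketch only, not part of the diff: a Spark SQL connection whose extras
# carry the "queue" field exposed by the new connection form widget above.
import json

from airflow.models import Connection

spark_sql_conn = Connection(
    conn_id="spark_sql_default",   # default conn_id used by SparkSqlHook
    conn_type="spark_sql",         # matches SparkSqlHook.conn_type
    host="yarn",
    extra=json.dumps({"queue": "root.default"}),
)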
45 changes: 40 additions & 5 deletions airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,15 +33,16 @@
with contextlib.suppress(ImportError, NameError):
from airflow.providers.cncf.kubernetes import kube_client

ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"]
DEFAULT_SPARK_BINARY = "spark-submit"
ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]


class SparkSubmitHook(BaseHook, LoggingMixin):
"""
Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
:param conf: Arbitrary Spark configuration properties
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
in Airflow administration. When an invalid connection_id is supplied, it will default
to yarn.
:param files: Upload additional files to the executor running the job, separated by a
@@ -98,10 +99,44 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": ["schema", "login", "password"],
"hidden_fields": ["schema", "login", "password", "extra"],
"relabeling": {},
}

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Returns connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
from wtforms.validators import Optional, any_of

return {
"queue": StringField(
lazy_gettext("YARN queue"),
widget=BS3TextFieldWidget(),
description="Default YARN queue to use",
validators=[Optional()],
),
"deploy-mode": StringField(
lazy_gettext("Deploy mode"),
widget=BS3TextFieldWidget(),
description="Must be client or cluster",
validators=[any_of(["client", "cluster"])],
default="client",
),
"spark-binary": StringField(
lazy_gettext("Spark binary"),
widget=BS3TextFieldWidget(),
description=f"Must be one of: {', '.join(ALLOWED_SPARK_BINARIES)}",
validators=[any_of(ALLOWED_SPARK_BINARIES)],
default=DEFAULT_SPARK_BINARY,
),
"namespace": StringField(
lazy_gettext("Kubernetes namespace"), widget=BS3TextFieldWidget(), validators=[Optional()]
),
}

def __init__(
self,
conf: dict[str, Any] | None = None,
Expand Down Expand Up @@ -198,7 +233,7 @@ def _resolve_connection(self) -> dict[str, Any]:
"master": "yarn",
"queue": None,
"deploy_mode": None,
"spark_binary": self.spark_binary or "spark-submit",
"spark_binary": self.spark_binary or DEFAULT_SPARK_BINARY,
"namespace": None,
}

@@ -216,7 +251,7 @@ def _resolve_connection(self) -> dict[str, Any]:
conn_data["queue"] = self._queue if self._queue else extra.get("queue")
conn_data["deploy_mode"] = self._deploy_mode if self._deploy_mode else extra.get("deploy-mode")
if not self.spark_binary:
self.spark_binary = extra.get("spark-binary", "spark-submit")
self.spark_binary = extra.get("spark-binary", DEFAULT_SPARK_BINARY)
if self.spark_binary is not None and self.spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
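For illustration (not part of the diff), a sketch of a connection whose extras use the keys read by ``_resolve_connection`` above; ``queue``, ``deploy-mode`` and ``spark-binary`` appear in the code shown, while the ``spark`` conn type and the ``namespace`` key are assumptions.

# Sketch only: connection extras matching the keys consumed by
# SparkSubmitHook._resolve_connection in the hunk above.
import json

from airflow.models import Connection

spark_submit_conn = Connection(
    conn_id="spark_default",
    conn_type="spark",                    # assumed SparkSubmitHook conn type
    host="yarn",
    extra=json.dumps(
        {
            "queue": "root.default",          # -> conn_data["queue"]
            "deploy-mode": "cluster",         # -> conn_data["deploy_mode"]
            "spark-binary": "spark3-submit",  # must be one of ALLOWED_SPARK_BINARIES
            "namespace": "spark-apps",        # assumed key for the Kubernetes namespace
        }
    ),
)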
2 changes: 1 addition & 1 deletion airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -37,7 +37,7 @@ class SparkJDBCOperator(SparkSubmitOperator):
:ref:`howto/operator:SparkJDBCOperator`
:param spark_app_name: Name of the job (default airflow-spark-jdbc)
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark-submit>`
as configured in Airflow administration
:param spark_conf: Any additional Spark configuration properties
:param spark_py_files: Additional python files used (.zip, .egg, or .py)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/spark/operators/spark_submit.py
@@ -37,7 +37,7 @@ class SparkSubmitOperator(BaseOperator):
:param application: The application that submitted as a job, either jar or py file. (templated)
:param conf: Arbitrary Spark configuration properties (templated)
:param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
:param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
:param files: Upload additional files to the executor running the job, separated by a
comma. Files will be placed in the working directory of each executor.
28 changes: 28 additions & 0 deletions docs/apache-airflow-providers-apache-spark/connections/index.rst
@@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Apache Spark Connections
========================


.. toctree::
:maxdepth: 1
:glob:

*
@@ -0,0 +1,54 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/connection:spark-connect:

Apache Spark Connect Connection
===============================

The Apache Spark Connect connection type enables connection to Apache Spark via the Spark Connect interface.

Default Connection IDs
----------------------

The Spark Connect hook uses ``spark_connect_default`` by default.

Configuring the Connection
--------------------------
Host (required)
The host to connect to; it should be a valid hostname.

Port (optional)
Specify the port if the host is a URL.

User ID (optional)
The user ID to authenticate with the proxy.

Token (optional)
The token to authenticate with the proxy.

Use SSL (optional)
Whether to use SSL when connecting.

.. warning::

Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
establish communication with external servers. It's crucial to understand that directing the connection towards a
malicious server can lead to significant security vulnerabilities, including the risk of encountering
Remote Code Execution (RCE) attacks.
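For illustration (not part of this commit), a minimal sketch of defining this connection programmatically; the conn type ``spark_connect`` and the mapping of the User ID and Token fields to ``login`` and ``password`` are assumptions based on the fields described above.

# Sketch only: a Spark Connect connection defined in Python. The conn_type
# value and the login/password mapping for "User ID"/"Token" are assumptions.
from airflow.models import Connection

spark_connect_conn = Connection(
    conn_id="spark_connect_default",
    conn_type="spark_connect",          # assumed connection type name
    host="spark-connect.example.com",   # placeholder hostname
    port=15002,                         # default Spark Connect gRPC port
    login="airflow-user",               # "User ID" field (assumed mapping)
    password="proxy-token",             # "Token" field (assumed mapping)
)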
@@ -0,0 +1,48 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/connection:spark-sql:

Apache Spark SQL Connection
===========================

The Apache Spark SQL connection type enables connection to Apache Spark via the ``spark-sql`` command.

Default Connection IDs
----------------------

SparkSqlHook uses ``spark_sql_default`` by default.

Configuring the Connection
--------------------------
Host (required)
The host to connect to; it can be ``local``, ``yarn``, or a URL.

Port (optional)
Specify the port if the host is a URL.

YARN Queue (optional)
The name of the YARN queue to which the application is submitted.

.. warning::

Make sure you trust your users with the ability to configure the host settings as it may enable the connection to
establish communication with external servers. It's crucial to understand that directing the connection towards a
malicious server can lead to significant security vulnerabilities, including the risk of encountering
Remote Code Execution (RCE) attacks.
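For illustration (not part of this commit), a minimal sketch of a task that runs SQL through such a connection; the query and table name are placeholders.

# Sketch only: a task that uses the Spark SQL connection described above.
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

count_rows = SparkSqlOperator(
    task_id="count_rows",
    sql="SELECT COUNT(*) FROM my_table",  # placeholder query
    conn_id="spark_sql_default",          # the connection described above
)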
@@ -17,17 +17,17 @@
.. _howto/connection:spark:
.. _howto/connection:spark-submit:

Apache Spark Connection
=======================
Apache Spark Submit Connection
==============================

The Apache Spark connection type enables connection to Apache Spark.
The Apache Spark Submit connection type enables connection to Apache Spark via the ``spark-submit`` command.

Default Connection IDs
----------------------

Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default. Spark SQL hooks and operators point to ``spark_sql_default`` by default. The Spark Connect hook uses ``spark_connect_default`` by default.
Spark Submit and Spark JDBC hooks and operators use ``spark_default`` by default.

Configuring the Connection
--------------------------
@@ -37,22 +37,17 @@ Host (required)
Port (optional)
Specify the port if the host is a URL.

Extra (optional)
Specify the extra parameters (as json dictionary) that can be used in spark connection. The following parameters out of the standard python parameters are supported:
YARN Queue (optional, only applies to Spark on YARN applications)
The name of the YARN queue to which the application is submitted.

* ``queue`` - The name of the YARN queue to which the application is submitted.
* ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
* ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as value.
* ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
Deploy mode (optional)
Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).

User ID (optional, only applies to Spark Connect)
The user ID to authenticate with the proxy.
Spark binary (optional)
The command to use for Spark submit. Some distros may use ``spark2-submit``. Defaults to ``spark-submit``. Only ``spark-submit``, ``spark2-submit`` or ``spark3-submit`` are allowed as values.

Token (optional, only applies to Spark Connect)
The token to authenticate with the proxy.

Use SSL (optional, only applies to Spark Connect)
Whether to use SSL when connecting.
Kubernetes namespace (optional, only applies to Spark on Kubernetes applications)
Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).
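For illustration (not part of this commit), a minimal sketch of a task that picks up these settings from the connection; the application path is a placeholder.

# Sketch only: a submission task that reads queue, deploy mode, Spark binary
# and namespace from the connection described above.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

submit_pi = SparkSubmitOperator(
    task_id="submit_pi",
    application="/opt/spark/examples/src/main/python/pi.py",  # placeholder path
    conn_id="spark_default",
)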

When specifying the connection as an environment variable, you should specify it using URI syntax.
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-apache-spark/index.rst
@@ -33,7 +33,7 @@
:maxdepth: 1
:caption: Guides

Connection types <connections/spark>
Connection types <connections/index>
Decorators <decorators/pyspark>
Operators <operators>

4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-apache-spark/operators.rst
@@ -23,9 +23,9 @@ Prerequisite
------------

* To use :class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
you must configure :doc:`Spark Connection <connections/spark>`.
you must configure :doc:`Spark Connection <connections/spark-submit>`.
* To use :class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator`
you must configure both :doc:`Spark Connection <connections/spark>`
you must configure both :doc:`Spark Connection <connections/spark-submit>`
and :doc:`JDBC connection <apache-airflow-providers-jdbc:connections/jdbc>`.
* :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator`
gets all the configurations from operator parameters.
