Add GCP Financial Services API #43446
Conversation
financial_services_discovery.json is added because the discovery doc requires a developer key when accessed through the googleapiclient.discovery.build method
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Hi there! Thanks for your PR, sounds like a good feature to add :)
sure!
gcp_conn_id: str = "google_cloud_default",
**kwargs,
) -> None:
if kwargs.get("delegate_to") is not None:
This code is not needed, because you created a new Hook. This if-clause was added to deprecate the delegate_to parameter in the old hooks.
Thanks for your comment. I've removed this part from the code.
)
self.discovery_doc = discovery_doc

def get_conn(self) -> Resource:
Could we try to download a discovery document for an authorized user? First authorize the user, then download a discovery doc, and after that build a connection. Similar to this code for firebase
The credentials retrieved using GoogleBaseHook don't authenticate the Financial Services API; for this service a developer key must be provided, because the API is not currently publicly generally available. For some use cases, we expect that providing the discovery document may be more feasible than creating and sharing an API key in GCP. However, I agree that having the option to use the developer key could also be useful.
I have made updates to enable users to supply either a developer API key OR a discovery document as a local .json file by setting Airflow variables. An exception will be raised if neither of these variables is set.
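The either/or selection described above could be sketched roughly as follows. This is an illustrative sketch only: the names load_discovery_document, developer_key, and discovery_doc_path are hypothetical, and the real hook reads these values from Airflow variables before building the client.

```python
import json


def load_discovery_document(developer_key=None, discovery_doc_path=None):
    """Decide how the API client should be built: from a developer key or
    from a locally stored discovery document. Sketch only - the actual
    hook in the PR may differ."""
    if developer_key:
        # With a key, googleapiclient.discovery.build(..., developerKey=...)
        # could fetch the discovery doc from Google directly.
        return "developer_key", developer_key
    if discovery_doc_path:
        # Otherwise build the client from the checked-in discovery document.
        with open(discovery_doc_path) as f:
            return "discovery_doc", json.load(f)
    raise ValueError(
        "Either a developer key or a discovery document must be provided"
    )
```

Raising when neither variable is set surfaces the misconfiguration at hook initialization rather than at the first API call.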
:returns: A dictionary containing the instance metadata
"""
conn = self.get_conn()
response = conn.projects().locations().instances().get(name=instance_resource_uri).execute()
For all our methods in hooks we divide the name into parameters: project_id, location_id, and instance_id, and then combine these parameters into the name inside the method. For example https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py#L117
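The suggested refactor amounts to composing the resource name inside the hook method instead of accepting a full URI. A minimal sketch (the helper name build_instance_name is illustrative, not from the PR):

```python
def build_instance_name(project_id: str, location_id: str, instance_id: str) -> str:
    """Compose the full resource name from its parts, as the reviewer suggests."""
    return f"projects/{project_id}/locations/{location_id}/instances/{instance_id}"


# The hook method would then take the parts as parameters, e.g.:
# def get_instance(self, project_id, location_id, instance_id):
#     name = build_instance_name(project_id, location_id, instance_id)
#     return self.get_conn().projects().locations().instances().get(name=name).execute()
```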
Thanks for your comment. I have made the updates to hook and operator code accordingly.
Create a Financial Services Anti-Money Laundering AI instance.

:param instance_id: Identifier for the instance to create
:param kms_key_uri: URI of the KMS key that will be used for instance encryption
The same as mentioned above for location_resource_uri and kms_key_uri: these URIs should be split into parameters.
Delete a Financial Services Anti-Money Laundering AI instance.

:param instance_resource_uri: URI of the instance to delete (format:
'projects/<Project ID>/locations/<Location>/instances/<Instance ID>')
The same: instance_resource_uri should be split into parameters.
Get a Financial Services Anti-Money Laundering AI operation.

:param operation_resource_uri: URI of the operation to get (format:
'projects/<Project ID>/locations/<Location>/operations/<Operation ID>')
The same: operation_resource_uri should be split into parameters.
if TYPE_CHECKING:
    from airflow.utils.context import Context
All Operators below should be changed after the changes in the Hook are made. Example of how the operator should look: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/google/cloud/operators/dataproc_metastore.py#L196C1-L224C1
from airflow.utils.context import Context

class FinancialServicesOperationSensor(BaseSensorOperator):
Could you please explain the reason for this Sensor? I see that this Sensor waits until the Operation is completed. If that is its purpose, then we don't need this Sensor, because in our Operators we use the wait_for_operation function for this. Here's the code with the implementation in the Hook and how we use it in the Operator.
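The wait_for_operation pattern referred to here is essentially a blocking poll inside the hook. A simplified sketch, assuming the operation is represented as a JSON dict with done/error fields and get_operation is a hypothetical callable wrapping the API call (not the actual Airflow implementation):

```python
import time


def wait_for_operation(get_operation, timeout: float = 300.0, poll_interval: float = 1.0) -> dict:
    """Block until the long-running operation reports done=True, then return it.

    get_operation is any callable returning the operation as a dict, e.g. a
    lambda wrapping conn.projects().locations().operations().get(...).execute().
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        operation = get_operation()
        if operation.get("done"):
            # Completed operations carry either an error or a response.
            if "error" in operation:
                raise RuntimeError(f"Operation failed: {operation['error']}")
            return operation
        time.sleep(poll_interval)
    raise TimeoutError("Timed out waiting for operation to complete")
```

With this in the hook, the operator simply calls it after starting the operation, so no separate Sensor task is needed for the common synchronous case.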
Several factors affect the feasibility of following the standard wait_for_operation pattern:
- we don't currently have a Python client API for Financial Services
- type URLs are not registered for the Financial Services API types

I have pushed a possible solution that patches the missing types with an Empty protobuf. The trade-off is that the operation metadata and instance data aren't returned by the operator after parsing the json response from a call to the Financial Services API.
Given the operators are for an allowlist-only service, I don't think they should be placed in the main Google provider, but instead in a separate one.
Thanks for the suggestion. Would you recommend adding to somewhere existing in the provider packages or introducing a new provider?
Can you give us the reason why we want to add this new service to the Google provider? Right now it looks like it has a very narrow use case (it requires specific configuration to be able to run anything using its API), so I would suggest not adding it to the Google provider without a specific reason.
I think it should be a new, separate one. Ideally, providers should come with dashboards with system tests, similar to the ones we already have:
Can I clarify that your concern about the system test is access to the Financial Services API? I assume that the API is not currently available in the system test environment. Would it be possible to enable the API in the system test environment, and would that address your concerns?
For disclosure, I work with @claw89. A few more responses to the questions from @VladaZakharova. I should also add we are not Google; we're a small consulting company working with this product.
This is a publicly documented Google GA product; it's hard to see it fitting anywhere else among the community-supported providers in the Airflow repository. The procedure for adding a new provider involves a vote on the public list and I think it's unlikely we're going to get that passed (see here), but I will concede a third-party package is an option.
We want to add it because there are several very large Google Cloud customers (banks) who already use this API in production. They are either already using Cloud Composer and/or Airflow to orchestrate related data workflows, and this API is very dependent on data engineering workflows. Bringing this in under the primary Airflow repository would make it easier for them to adopt the AML AI product, since Airflow is often an approved product in these institutions, or an obvious choice. Bringing in additional software often requires extra paperwork! We've observed more than one have resorted to building their own providers/hooks. I will concede that the use case is narrow, but it's deep enough for Google to have released it as a cloud product. We're also planning on adding additional operators.
Quick addition; if the concern is CI / system tests, as @claw89 says, we can probably work to get the system test project allow-listed for system test purposes if that would help with getting consensus toward mergeability. It would also allow the use of the discovery document in those system tests, too.
There is work planned on splitting the Google provider into a few smaller ones, to e.g. reduce the dependency tree size for users who want to use certain cloud services, ads, workspace, etc. The Cloud AI products, like the Financial Services API, are distinct enough to be placed in a separate provider in a clean way. The concern about CI is also there - there should be system tests implemented for the new operators and exposed in a dashboard. If you propose the new provider & provide a CI dashboard with system tests for it, I'd be voting +1 in a vote on adding it. @potiuk please chime in if you see any major obstacles for it.
I think the issues are:
If the intention/new policy is that any additional services added to the Google provider should be channelled down as an incremental process on a path to close #15933, fine - but it's going to be quite confusing to users in the meantime and requires a proper roadmap rather than a piecemeal approach. Otherwise, I'm concerned that #15933 has been open for 3 years now, and blocking incremental additions behind what is outwardly a considerable change doesn't seem to have happened elsewhere - unless work is already underway?
I think the operators from this PR should be placed in a new, google-cloud-ai provider. I am not asking to contribute to the split - just create a separate, small provider for the operators you're contributing.
No - only operators for services that are very different (e.g. the authentication point above, allowlist required), not useful to the majority of users (because of required allowlisting / dedicated account vs. other cloud services available straight away to cloud users), or introducing a heavy dependency footprint (this is not the case with this PR), or another reason along these lines.
I am here with @michalmodras -> the example of a thing to separate from the Google provider is precisely what happened with the ads provider. And if the Google team wants to introduce and maintain the dashboard - that's ok. The condition for approving a new provider is that there is someone we can trust to keep an eye on it. It does not have to be the original author - and here I see we have the Google team confirming they will.
Though.... after looking at the code - there is a bit of difficulty there @michalmodras -> this one is using GoogleBaseHook, and I believe the dependency on this common GoogleBaseHook is the main reason why we do not want to split the provider into "many" sub-providers. I am not sure @claw89 if GoogleBaseHook as a base is necessary there or not though?
@potiuk Our implementation of the FinancialServicesHook needs to inherit from the GoogleBaseHook because we use the standard authentication methods in GoogleBaseHook to connect to the Financial Services API. The different pattern that @michalmodras mentioned is around accessing the discovery document (either with a developer key or as a standalone discovery document json) - once this document is available, the connections to the Financial Services API follow the same authentication pattern as other Google Cloud APIs. From what I can see in the Ads hook code, it seems that authentication is quite different for the Ads provider. Can anyone clarify how Ads is distinguished as a separate provider? Other 'sub-providers' (e.g., marketing_platform) in the google directory do use the GoogleBaseHook. Also, the relevant system tests for marketing platform are included in the main Google CI dashboard. I couldn't see any distinctions between Cloud and Ads that don't also apply to Cloud and Marketing Platform in the code.
Options for Financial Services API in Airflow
To summarize the discussion, we have outlined the possible directions we could take with this PR. We'd welcome your comments on any alternative solutions or additional pros/cons so we can reach a consensus on the best way forward.
1. Existing Google provider
Pros
Cons
2. Separate provider with duplicated base code
Pros
Cons
3. Shared code in separate core provider
Pros
Cons
4. Delay merge of Financial Services
Until either 1) the Financial Services discovery doc is released without the need for a developer key, or 2) the Google provider is split into smaller providers.
Google Cloud Financial Services API Operators
This PR adds operators to interact with the GCP Financial Services Anti-Money Laundering AI API.
New Features
Hooks
Financial Services Hook
providers/src/airflow/providers/google/cloud/hooks/financial_services.py
adds a hook for calling the Financial Services API. The methods implemented in this PR are create, get, and delete on the instances endpoint.
Note: To initialize the hook, the API discovery document must be provided as an argument in json format. This is because the Financial Services API is not currently publicly generally available (customers must be allow-listed by Google), and requests to retrieve the discovery document require a developer key associated with that project. This discovery document does not contain any information beyond what is already available in the public documentation, and we have permission from a Google representative to check it in.
Operators
Financial Services Create Instance Operator
providers/src/airflow/providers/google/cloud/operators/financial_services.py::FinancialServicesCreateInstanceOperator
can be used for Airflow tasks that create a new Financial Services instance. This call creates a long-running operation and returns the name of the operation for monitoring.
Financial Services Get Instance Operator
providers/src/airflow/providers/google/cloud/operators/financial_services.py::FinancialServicesGetInstanceOperator
can be used for Airflow tasks that fetch an existing Financial Services instance. This call returns the instance resource data.
Financial Services Delete Instance Operator
providers/src/airflow/providers/google/cloud/operators/financial_services.py::FinancialServicesDeleteInstanceOperator
can be used for Airflow tasks that delete a Financial Services instance. This call creates a long-running operation and returns the name of the operation for monitoring.
Sensors
Financial Services Operation Sensor
providers/src/airflow/providers/google/cloud/sensors/financial_services.py::FinancialServicesOperationSensor
can be used to monitor the status of Financial Services operations. The poke method accesses the done field of the operation and completes when done=true.
Tests
Unit Tests
Unit tests added for the new hook, operator, and sensor features.
System Tests
An example DAG is added to the system tests directory. This DAG creates an instance, fetches the instance, and finally deletes it, with operation sensors for the create and delete operations.
Running the system test requires access to the Financial Services API. Some environment variables should also be set: