From e624c433c3ef29e8be9befb9f5f4bb6dd6ded499 Mon Sep 17 00:00:00 2001 From: deepgabani8 Date: Tue, 20 Sep 2022 14:42:15 +0000 Subject: [PATCH 1/2] Added some classes in weather_mv init --- weather_mv/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weather_mv/__init__.py b/weather_mv/__init__.py index 4fbd93bb..de469066 100644 --- a/weather_mv/__init__.py +++ b/weather_mv/__init__.py @@ -11,3 +11,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from .loader_pipeline.ee import FilterFilesTransform, ConvertToAsset, IngestIntoEETransform From f28c263394c4628886aeaa813eec24a4b2b2d988 Mon Sep 17 00:00:00 2001 From: deepgabani8 Date: Wed, 21 Sep 2022 13:19:18 +0000 Subject: [PATCH 2/2] Extending weather-mv pipeline level for custom pipelines. --- custom_pipeline.py | 36 ++++++++++++++++++++++++++++++++++++ weather_mv/__init__.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 custom_pipeline.py diff --git a/custom_pipeline.py b/custom_pipeline.py new file mode 100644 index 00000000..9a2b5f5e --- /dev/null +++ b/custom_pipeline.py @@ -0,0 +1,36 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import weather_mv + + +class MyCustomPipeline(weather_mv.Pipeline): + """A custom pipeline extending weather mover pipeline.""" + def add_args(self): + """Adds extra arguments to the pipeline.""" + self.extra_args.extend('--runner DataflowRunner \ + --project grid-intelligence-sandbox \ + --region us-central1 \ + --job_name test-wt-as-library-2 \ + --temp_location gs://deep-tmp/ \ + --sdk_container_image gcr.io/grid-intelligence-sandbox/miniconda3-beam:testfinal-fix1' + .split()) + + +if __name__ == '__main__': + p = MyCustomPipeline(f'{os.getcwd()}/weather_mv/setup.py') + p.add_args() + p.run() diff --git a/weather_mv/__init__.py b/weather_mv/__init__.py index de469066..d9f2f4c1 100644 --- a/weather_mv/__init__.py +++ b/weather_mv/__init__.py @@ -12,4 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .loader_pipeline.ee import FilterFilesTransform, ConvertToAsset, IngestIntoEETransform +import logging +import sys +import argparse +import typing as t + +from .loader_pipeline.pipeline import run as wmv_run +from .loader_pipeline.pipeline import pipeline as wmv_pipeline + + +class Pipeline(): + """A class for custom pipelines to extend.""" + def __init__(self, setup_file_path: t.Optional[str] = None): + self.extra_args = [] + if setup_file_path: + self.extra_args.extend(f'--setup_file {setup_file_path}'.split()) + + def add_args(self): + """Adds extra arguments to self.extra_args.""" + pass + + def pipeline(self, known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None: + """Runs the pipeline.""" + wmv_pipeline(known_args, pipeline_args) + + def extract_args(self) -> t.Tuple[argparse.Namespace, t.List[str]]: + """Extracts arguments.""" + all_args = sys.argv + self.extra_args + return wmv_run(all_args) + + def run(self): + """Extracts kw arguments.""" + logging.getLogger().setLevel(logging.INFO) + self.pipeline(*self.extract_args())