From c83ad24e4193ecb96956ecdcaf979a10992d7aa1 Mon Sep 17 00:00:00 2001 From: gushob21 Date: Wed, 22 Jan 2025 20:00:26 +0000 Subject: [PATCH] improving data preprocessing script --- .../ray/src/datapreprocessing/ray_utils.py | 13 +++++++------ .../data-processing/ray/src/main.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/datapreprocessing/ray_utils.py b/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/datapreprocessing/ray_utils.py index 7c069cc7..a1c3f410 100644 --- a/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/datapreprocessing/ray_utils.py +++ b/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/datapreprocessing/ray_utils.py @@ -14,29 +14,30 @@ # RAY_CLUSTER_HOST = os.environ["RAY_CLUSTER_HOST"] #IMAGE_BUCKET = os.environ["PROCESSING_BUCKET"] -IMAGE_BUCKET = "gkebatchexpce3c8dcb-gushob-rag-data" +#IMAGE_BUCKET = "gkebatchexpce3c8dcb-gushob-rag-data" class RayUtils: logger = logging.getLogger(__name__) - def __init__(self,ray_cluster_host,df,ray_resources,ray_runtime,package_name,module_name,class_name,method_name): + def __init__(self,ray_cluster_host,ray_resources,ray_runtime,package_name,module_name,class_name,method_name,df,gcs_bucket): self.ray_cluster_host = ray_cluster_host - self.df = df self.ray_resource = ray_resources self.ray_runtime = ray_runtime self.module_name = module_name self.class_name = class_name self.method_name = method_name self.package_name = package_name + self.df = df + self.gcs_bucket = gcs_bucket @ray.remote(resources={"cpu": 1}) - def invoke_process_data(self, preprocessor, df, ray_worker_node_id,IMAGE_BUCKET): + def invoke_process_data(self, preprocessor, df, ray_worker_node_id,gcs_bucket): def func_not_found(): # just in case we dont have the function print ('No Function '+self.method_name+' Found!') func = getattr(preprocessor,self.method_name,func_not_found) #return preprocessor.process_data(df, ray_worker_node_id,IMAGE_BUCKET) - return func(df, ray_worker_node_id,IMAGE_BUCKET) + return func(df, ray_worker_node_id,gcs_bucket) def run_remote(self): # Initiate a driver: start and connect with Ray cluster @@ -62,7 +63,7 @@ def run_remote(self): self.logger.debug("Data Preparation started") start_time = time.time() #results = ray.get([self.process_data.remote(preprocessor=preprocessor, df=self.df[i], ray_worker_node_id=i) for i in range(len(self.df))]) - results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,IMAGE_BUCKET) for i in range(len(self.df))]) + results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,gcs_bucket) for i in range(len(self.df))]) #self_ref = ray.put(self) #results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,IMAGE_BUCKET) for i in range(len(self.df))]) duration = time.time() - start_time diff --git a/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/main.py b/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/main.py index 2887cda5..8b0c33c5 100644 --- a/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/main.py +++ b/use-cases/model-fine-tuning-pipeline/data-processing/ray/src/main.py @@ -86,7 +86,7 @@ def graceful_shutdown(signal_number, stack_frame): res = data_prep.split_dataframe() # pass res to RayUtils object - ray_obj = RayUtils(RAY_CLUSTER_HOST,res,ray_resources,ray_runtime_env,package_name,module_name,class_name,method_name) + ray_obj = RayUtils(RAY_CLUSTER_HOST,ray_resources,ray_runtime_env,package_name,module_name,class_name,method_name,res,IMAGE_BUCKET) result_df = ray_obj.run_remote() # Replace NaN with None result_df = result_df.replace({np.nan: None})