Skip to content

Commit

Permalink
improving data preprocessing script
Browse files Browse the repository at this point in the history
  • Loading branch information
gushob21 committed Jan 22, 2025
1 parent 739126f commit c83ad24
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@

# RAY_CLUSTER_HOST = os.environ["RAY_CLUSTER_HOST"]
#IMAGE_BUCKET = os.environ["PROCESSING_BUCKET"]
IMAGE_BUCKET = "gkebatchexpce3c8dcb-gushob-rag-data"
#IMAGE_BUCKET = "gkebatchexpce3c8dcb-gushob-rag-data"
class RayUtils:

logger = logging.getLogger(__name__)

def __init__(self,ray_cluster_host,df,ray_resources,ray_runtime,package_name,module_name,class_name,method_name):
def __init__(self,ray_cluster_host,ray_resources,ray_runtime,package_name,module_name,class_name,method_name,df,gcs_bucket):
self.ray_cluster_host = ray_cluster_host
self.df = df
self.ray_resource = ray_resources
self.ray_runtime = ray_runtime
self.module_name = module_name
self.class_name = class_name
self.method_name = method_name
self.package_name = package_name
self.df = df
self.gcs_bucket = gcs_bucket


@ray.remote(resources={"cpu": 1})
def invoke_process_data(self, preprocessor, df, ray_worker_node_id,IMAGE_BUCKET):
def invoke_process_data(self, preprocessor, df, ray_worker_node_id,gcs_bucket):
def func_not_found(): # just in case we dont have the function
print ('No Function '+self.method_name+' Found!')
func = getattr(preprocessor,self.method_name,func_not_found)
#return preprocessor.process_data(df, ray_worker_node_id,IMAGE_BUCKET)
return func(df, ray_worker_node_id,IMAGE_BUCKET)
return func(df, ray_worker_node_id,gcs_bucket)

def run_remote(self):
# Initiate a driver: start and connect with Ray cluster
Expand All @@ -62,7 +63,7 @@ def run_remote(self):
self.logger.debug("Data Preparation started")
start_time = time.time()
#results = ray.get([self.process_data.remote(preprocessor=preprocessor, df=self.df[i], ray_worker_node_id=i) for i in range(len(self.df))])
results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,IMAGE_BUCKET) for i in range(len(self.df))])
results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,gcs_bucket) for i in range(len(self.df))])
#self_ref = ray.put(self)
#results = ray.get([self.invoke_process_data.remote(self,preprocessor, self.df[i], i,IMAGE_BUCKET) for i in range(len(self.df))])
duration = time.time() - start_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def graceful_shutdown(signal_number, stack_frame):
res = data_prep.split_dataframe()

# pass res to RayUtils object
ray_obj = RayUtils(RAY_CLUSTER_HOST,res,ray_resources,ray_runtime_env,package_name,module_name,class_name,method_name)
ray_obj = RayUtils(RAY_CLUSTER_HOST,ray_resources,ray_runtime_env,package_name,module_name,class_name,method_name,res,IMAGE_BUCKET)
result_df = ray_obj.run_remote()
# Replace NaN with None
result_df = result_df.replace({np.nan: None})
Expand Down

0 comments on commit c83ad24

Please sign in to comment.