diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index d5310db577..adb6c0227e 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -72,8 +72,6 @@ service RegistryServer{ // Search RPCs rpc ExpediaSearchProjects (ExpediaSearchProjectsRequest) returns (ExpediaSearchProjectsResponse) {} rpc ExpediaSearchFeatureViews (ExpediaSearchFeatureViewsRequest) returns (ExpediaSearchFeatureViewsResponse) {} - rpc ExpediaSearchAIWBTableByProject (ExpediaSearchProjectsRequest) returns (ExpediaSearchAIWBTableResponse) {} - rpc ExpediaSearchAIWBTableByFeatureView (ExpediaSearchFeatureViewsRequest) returns (ExpediaSearchAIWBTableResponse) {} } message ExpediaProjectAndRelatedFeatureViews { @@ -81,12 +79,6 @@ message ExpediaProjectAndRelatedFeatureViews { repeated feast.core.FeatureView feature_views = 2; } -message ExpediaSearchAIWBTableResponse { - repeated ExpediaProjectAndRelatedFeatureViews project_and_related_feature_views = 1; - int32 total_projects = 3; - int32 total_page_indices = 4; -} - message ExpediaSearchFeatureViewsRequest { string search_text = 1; google.protobuf.BoolValue online = 2; @@ -112,9 +104,9 @@ message ExpediaSearchProjectsRequest { } message ExpediaSearchProjectsResponse { - repeated feast.core.ProjectMetadata projects = 1; - int32 total_projects = 2; - int32 total_page_indices = 3; + repeated ExpediaProjectAndRelatedFeatureViews projects_and_related_feature_views = 1; + int32 total_projects = 3; + int32 total_page_indices = 4; } message RefreshRequest { diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index f41252b728..a72919449a 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Union from pydantic import StrictStr from sqlalchemy import ( # type: ignore @@ -1052,18 +1052,7 @@ def expedia_search_projects( updated_at: Optional[datetime] = None, page_size: int = 10, page_index: int = 0, - ) -> Tuple[List[ProjectMetadata], int, int]: - """ - Search for projects based on the provided search parameters with pagination, - using SQL queries to handle filtering efficiently, including a LIKE statement for matching search_text. - Returns a tuple of the list of ProjectMetadata objects, and the total number of pages. - - :param search_text: Filter by project name using a LIKE statement. - :param updated_at: Filter projects updated after this timestamp (datetime). - :param page_size: The number of results to return per page. - :param page_index: The index for fetching the next page (used as an offset). - :return: A tuple containing the list of ProjectMetadata objects, and the total number of pages. - """ + ) -> RegistryServer_pb2.ExpediaSearchProjectsResponse: project_metadata_dict: Dict[str, ProjectMetadata] = {} with self.engine.begin() as conn: @@ -1112,84 +1101,6 @@ def expedia_search_projects( rows = conn.execute(stmt).all() - for row in rows: - project_id = row._mapping["project_id"] - keys = row._mapping["keys"] - values = row._mapping["values"] - last_updated_timestamp = row._mapping["last_updated_timestamp"] - - if project_id not in project_metadata_dict: - project_metadata_dict[project_id] = ProjectMetadata( - project_name=project_id, - last_updated_timestamp=datetime.utcfromtimestamp( - last_updated_timestamp - ), - ) - - project_metadata: ProjectMetadata = project_metadata_dict[project_id] - - for key, value in zip(keys, values): - if key == FeastMetadataKeys.PROJECT_UUID.value: - project_metadata.project_uuid = value - - project_list = list(project_metadata_dict.values()) - - return project_list, total_count, total_page_indices - - def expedia_search_aiwb_table_by_project( - self, - search_text: str = "", - updated_at: Optional[datetime] = None, - page_size: int = 10, - page_index: int = 0, - ) -> RegistryServer_pb2.ExpediaSearchAIWBTableResponse: - project_metadata_dict: Dict[str, ProjectMetadata] = {} - - with self.engine.begin() as conn: - # Base SQL query to count total number of matching projects - count_stmt = select(func.count(feast_metadata.c.project_id.distinct())) - - if search_text: - count_stmt = count_stmt.where( - feast_metadata.c.project_id.like(f"%{search_text}%") - ) - - if updated_at is not None: - updated_at_timestamp = updated_at.timestamp() - count_stmt = count_stmt.where( - feast_metadata.c.last_updated_timestamp >= updated_at_timestamp - ) - - total_count = conn.execute(count_stmt).scalar() or 0 - total_page_indices = (total_count + page_size - 1) // page_size - - # Base SQL query for retrieving projects, grouped by project_id - stmt = ( - select( - feast_metadata.c.project_id, - func.array_agg(feast_metadata.c.metadata_key).label("keys"), - func.array_agg(feast_metadata.c.metadata_value).label("values"), - func.max(feast_metadata.c.last_updated_timestamp).label( - "last_updated_timestamp" - ), - ) - .group_by(feast_metadata.c.project_id) - .order_by(feast_metadata.c.project_id) - .limit(page_size) - .offset(page_index * page_size) - ) - - if search_text: - stmt = stmt.where(feast_metadata.c.project_id.like(f"%{search_text}%")) - - if updated_at is not None: - updated_at_timestamp = updated_at.timestamp() - stmt = stmt.where( - feast_metadata.c.last_updated_timestamp >= updated_at_timestamp - ) - - rows = conn.execute(stmt).all() - for row in rows: project_id = row._mapping["project_id"] keys = row._mapping["keys"] @@ -1242,7 +1153,7 @@ def process_project(project): feature_views=feature_views_proto, ) - project_and_related_feature_views: List[ + projects_and_related_feature_views: List[ RegistryServer_pb2.ExpediaProjectAndRelatedFeatureViews ] = [] @@ -1256,17 +1167,17 @@ def process_project(project): for future in as_completed(future_to_project): try: result = future.result() - project_and_related_feature_views.append(result) + projects_and_related_feature_views.append(result) except Exception as e: logger.error(f"Error processing project: {e}") # Sort the results by project name, which was lost during parallel processing - project_and_related_feature_views.sort( + projects_and_related_feature_views.sort( key=lambda x: x.project_metadata.project.lower() ) - return RegistryServer_pb2.ExpediaSearchAIWBTableResponse( - project_and_related_feature_views=project_and_related_feature_views, + return RegistryServer_pb2.ExpediaSearchProjectsResponse( + projects_and_related_feature_views=projects_and_related_feature_views, total_projects=total_count, total_page_indices=total_page_indices, ) @@ -1281,7 +1192,7 @@ def expedia_search_feature_views( updated_at: Optional[datetime] = None, page_size: int = 10, page_index: int = 0, - ) -> Tuple[List[FeatureView], int, int]: + ) -> RegistryServer_pb2.ExpediaSearchFeatureViewsResponse: """ Search for feature views based on the provided search parameters with pagination. """ @@ -1306,12 +1217,13 @@ def expedia_search_feature_views( rows = conn.execute(stmt).all() - results = [ - FeatureView.from_proto( - FeatureViewProto.FromString(row._mapping["feature_view_proto"]) + for row in rows: + feature_view_proto = FeatureViewProto.FromString( + row._mapping["feature_view_proto"] ) - for row in rows - ] + # for some reason, project is not set in the proto, so we set it here + feature_view_proto.spec.project = row._mapping["project_id"] + results.append(feature_view_proto) total_stmt = ( select(func.count()) @@ -1322,7 +1234,11 @@ def expedia_search_feature_views( total_page_indices = (total_count + page_size - 1) // page_size # early return to avoid fetching data again - return results, total_count, total_page_indices + return RegistryServer_pb2.ExpediaSearchFeatureViewsResponse( + feature_views=results, + total_feature_views=total_count, + total_page_indices=total_page_indices, + ) # Doing in-memory filtering below stmt = select(feature_views) @@ -1337,6 +1253,8 @@ def expedia_search_feature_views( feature_view_proto = FeatureViewProto.FromString( row._mapping["feature_view_proto"] ) + # for some reason, project is not set in the proto, so we set it here + feature_view_proto.spec.project = row._mapping["project_id"] add_to_results = True if online is not None and feature_view_proto.spec.online != online: @@ -1366,7 +1284,7 @@ def expedia_search_feature_views( add_to_results = False if add_to_results: - filtered_results.append(FeatureView.from_proto(feature_view_proto)) + filtered_results.append(feature_view_proto) # Calculate total filtered results total_count = len(filtered_results) @@ -1377,243 +1295,8 @@ def expedia_search_feature_views( # Apply pagination to the filtered results paginated_results = filtered_results[offset : offset + page_size] - return paginated_results, total_count, total_page_indices - - def expedia_search_aiwb_table_by_feature_view( - self, - search_text: Optional[str] = None, - online: Optional[bool] = None, - application: Optional[str] = None, - team: Optional[str] = None, - created_at: Optional[datetime] = None, - updated_at: Optional[datetime] = None, - page_size: int = 10, - page_index: int = 0, - ) -> RegistryServer_pb2.ExpediaSearchAIWBTableResponse: - """ - Search for feature views based on the provided search parameters with pagination. - Returns projects and their related feature views. - """ - offset = page_index * page_size - - # Determine if in-memory filtering is required - in_memory_filtering_required = any( - [online is not None, application, team, created_at, updated_at] - ) - - feature_views_by_project: Dict[str, List[FeatureViewProto]] = {} - total_projects_set: Set[str] = set() - - if not in_memory_filtering_required: - # No in-memory filtering required; perform filtering directly in SQL - with self.engine.begin() as conn: - # Count total number of distinct projects - count_stmt = select( - func.count(func.distinct(feature_views.c.project_id)) - ) - - if search_text: - count_stmt = count_stmt.where( - feature_views.c.feature_view_name.like(f"%{search_text}%") - ) - - total_projects = conn.execute(count_stmt).scalar() or 0 - total_page_indices = (total_projects + page_size - 1) // page_size - - # Get paginated list of project IDs - project_stmt = select(func.distinct(feature_views.c.project_id)) - - if search_text: - project_stmt = project_stmt.where( - feature_views.c.feature_view_name.like(f"%{search_text}%") - ) - - project_stmt = ( - project_stmt.order_by(feature_views.c.project_id) - .limit(page_size) - .offset(offset) - ) - project_ids = [row[0] for row in conn.execute(project_stmt).fetchall()] - - # Fetch feature views for the paginated projects - fv_stmt = select( - feature_views.c.project_id, - feature_views.c.feature_view_proto, - ).where(feature_views.c.project_id.in_(project_ids)) - - if search_text: - fv_stmt = fv_stmt.where( - feature_views.c.feature_view_name.like(f"%{search_text}%") - ) - - fv_rows = conn.execute(fv_stmt).fetchall() - - # Group feature views by project - for row in fv_rows: - project_id = row._mapping["project_id"] - feature_view_proto = FeatureViewProto.FromString( - row._mapping["feature_view_proto"] - ) - - if project_id not in feature_views_by_project: - feature_views_by_project[project_id] = [] - - feature_views_by_project[project_id].append(feature_view_proto) - total_projects_set.add(project_id) - else: - # In-memory filtering is required - with self.engine.begin() as conn: - stmt = select( - feature_views.c.project_id, - feature_views.c.feature_view_proto, - feature_views.c.last_updated_timestamp, - ) - - if search_text: - stmt = stmt.where( - feature_views.c.feature_view_name.like(f"%{search_text}%") - ) - - rows = conn.execute(stmt).fetchall() - - # Function to filter a single feature view - def filter_feature_view(row): - project_id = row._mapping["project_id"] - feature_view_proto = FeatureViewProto.FromString( - row._mapping["feature_view_proto"] - ) - - add_to_results = True - - if online is not None and feature_view_proto.spec.online != online: - add_to_results = False - - if ( - application - and feature_view_proto.spec.tags.get("application") != application - ): - add_to_results = False - - if team and feature_view_proto.spec.tags.get("team") != team: - add_to_results = False - - if ( - created_at - and feature_view_proto.meta.created_timestamp.ToDatetime() - < created_at - ): - add_to_results = False - - if ( - updated_at - and feature_view_proto.meta.last_updated_timestamp.ToDatetime() - < updated_at - ): - add_to_results = False - - if add_to_results: - return project_id, feature_view_proto - else: - return None - - # Use ThreadPoolExecutor to process feature views in parallel - with ThreadPoolExecutor() as executor: - futures = [executor.submit(filter_feature_view, row) for row in rows] - - for future in as_completed(futures): - result = future.result() - if result: - project_id, feature_view_proto = result - if project_id not in feature_views_by_project: - feature_views_by_project[project_id] = [] - feature_views_by_project[project_id].append(feature_view_proto) - total_projects_set.add(project_id) - - # After filtering, paginate the projects - total_projects = len(total_projects_set) - total_page_indices = (total_projects + page_size - 1) // page_size - - project_ids = sorted(feature_views_by_project.keys()) - paginated_project_ids = project_ids[offset : offset + page_size] - - # Adjust feature_views_by_project to only include paginated projects - feature_views_by_project = { - pid: feature_views_by_project[pid] for pid in paginated_project_ids - } - # Fetch project metadata for the paginated projects - with self.engine.begin() as conn: - metadata_stmt = ( - select( - feast_metadata.c.project_id, - feast_metadata.c.metadata_key, - feast_metadata.c.metadata_value, - func.max(feast_metadata.c.last_updated_timestamp).label( - "last_updated_timestamp" - ), - ) - .where(feast_metadata.c.project_id.in_(feature_views_by_project.keys())) - .group_by( - feast_metadata.c.project_id, - feast_metadata.c.metadata_key, - feast_metadata.c.metadata_value, - ) - ) - - metadata_rows = conn.execute(metadata_stmt).fetchall() - - # Build a mapping of project_id to ProjectMetadata - project_metadata_dict: Dict[str, ProjectMetadata] = {} - for row in metadata_rows: - project_id = row._mapping["project_id"] - metadata_key = row._mapping["metadata_key"] - metadata_value = row._mapping["metadata_value"] - last_updated_timestamp = row._mapping["last_updated_timestamp"] - - if project_id not in project_metadata_dict: - project_metadata_dict[project_id] = ProjectMetadata( - project_name=project_id, - last_updated_timestamp=datetime.utcfromtimestamp( - last_updated_timestamp - ), - ) - project_metadata = project_metadata_dict[project_id] - - if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: - project_metadata.project_uuid = metadata_value - - # Prepare the response - project_and_related_feature_views: List[ - RegistryServer_pb2.ExpediaProjectAndRelatedFeatureViews - ] = [] - - # Use ThreadPoolExecutor to process projects in parallel - def process_project(project_id): - project_metadata = project_metadata_dict.get( - project_id, ProjectMetadata(project_name=project_id) - ) - feature_views_proto = feature_views_by_project[project_id] - project_metadata_proto = project_metadata.to_proto() - return RegistryServer_pb2.ExpediaProjectAndRelatedFeatureViews( - project_metadata=project_metadata_proto, - feature_views=feature_views_proto, - ) - - with ThreadPoolExecutor() as executor: - futures = [ - executor.submit(process_project, pid) - for pid in feature_views_by_project.keys() - ] - for future in as_completed(futures): - result = future.result() - project_and_related_feature_views.append(result) - - # Sort the results by project name - project_and_related_feature_views.sort( - key=lambda x: x.project_metadata.project.lower() - ) - - return RegistryServer_pb2.ExpediaSearchAIWBTableResponse( - project_and_related_feature_views=project_and_related_feature_views, - total_projects=total_projects, + return RegistryServer_pb2.ExpediaSearchFeatureViewsResponse( + feature_views=paginated_results, + total_feature_views=total_count, total_page_indices=total_page_indices, )