Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Return the correct port in coordinator's API #4312

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/flex-interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
# install gsctl
python3 -m pip install ${GITHUB_WORKSPACE}/python/dist/*.whl
# launch service: 8080 for coordinator http port; 7687 for cypher port;
gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --cypher-port 7688 --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
sleep 20
# test
python3 -m pip install --no-cache-dir pytest pytest-cov pytest-timeout pytest-xdist
Expand All @@ -66,6 +66,10 @@ jobs:
--exitfirst \
$(dirname $(python3 -c "import graphscope.gsctl as gsctl; print(gsctl.__file__)"))/tests/test_interactive.py

# test coordinator
res=`curl http://127.0.0.1:8080/api/v1/service`
echo $res | grep 7688 || exit 1

# destroy instance
gsctl instance destroy --type interactive -y

Expand Down
2 changes: 2 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from graphscope.config import Config
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.flex.core.client_wrapper import initialize_client_wrapper
from gscoordinator.flex.encoder import JSONEncoder
from gscoordinator.monitor import Monitor
from gscoordinator.servicer import init_graphscope_one_service_servicer
Expand Down Expand Up @@ -125,6 +126,7 @@ def get_servicer(config: Config):


def start_http_service(config):
initialize_client_wrapper(config)
app = connexion.App(__name__, specification_dir="./flex/openapi/")
app.app.json_encoder = JSONEncoder
app.add_api(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from gscoordinator.flex.models.schema_mapping import SchemaMapping # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -26,7 +26,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501
"""
if connexion.request.is_json:
schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.bind_datasource_in_batch(graph_id, schema_mapping)
return get_client_wrapper().bind_datasource_in_batch(graph_id, schema_mapping)


@handle_api_exception()
Expand All @@ -40,7 +40,7 @@ def get_datasource_by_id(graph_id): # noqa: E501

:rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]]
"""
return client_wrapper.get_datasource_by_id(graph_id)
return get_client_wrapper().get_datasource_by_id(graph_id)


@handle_api_exception()
Expand All @@ -60,7 +60,7 @@ def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type)
return get_client_wrapper().unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type)


@handle_api_exception()
Expand All @@ -76,4 +76,4 @@ def unbind_vertex_datasource(graph_id, type_name): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_vertex_datasource(graph_id, type_name)
return get_client_wrapper().unbind_vertex_datasource(graph_id, type_name)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from gscoordinator.flex.models.running_deployment_status import RunningDeploymentStatus # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -24,7 +24,7 @@ def get_deployment_info(): # noqa: E501

:rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_info()
return get_client_wrapper().get_deployment_info()


@handle_api_exception()
Expand All @@ -42,7 +42,7 @@ def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501

:rtype: Union[GetPodLogResponse, Tuple[GetPodLogResponse, int], Tuple[GetPodLogResponse, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_pod_log(pod_name, component, from_cache)
return get_client_wrapper().get_deployment_pod_log(pod_name, component, from_cache)


@handle_api_exception()
Expand All @@ -54,7 +54,7 @@ def get_deployment_resource_usage(): # noqa: E501

:rtype: Union[GetResourceUsageResponse, Tuple[GetResourceUsageResponse, int], Tuple[GetResourceUsageResponse, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_resource_usage()
return get_client_wrapper().get_deployment_resource_usage()


@handle_api_exception()
Expand All @@ -66,7 +66,7 @@ def get_deployment_status(): # noqa: E501

:rtype: Union[RunningDeploymentStatus, Tuple[RunningDeploymentStatus, int], Tuple[RunningDeploymentStatus, int, Dict[str, str]]
"""
return client_wrapper.get_deployment_status()
return get_client_wrapper().get_deployment_status()


@handle_api_exception()
Expand All @@ -78,4 +78,4 @@ def get_storage_usage(): # noqa: E501

:rtype: Union[GetStorageUsageResponse, Tuple[GetStorageUsageResponse, int], Tuple[GetStorageUsageResponse, int, Dict[str, str]]
"""
return client_wrapper.get_storage_usage()
return get_client_wrapper().get_storage_usage()
22 changes: 11 additions & 11 deletions coordinator/gscoordinator/flex/controllers/graph_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from gscoordinator.flex.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -32,7 +32,7 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501
"""
if connexion.request.is_json:
create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_edge_type(graph_id, create_edge_type)
return get_client_wrapper().create_edge_type(graph_id, create_edge_type)


@handle_api_exception()
Expand All @@ -48,7 +48,7 @@ def create_graph(create_graph_request): # noqa: E501
"""
if connexion.request.is_json:
create_graph_request = CreateGraphRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_graph(create_graph_request)
return get_client_wrapper().create_graph(create_graph_request)


@handle_api_exception()
Expand All @@ -66,7 +66,7 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501
"""
if connexion.request.is_json:
create_vertex_type = CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_vertex_type(graph_id, create_vertex_type)
return get_client_wrapper().create_vertex_type(graph_id, create_vertex_type)


@handle_api_exception()
Expand All @@ -86,7 +86,7 @@ def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destinatio

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_edge_type_by_name(
return get_client_wrapper().delete_edge_type_by_name(
graph_id, type_name, source_vertex_type, destination_vertex_type
)

Expand All @@ -102,7 +102,7 @@ def delete_graph_by_id(graph_id): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_graph_by_id(graph_id)
return get_client_wrapper().delete_graph_by_id(graph_id)


@handle_api_exception()
Expand All @@ -118,7 +118,7 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_vertex_type_by_name(graph_id, type_name)
return get_client_wrapper().delete_vertex_type_by_name(graph_id, type_name)


@handle_api_exception()
Expand All @@ -132,7 +132,7 @@ def get_graph_by_id(graph_id): # noqa: E501

:rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]]
"""
return client_wrapper.get_graph_by_id(graph_id)
return get_client_wrapper().get_graph_by_id(graph_id)


@handle_api_exception()
Expand All @@ -146,7 +146,7 @@ def get_schema_by_id(graph_id): # noqa: E501

:rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]]
"""
return client_wrapper.get_schema_by_id(graph_id)
return get_client_wrapper().get_schema_by_id(graph_id)


@handle_api_exception()
Expand All @@ -164,7 +164,7 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501
"""
if connexion.request.is_json:
create_graph_schema_request = CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.import_schema(graph_id, create_graph_schema_request)
return get_client_wrapper().import_schema(graph_id, create_graph_schema_request)


@handle_api_exception()
Expand All @@ -176,4 +176,4 @@ def list_graphs(): # noqa: E501

:rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]]
"""
return client_wrapper.list_graphs()
return get_client_wrapper().list_graphs()
12 changes: 6 additions & 6 deletions coordinator/gscoordinator/flex/controllers/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from gscoordinator.flex.models.job_status import JobStatus # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -27,7 +27,7 @@ def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_job_by_id(job_id, delete_scheduler)
return get_client_wrapper().delete_job_by_id(job_id, delete_scheduler)


@handle_api_exception()
Expand All @@ -45,7 +45,7 @@ def get_dataloading_job_config(graph_id, dataloading_job_config): # noqa: E501
"""
if connexion.request.is_json:
dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.get_dataloading_job_config(graph_id, dataloading_job_config)
return get_client_wrapper().get_dataloading_job_config(graph_id, dataloading_job_config)


@handle_api_exception()
Expand All @@ -59,7 +59,7 @@ def get_job_by_id(job_id): # noqa: E501

:rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]]
"""
return client_wrapper.get_job_by_id(job_id)
return get_client_wrapper().get_job_by_id(job_id)


@handle_api_exception()
Expand All @@ -71,7 +71,7 @@ def list_jobs(): # noqa: E501

:rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]]
"""
return client_wrapper.list_jobs()
return get_client_wrapper().list_jobs()


@handle_api_exception()
Expand All @@ -89,4 +89,4 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501
"""
if connexion.request.is_json:
dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.submit_dataloading_job(graph_id, dataloading_job_config)
return get_client_wrapper().submit_dataloading_job(graph_id, dataloading_job_config)
12 changes: 6 additions & 6 deletions coordinator/gscoordinator/flex/controllers/service_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from gscoordinator.flex.models.start_service_request import StartServiceRequest # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -22,7 +22,7 @@ def get_service_status_by_id(graph_id): # noqa: E501

:rtype: Union[ServiceStatus, Tuple[ServiceStatus, int], Tuple[ServiceStatus, int, Dict[str, str]]
"""
return client_wrapper.get_service_status_by_id(graph_id)
return get_client_wrapper().get_service_status_by_id(graph_id)


@handle_api_exception()
Expand All @@ -34,7 +34,7 @@ def list_service_status(): # noqa: E501

:rtype: Union[List[ServiceStatus], Tuple[List[ServiceStatus], int], Tuple[List[ServiceStatus], int, Dict[str, str]]
"""
return client_wrapper.list_service_status()
return get_client_wrapper().list_service_status()


@handle_api_exception()
Expand All @@ -46,7 +46,7 @@ def restart_service(): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.restart_service()
return get_client_wrapper().restart_service()


@handle_api_exception()
Expand All @@ -62,7 +62,7 @@ def start_service(start_service_request=None): # noqa: E501
"""
if connexion.request.is_json:
start_service_request = StartServiceRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.start_service(start_service_request)
return get_client_wrapper().start_service(start_service_request)


@handle_api_exception()
Expand All @@ -74,4 +74,4 @@ def stop_service(): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.stop_service()
return get_client_wrapper().stop_service()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from gscoordinator.flex.models.update_stored_proc_request import UpdateStoredProcRequest # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -29,7 +29,7 @@ def create_stored_procedure(graph_id, create_stored_proc_request): # noqa: E501
"""
if connexion.request.is_json:
create_stored_proc_request = CreateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_stored_procedure(graph_id, create_stored_proc_request)
return get_client_wrapper().create_stored_procedure(graph_id, create_stored_proc_request)


@handle_api_exception()
Expand All @@ -45,7 +45,7 @@ def delete_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501

:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.delete_stored_procedure_by_id(graph_id, stored_procedure_id)
return get_client_wrapper().delete_stored_procedure_by_id(graph_id, stored_procedure_id)


@handle_api_exception()
Expand All @@ -61,7 +61,7 @@ def get_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501

:rtype: Union[GetStoredProcResponse, Tuple[GetStoredProcResponse, int], Tuple[GetStoredProcResponse, int, Dict[str, str]]
"""
return client_wrapper.get_stored_procedure_by_id(graph_id, stored_procedure_id)
return get_client_wrapper().get_stored_procedure_by_id(graph_id, stored_procedure_id)


@handle_api_exception()
Expand All @@ -75,7 +75,7 @@ def list_stored_procedures(graph_id): # noqa: E501

:rtype: Union[List[GetStoredProcResponse], Tuple[List[GetStoredProcResponse], int], Tuple[List[GetStoredProcResponse], int, Dict[str, str]]
"""
return client_wrapper.list_stored_procedures(graph_id)
return get_client_wrapper().list_stored_procedures(graph_id)


@handle_api_exception()
Expand All @@ -95,4 +95,4 @@ def update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_p
"""
if connexion.request.is_json:
update_stored_proc_request = UpdateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request)
return get_client_wrapper().update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from gscoordinator.flex.models.upload_file_response import UploadFileResponse # noqa: E501
from gscoordinator.flex import util

from gscoordinator.flex.core import client_wrapper
from gscoordinator.flex.core import get_client_wrapper
from gscoordinator.flex.core import handle_api_exception


Expand All @@ -22,4 +22,4 @@ def upload_file(filestorage=None): # noqa: E501

:rtype: Union[UploadFileResponse, Tuple[UploadFileResponse, int], Tuple[UploadFileResponse, int, Dict[str, str]]
"""
return client_wrapper.upload_file(filestorage)
return get_client_wrapper().upload_file(filestorage)
Loading
Loading