diff --git a/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java b/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java index 530105d6..92c41cc2 100644 --- a/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java +++ b/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java @@ -66,6 +66,7 @@ public class TestAgent { private static final Logger logger = Logger.getLogger("JavaTestAgent"); private static final UListener listener = TestAgent::handleOnReceive; private static final Gson gson = new Gson(); + private static final UUri RESPONSE_URI; static { actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand); @@ -81,9 +82,13 @@ public class TestAgent { actionHandlers.put(ActionCommands.VALIDATE_UATTRIBUTES, TestAgent::handleUAttributesValidateCommand); } + static { + RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build(); + } + static { try { - transport = new SocketUTransport(); + transport = new SocketUTransport(RESPONSE_URI); clientSocket = new Socket(Constant.TEST_MANAGER_IP, Constant.TEST_MANAGER_PORT); } catch (IOException e) { throw new RuntimeException(e); diff --git a/test_agent/python/testagent.py b/test_agent/python/testagent.py index 3f7c431f..6ef8f243 100644 --- a/test_agent/python/testagent.py +++ b/test_agent/python/testagent.py @@ -61,6 +61,8 @@ logger = logging.getLogger("File:Line# Debugger") logger.setLevel(logging.DEBUG) +RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0) + class SocketUListener(UListener): def on_receive(self, umsg: UMessage) -> None: @@ -458,7 +460,7 @@ def receive_from_tm(): if __name__ == "__main__": listener = SocketUListener() - transport = SocketUTransport() + transport = SocketUTransport(RESPONSE_URI) ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ta_socket.connect(constants.TEST_MANAGER_ADDR) thread = Thread(target=receive_from_tm) diff --git a/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java b/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java index 1ab38f09..631a89a4 100644 --- a/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java +++ b/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java @@ -45,19 +45,16 @@ public class SocketUTransport implements UTransport, RpcClient { private static final String DISPATCHER_IP = "127.0.0.1"; private static final Integer DISPATCHER_PORT = 44444; private static final int BYTES_MSG_LENGTH = 32767; - private static final UUri RESPONSE_URI; - - static { - RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build(); - } private final Socket socket; private final ConcurrentHashMap> reqid_to_future; private final ConcurrentHashMap> uri_to_listener; private final Object lock = new Object(); + private UUri source; - public SocketUTransport() throws IOException { + public SocketUTransport(UUri newSource) throws IOException { + source = newSource; reqid_to_future = new ConcurrentHashMap<>(); uri_to_listener = new ConcurrentHashMap<>(); socket = new Socket(DISPATCHER_IP, DISPATCHER_PORT); @@ -246,7 +243,7 @@ public CompletionStage unregisterListener(UUri sourceFilter, UUri sinkF * @return A CompletableFuture that will hold the response message for the request. */ public CompletionStage invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) { - UMessage umsg = UMessageBuilder.request(RESPONSE_URI, methodUri, options.timeout()).build(requestPayload); + UMessage umsg = UMessageBuilder.request(source, methodUri, options.timeout()).build(requestPayload); UUID requestId = umsg.getAttributes().getId(); CompletionStage responseFuture = new CompletableFuture<>(); reqid_to_future.put(requestId, responseFuture); @@ -298,6 +295,6 @@ public void close() { * @return The source. */ public UUri getSource() { - return RESPONSE_URI; + return source; } } diff --git a/up_client_socket/python/socket_transport.py b/up_client_socket/python/socket_transport.py index cfb02d59..2fa4dade 100644 --- a/up_client_socket/python/socket_transport.py +++ b/up_client_socket/python/socket_transport.py @@ -41,7 +41,6 @@ logger = logging.getLogger(__name__) DISPATCHER_ADDR: tuple = ("127.0.0.1", 44444) BYTES_MSG_LENGTH: int = 32767 -RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0) def timeout_counter(response, req_id, timeout): @@ -59,11 +58,13 @@ def timeout_counter(response, req_id, timeout): class SocketUTransport(UTransport, RpcClient): - def __init__(self): + def __init__(self, source: UUri): """ Creates a uEntity with Socket Connection, as well as a map of registered topics. + param source: The URI associated with the UTransport. """ + self.source = source self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect(DISPATCHER_ADDR) @@ -189,7 +190,7 @@ def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: Ca """ Invokes a method with the provided URI, request payload, and options. """ - umsg = UMessageBuilder.request(RESPONSE_URI, method_uri, options.timeout).build_from_upayload(request_payload) + umsg = UMessageBuilder.request(self.source, method_uri, options.timeout).build_from_upayload(request_payload) # Get uAttributes's request id request_id = umsg.attributes.id @@ -207,4 +208,11 @@ def get_source(self) -> UUri: """ Returns the source URI of the UTransport. """ - return RESPONSE_URI + return self.source + + def close(self): + """ + Closes the socket connection. + """ + self.socket.close() + logger.info(f"{self.__class__.__name__} Socket Connection Closed")