Skip to content

Commit

Permalink
Changes to support streamer
Browse files Browse the repository at this point in the history
* Allow for arbitrary number of Test Agents in a given test
* Update rust to 0.1.1 release
* Remove invoke_method, as it is L2 API
* Add few tests for streamer
  • Loading branch information
matthewd0123 committed Jul 18, 2024
1 parent 7f30d18 commit 6186617
Show file tree
Hide file tree
Showing 24 changed files with 324 additions and 687 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/tck-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Install dependencies
run: |
cd scripts
python build_up_java_latest.py
#- name: Install dependencies
# run: |
# cd scripts
# python build_up_java_latest.py
- name: Build up_client_socket_java with Maven
working-directory: up_client_socket/java
run: |
Expand Down Expand Up @@ -88,11 +88,11 @@ jobs:
if ("ue2" in feature){
for (var language_two in feature["ue2"]){
var second_ue = feature["ue2"][language_two]
var command_str = "behave --define uE1=" + port_language + " --define uE2=" + second_ue + " --define transport=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
var command_str = "behave --define uE1=" + port_language + " --define uE2=" + second_ue + " --define transport1=" + port_transport + " --define transport2=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
command_list.push(command_str);
}
} else {
var command_str = "behave --define uE1=" + port_language + " --define transport=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
var command_str = "behave --define uE1=" + port_language + " --define transport1=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
command_list.push(command_str);
}
}
Expand Down
5 changes: 5 additions & 0 deletions test_agent/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>json</artifactId>
<version>20231013</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<build>
<finalName>tck-test-agent-java</finalName>
Expand Down
60 changes: 35 additions & 25 deletions test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

package org.eclipse.uprotocol;

import org.apache.commons.cli.*;


import com.google.gson.Gson;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
Expand All @@ -46,7 +49,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -67,12 +69,13 @@ public class TestAgent {
private static final UListener listener = TestAgent::handleOnReceive;
private static final Gson gson = new Gson();
private static final UUri RESPONSE_URI;
private static String transportName;
private static String sdkName;

static {
actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand);
actionHandlers.put(ActionCommands.REGISTER_LISTENER_COMMAND, TestAgent::handleRegisterListenerCommand);
actionHandlers.put(ActionCommands.UNREGISTER_LISTENER_COMMAND, TestAgent::handleUnregisterListenerCommand);
actionHandlers.put(ActionCommands.INVOKE_METHOD_COMMAND, TestAgent::handleInvokeMethodCommand);
actionHandlers.put(ActionCommands.SERIALIZE_URI, TestAgent::handleSerializeUriCommand);
actionHandlers.put(ActionCommands.DESERIALIZE_URI, TestAgent::handleDeserializeUriCommand);
actionHandlers.put(ActionCommands.VALIDATE_URI, TestAgent::handleValidateUriCommand);
Expand Down Expand Up @@ -134,7 +137,7 @@ private static void sendToTestManager(Message proto, String action, String recei

private static void writeDataToTMSocket(JSONObject responseDict, String action) {
responseDict.put("action", action);
responseDict.put("ue", "java");
responseDict.put("ue", sdkName);
try {
OutputStream outputStream = clientSocket.getOutputStream();
outputStream.write(responseDict.toString().getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -165,27 +168,6 @@ private static CompletionStage<UStatus> handleUnregisterListenerCommand(Map<Stri
return transport.unregisterListener(uri, listener);
}

private static Object handleInvokeMethodCommand(Map<String, Object> jsonData) {
Map<String, Object> data = (Map<String, Object>) jsonData.get("data");
// Convert data and payload to protocol buffers
UUri uri = (UUri) ProtoConverter.dictToProto(data, UUri.newBuilder());
String payload = (String) data.get("payload");
ByteString value = null;
if (payload instanceof String && (payload).startsWith("BYTES:")) {
String byteString = (payload).substring(6); // Remove 'BYTES:' prefix
value = ByteString.copyFromUtf8(byteString);
} else if (payload instanceof String) {
value = ByteString.copyFromUtf8(payload);
}

UPayload setPayload = new UPayload(value, UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
CompletionStage<UPayload> responseFuture = transport.invokeMethod(uri, setPayload, CallOptions.DEFAULT);
responseFuture.whenComplete((responseMessage, exception) -> {
sendToTestManager(Map.of("payload", responseMessage.data().toStringUtf8()), ActionCommands.INVOKE_METHOD_COMMAND, (String) jsonData.get("test_id"));
});
return null;
}

private static Object handleSerializeUriCommand(Map<String, Object> jsonData) {
Map<String, Object> data = (Map<String, Object>) jsonData.get("data");
UUri uri = (UUri) ProtoConverter.dictToProto(data, UUri.newBuilder());
Expand Down Expand Up @@ -497,7 +479,35 @@ private static void handleOnReceive(UMessage uMessage) {

}

@SuppressWarnings("null")
public static void main(String[] args) {

Options options = new Options();

Option input = new Option("t", "transport", true, "Select Transport");
input.setRequired(true);
options.addOption(input);

Option output = new Option("s", "sdkname", true, "Select SDK Name");
output.setRequired(true);
options.addOption(output);

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd = null;//not a good practice, it serves it purpose

try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
formatter.printHelp("utility-name", options);

System.exit(1);
}

transportName = cmd.getOptionValue("transport");
sdkName = cmd.getOptionValue("sdkname");

Thread receiveThread = new Thread(() -> {
try {
receiveFromTM();
Expand All @@ -509,7 +519,7 @@ public static void main(String[] args) {
});
receiveThread.start();
JSONObject obj = new JSONObject();
obj.put("SDK_name", "java");
obj.put("SDK_name", sdkName);
sendToTestManager(obj, "initialize");
}

Expand Down
50 changes: 22 additions & 28 deletions test_agent/python/testagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import socket
import sys
import time
from concurrent.futures import Future
from argparse import ArgumentParser
from datetime import datetime, timezone
from threading import Thread
from typing import Any, Dict, List, Union
Expand All @@ -31,7 +31,6 @@
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message
from google.protobuf.wrappers_pb2 import StringValue
from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
from uprotocol.transport.ulistener import UListener
Expand Down Expand Up @@ -63,6 +62,8 @@

RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)

sdkname = "python"


class SocketUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
Expand Down Expand Up @@ -135,7 +136,7 @@ def send_to_test_manager(
response_dict = {
"data": response,
"action": action,
"ue": "python",
"ue": sdkname,
"test_id": received_test_id,
}
response_dict = json.dumps(response_dict).encode("utf-8")
Expand Down Expand Up @@ -198,27 +199,6 @@ def handle_unregister_listener_command(json_msg):
return transport.unregister_listener(uri, listener)


def handle_invoke_method_command(json_msg):
uri = dict_to_proto(json_msg["data"], UUri())
logger.info(json_msg["data"]["payload"])
value = json_msg["data"]["payload"]
if isinstance(value, str) and "BYTES:" in value:
value = value.replace("BYTES:", "")
value = value.encode("utf-8")
payload = UPayload(data=value)
res_future: Future = transport.invoke_method(uri, payload, CallOptions(timeout=10000))

def handle_response(message):
message: Message = message.result()
send_to_test_manager(
message,
actioncommands.INVOKE_METHOD_COMMAND,
received_test_id=json_msg["test_id"],
)

res_future.add_done_callback(handle_response)


def handle_serialize_uuri(json_msg: Dict[str, Any]):
uri: UUri = dict_to_proto(json_msg["data"], UUri())
serialized_uuri: str = UriSerializer.serialize(uri).lower()
Expand Down Expand Up @@ -425,7 +405,6 @@ def handle_uattributes_validate_command(json_msg: Dict[str, Any]):
actioncommands.SEND_COMMAND: handle_send_command,
actioncommands.REGISTER_LISTENER_COMMAND: handle_register_listener_command,
actioncommands.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command,
actioncommands.INVOKE_METHOD_COMMAND: handle_invoke_method_command,
actioncommands.SERIALIZE_URI: handle_serialize_uuri,
actioncommands.DESERIALIZE_URI: handle_deserialize_uri,
actioncommands.SERIALIZE_UUID: handle_serialize_uuid,
Expand Down Expand Up @@ -459,10 +438,25 @@ def receive_from_tm():


if __name__ == "__main__":
listener = SocketUListener()
transport = SocketUTransport(RESPONSE_URI)
parser = ArgumentParser()

parser.add_argument("-t", "--transport", dest="transport", help="Select Transport", metavar="TRANSPORT")

parser.add_argument("-s", "--sdkname", dest="sdkname", help="Write SDK Name", metavar="SDKNAME")

args = parser.parse_args()

if args.transport == "socket":
listener = SocketUListener()
transport = SocketUTransport(RESPONSE_URI)
else:
raise ValueError("Invalid Transport")

if args.sdkname is not None:
sdkname = args.sdkname

ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ta_socket.connect(constants.TEST_MANAGER_ADDR)
thread = Thread(target=receive_from_tm)
thread.start()
send_to_test_manager({"SDK_name": "python"}, "initialize")
send_to_test_manager({"SDK_name": sdkname}, "initialize")
Loading

0 comments on commit 6186617

Please sign in to comment.