diff --git a/.gitignore b/.gitignore
index 9eca9b6..378eac2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1 @@
 build
-bin
diff --git a/agent/agent.pb.go b/agent/agent.pb.go
index 7156c53..2f1e281 100644
--- a/agent/agent.pb.go
+++ b/agent/agent.pb.go
@@ -1,12 +1,13 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.31.0
-// 	protoc        v4.23.3
+// 	protoc-gen-go v1.25.0
+// 	protoc        v3.12.4
 // source: agent/agent.proto
 
 package agent
 
 import (
+    proto "github.com/golang/protobuf/proto"
     protoreflect "google.golang.org/protobuf/reflect/protoreflect"
     protoimpl "google.golang.org/protobuf/runtime/protoimpl"
     reflect "reflect"
@@ -20,6 +21,10 @@ const (
     _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
 )
 
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
 type RunRequest struct {
     state         protoimpl.MessageState
     sizeCache     protoimpl.SizeCache
     unknownFields protoimpl.UnknownFields
@@ -213,7 +218,7 @@ type DataRequest struct {
     sizeCache     protoimpl.SizeCache
     unknownFields protoimpl.UnknownFields
 
-    Dataset string `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
+    Dataset []byte `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
 }
 
 func (x *DataRequest) Reset() {
@@ -248,11 +253,11 @@ func (*DataRequest) Descriptor() ([]byte, []int) {
     return file_agent_agent_proto_rawDescGZIP(), []int{4}
 }
 
-func (x *DataRequest) GetDataset() string {
+func (x *DataRequest) GetDataset() []byte {
     if x != nil {
         return x.Dataset
     }
-    return ""
+    return nil
 }
 
 type DataResponse struct {
@@ -405,7 +410,7 @@ var file_agent_agent_proto_rawDesc = []byte{
     0x72, 0x69, 0x74, 0x68, 0x6d, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61,
     0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x49, 0x44, 0x22, 0x27, 0x0a, 0x0b, 0x44, 0x61,
     0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x61, 0x74,
-    0x61, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x64, 0x61, 0x74, 0x61,
+    0x61, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x64, 0x61, 0x74, 0x61,
     0x73, 0x65, 0x74, 0x22, 0x2c, 0x0a, 0x0c, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f,
     0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x49, 0x44,
     0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x49,
diff --git a/agent/agent.proto b/agent/agent.proto
index 0050da5..2788cda 100644
--- a/agent/agent.proto
+++ b/agent/agent.proto
@@ -19,12 +19,10 @@ message AlgoRequest { bytes algorithm = 1; }
 
 message AlgoResponse { string algorithmID = 1; }
 
-message DataRequest { string dataset = 1; }
+message DataRequest { bytes dataset = 1; }
 
 message DataResponse { string datasetID = 1; }
 
 message ResultRequest {}
 
-message ResultResponse {
-  bytes file = 1;
-}
+message ResultResponse { bytes file = 1; }
diff --git a/agent/agent_grpc.pb.go b/agent/agent_grpc.pb.go
index dba8111..ca209c9 100644
--- a/agent/agent_grpc.pb.go
+++ b/agent/agent_grpc.pb.go
@@ -1,8 +1,4 @@
 // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
-// versions:
-// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.23.3
-// source: agent/agent.proto
 
 package agent
 
@@ -18,13 +14,6 @@ import (
 // Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7
 
-const (
-    AgentService_Run_FullMethodName    = "/agent.AgentService/Run"
-    AgentService_Algo_FullMethodName   = "/agent.AgentService/Algo"
-    AgentService_Data_FullMethodName   = "/agent.AgentService/Data"
-    AgentService_Result_FullMethodName = "/agent.AgentService/Result"
-)
-
 // AgentServiceClient is the client API for AgentService service.
 //
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@@ -45,7 +34,7 @@ func NewAgentServiceClient(cc grpc.ClientConnInterface) AgentServiceClient {
 
 func (c *agentServiceClient) Run(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error) {
     out := new(RunResponse)
-    err := c.cc.Invoke(ctx, AgentService_Run_FullMethodName, in, out, opts...)
+    err := c.cc.Invoke(ctx, "/agent.AgentService/Run", in, out, opts...)
     if err != nil {
         return nil, err
     }
@@ -54,7 +43,7 @@ func (c *agentServiceClient) Algo(ctx context.Context, in *AlgoRequest, opts ...grpc.CallOption) (*AlgoResponse, error) {
     out := new(AlgoResponse)
-    err := c.cc.Invoke(ctx, AgentService_Algo_FullMethodName, in, out, opts...)
+    err := c.cc.Invoke(ctx, "/agent.AgentService/Algo", in, out, opts...)
     if err != nil {
         return nil, err
     }
@@ -63,7 +52,7 @@ func (c *agentServiceClient) Data(ctx context.Context, in *DataRequest, opts ...grpc.CallOption) (*DataResponse, error) {
     out := new(DataResponse)
-    err := c.cc.Invoke(ctx, AgentService_Data_FullMethodName, in, out, opts...)
+    err := c.cc.Invoke(ctx, "/agent.AgentService/Data", in, out, opts...)
     if err != nil {
         return nil, err
     }
@@ -72,7 +61,7 @@ func (c *agentServiceClient) Result(ctx context.Context, in *ResultRequest, opts ...grpc.CallOption) (*ResultResponse, error) {
     out := new(ResultResponse)
-    err := c.cc.Invoke(ctx, AgentService_Result_FullMethodName, in, out, opts...)
+    err := c.cc.Invoke(ctx, "/agent.AgentService/Result", in, out, opts...)
     if err != nil {
         return nil, err
     }
@@ -129,7 +118,7 @@ func _AgentService_Run_Handler(srv interface{}, ctx context.Context, dec func(in
     }
     info := &grpc.UnaryServerInfo{
         Server:     srv,
-        FullMethod: AgentService_Run_FullMethodName,
+        FullMethod: "/agent.AgentService/Run",
     }
     handler := func(ctx context.Context, req interface{}) (interface{}, error) {
         return srv.(AgentServiceServer).Run(ctx, req.(*RunRequest))
@@ -147,7 +136,7 @@ func _AgentService_Algo_Handler(srv interface{}, ctx context.Context, dec func(i
     }
     info := &grpc.UnaryServerInfo{
         Server:     srv,
-        FullMethod: AgentService_Algo_FullMethodName,
+        FullMethod: "/agent.AgentService/Algo",
     }
     handler := func(ctx context.Context, req interface{}) (interface{}, error) {
         return srv.(AgentServiceServer).Algo(ctx, req.(*AlgoRequest))
@@ -165,7 +154,7 @@ func _AgentService_Data_Handler(srv interface{}, ctx context.Context, dec func(i
     }
     info := &grpc.UnaryServerInfo{
         Server:     srv,
-        FullMethod: AgentService_Data_FullMethodName,
+        FullMethod: "/agent.AgentService/Data",
     }
     handler := func(ctx context.Context, req interface{}) (interface{}, error) {
         return srv.(AgentServiceServer).Data(ctx, req.(*DataRequest))
@@ -183,7 +172,7 @@ func _AgentService_Result_Handler(srv interface{}, ctx context.Context, dec func
     }
     info := &grpc.UnaryServerInfo{
         Server:     srv,
-        FullMethod: AgentService_Result_FullMethodName,
+        FullMethod: "/agent.AgentService/Result",
     }
     handler := func(ctx context.Context, req interface{}) (interface{}, error) {
         return srv.(AgentServiceServer).Result(ctx, req.(*ResultRequest))
diff --git a/agent/api/grpc/requests.go b/agent/api/grpc/requests.go
index a2f7a95..0532f9e 100644
--- a/agent/api/grpc/requests.go
+++ b/agent/api/grpc/requests.go
@@ -25,7 +25,7 @@ func (req algoReq) validate() error {
 }
 
 type dataReq struct {
-    Dataset string `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
+    Dataset []byte `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
 }
 
 func (req dataReq) validate() error {
diff --git a/agent/api/grpc/responses.go b/agent/api/grpc/responses.go
index 63d6aae..45e88e4 100644
--- a/agent/api/grpc/responses.go
+++ b/agent/api/grpc/responses.go
@@ -13,5 +13,5 @@ type dataRes struct {
 }
 
 type resultRes struct {
-    File []byte `json:"-"`
+    File []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
 }
diff --git a/agent/api/logging.go b/agent/api/logging.go
index 31356cf..35bf66a 100644
--- a/agent/api/logging.go
+++ b/agent/api/logging.go
@@ -53,7 +53,7 @@ func (lm *loggingMiddleware) Algo(ctx context.Context, algorithm []byte) (respon
     return lm.svc.Algo(ctx, algorithm)
 }
 
-func (lm *loggingMiddleware) Data(ctx context.Context, dataset string) (response string, err error) {
+func (lm *loggingMiddleware) Data(ctx context.Context, dataset []byte) (response string, err error) {
     defer func(begin time.Time) {
         message := fmt.Sprintf("Method Data took %s to complete", time.Since(begin))
         if err != nil {
diff --git a/agent/api/metrics.go b/agent/api/metrics.go
index 9c73de2..cd848bb 100644
--- a/agent/api/metrics.go
+++ b/agent/api/metrics.go
@@ -50,7 +50,7 @@ func (ms *metricsMiddleware) Algo(ctx context.Context, algorithm []byte) (string
     return ms.svc.Algo(ctx, algorithm)
 }
 
-func (ms *metricsMiddleware) Data(ctx context.Context, dataset string) (string, error) {
+func (ms *metricsMiddleware) Data(ctx context.Context, dataset []byte) (string, error) {
     defer func(begin time.Time) {
         ms.counter.With("method", "data").Add(1)
         ms.latency.With("method", "data").Observe(time.Since(begin).Seconds())
diff --git a/agent/service.go b/agent/service.go
index 5de2857..5902f1f 100644
--- a/agent/service.go
+++ b/agent/service.go
@@ -7,6 +7,10 @@ import (
     "context"
     "encoding/json"
     "errors"
+    "fmt"
+    "os/exec"
+
+    socket "github.com/ultravioletrs/agent/pkg"
 )
 
 var (
@@ -26,13 +30,19 @@ type Metadata map[string]interface{}
 type Service interface {
     Run(ctx context.Context, cmp Computation) (string, error)
     Algo(ctx context.Context, algorithm []byte) (string, error)
-    Data(ctx context.Context, dataset string) (string, error)
+    Data(ctx context.Context, dataset []byte) (string, error)
     Result(ctx context.Context) ([]byte, error)
 }
 
 type agentService struct {
+    computation Computation
+    algorithms  [][]byte
+    datasets    [][]byte
+    result      []byte
 }
 
+const socketPath = "unix_socket"
+
 var _ Service = (*agentService)(nil)
 
 // New instantiates the agent service implementation.
@@ -40,12 +50,14 @@ func New() Service {
     return &agentService{}
 }
 
-func (ks *agentService) Run(ctx context.Context, cmp Computation) (string, error) {
+func (as *agentService) Run(ctx context.Context, cmp Computation) (string, error) {
     cmpJSON, err := json.Marshal(cmp)
     if err != nil {
         return "", err
     }
 
+    as.computation = cmp
+
     return string(cmpJSON), nil // return the JSON string as the function's string return value
 }
 
@@ -53,6 +65,8 @@ func (as *agentService) Algo(ctx context.Context, algorithm []byte) (string, err
     // Implement the logic for the Algo method based on your requirements
     // Use the provided ctx and algorithm parameters as needed
 
+    as.algorithms = append(as.algorithms, algorithm)
+
     // Perform some processing on the algorithm byte array
     // For example, generate a unique ID for the algorithm
     algorithmID := "algo123"
@@ -61,10 +75,12 @@ func (as *agentService) Algo(ctx context.Context, algorithm []byte) (string, err
     return algorithmID, nil
 }
 
-func (as *agentService) Data(ctx context.Context, dataset string) (string, error) {
+func (as *agentService) Data(ctx context.Context, dataset []byte) (string, error) {
     // Implement the logic for the Data method based on your requirements
     // Use the provided ctx and dataset parameters as needed
 
+    as.datasets = append(as.datasets, dataset)
+
     // Perform some processing on the dataset
     // For example, generate a unique ID for the dataset
     datasetID := "dataset456"
@@ -77,10 +93,47 @@ func (as *agentService) Result(ctx context.Context) ([]byte, error) {
     // Implement the logic for the Result method based on your requirements
     // Use the provided ctx parameter as needed
 
-    // Perform some processing to retrieve the computation result file
-    // For example, read the file from storage or generate a dummy result
-    result := []byte("This is the computation result file.")
+    result, err := run(as.algorithms[0], as.datasets[0])
+    if err != nil {
+        return nil, fmt.Errorf("error performing computation: %v", err)
+    }
+    as.result = result
 
     // Return the result file or an error
-    return result, nil
+    return as.result, nil
+}
+
+func run(algoContent []byte, dataContent []byte) ([]byte, error) {
+    listener, err := socket.StartUnixSocketServer(socketPath)
+    if err != nil {
+        return nil, fmt.Errorf("error starting unix socket server: %v", err)
+    }
+    defer listener.Close()
+
+    // Create channels for received data and errors
+    dataChannel := make(chan []byte)
+    errorChannel := make(chan error)
+    go socket.AcceptConnection(listener, dataChannel, errorChannel)
+
+    // Pass the CSV data and the socket path to the Python script as command-line arguments
+    script := string(algoContent)
+    data := string(dataContent)
+    cmd := exec.Command("python3", "-c", script, data, socketPath)
+
+    if err := cmd.Start(); err != nil {
+        return nil, fmt.Errorf("error starting Python script: %v", err)
+    }
+
+    var receivedData []byte
+    select {
+    case receivedData = <-dataChannel:
+    case err = <-errorChannel:
+        return nil, fmt.Errorf("error receiving data: %v", err)
+    }
+
+    if err := cmd.Wait(); err != nil {
+        return nil, fmt.Errorf("python script execution error: %v", err)
+    }
+
+    return receivedData, nil
+}
diff --git a/agent/tracing/tracing.go b/agent/tracing/tracing.go
index a4b596e..65a0fb8 100644
--- a/agent/tracing/tracing.go
+++ b/agent/tracing/tracing.go
@@ -44,10 +44,8 @@ func (tm *tracingMiddleware) Algo(ctx context.Context, algorithm []byte) (string
     return tm.svc.Algo(ctx, algorithm)
 }
 
-func (tm *tracingMiddleware) Data(ctx context.Context, dataset string) (string, error) {
-    ctx, span := tm.tracer.Start(ctx, "data", trace.WithAttributes(
-        attribute.String("dataset", dataset),
-    ))
+func (tm *tracingMiddleware) Data(ctx context.Context, dataset []byte) (string, error) {
+    ctx, span := tm.tracer.Start(ctx, "data")
     defer span.End()
 
     return tm.svc.Data(ctx, dataset)
diff --git a/cli/algorithms.go b/cli/algorithms.go
index 630975a..5eab55f 100644
--- a/cli/algorithms.go
+++ b/cli/algorithms.go
@@ -2,6 +2,7 @@ package cli
 
 import (
     "log"
+    "os"
 
     "github.com/spf13/cobra"
     agentsdk "github.com/ultravioletrs/agent/pkg/sdk"
@@ -18,13 +19,19 @@ func NewAlgorithmsCmd(sdk agentsdk.SDK) *cobra.Command {
 
             log.Println("Uploading algorithm binary:", algorithmFile)
 
-            response, err := sdk.UploadAlgorithm([]byte(algorithmFile))
+            algorithm, err := os.ReadFile(algorithmFile)
+            if err != nil {
+                log.Println("Error reading algorithm file:", err)
+                return
+            }
+
+            response, err := sdk.UploadAlgorithm(algorithm)
             if err != nil {
                 log.Println("Error uploading algorithm:", err)
                 return
             }
 
-            log.Println("Response:", response)
+            log.Println("Successfully uploaded algorithm:", response)
         },
     }
 }
diff --git a/cli/datasets.go b/cli/datasets.go
index efd8e6a..02bc863 100644
--- a/cli/datasets.go
+++ b/cli/datasets.go
@@ -2,6 +2,7 @@ package cli
 
 import (
     "log"
+    "os"
 
     "github.com/spf13/cobra"
     agentsdk "github.com/ultravioletrs/agent/pkg/sdk"
@@ -18,7 +19,13 @@ func NewDatasetsCmd(sdk agentsdk.SDK) *cobra.Command {
 
             log.Println("Uploading dataset CSV:", datasetFile)
 
-            response, err := sdk.UploadDataset(datasetFile)
+            dataset, err := os.ReadFile(datasetFile)
+            if err != nil {
+                log.Println("Error reading dataset file:", err)
+                return
+            }
+
+            response, err := sdk.UploadDataset(dataset)
             if err != nil {
                 log.Println("Error uploading dataset:", err)
                 return
diff --git a/cli/openapi.yaml b/cli/openapi.yaml
index 4825d0a..da75c1c 100644
--- a/cli/openapi.yaml
+++ b/cli/openapi.yaml
@@ -5,7 +5,30 @@ info:
 servers:
   - url: https://api.example.com/v1
 paths:
-  /algorithm:
+  /run:
+    post:
+      summary: Run a computation
+      requestBody:
+        required: true
+        content:
+          application/octet-stream:
+            schema:
+              type: string
+              format: binary
+              description: The computation binary file (Linux executable)
+      responses:
+        "200":
+          description: Computation started
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  computationId:
+                    type: string
+                    description: Identifier for the computation
+
+  /algo:
     post:
       summary: Upload algorithm binary
       requestBody:
@@ -28,16 +51,17 @@ paths:
               type: string
               description: Identifier for the uploaded algorithm binary
 
-  /dataset:
+  /data:
     post:
      summary: Upload dataset CSV file
      requestBody:
        required: true
        content:
-          text/plain:
+          application/octet-stream:
            schema:
              type: string
-              description: The dataset CSV file as a plain text string
+              format: binary
+              description: The dataset CSV file as a binary
      responses:
        "200":
          description: Dataset CSV uploaded
diff --git a/cli/results.go b/cli/result.go
similarity index 85%
rename from cli/results.go
rename to cli/result.go
index a928b1a..f8ebb24 100644
--- a/cli/results.go
+++ b/cli/result.go
@@ -1,13 +1,15 @@
 package cli
 
 import (
-    "io/ioutil"
     "log"
+    "os"
 
     "github.com/spf13/cobra"
     agentsdk "github.com/ultravioletrs/agent/pkg/sdk"
 )
 
+const resultFilePath = "result.bin"
+
 func NewResultsCmd(sdk agentsdk.SDK) *cobra.Command {
     return &cobra.Command{
@@ -22,14 +24,13 @@ func NewResultsCmd(sdk agentsdk.SDK) *cobra.Command {
                 return
             }
 
-            err = ioutil.WriteFile("result.txt", result, 0644)
+            err = os.WriteFile(resultFilePath, result, 0644)
             if err != nil {
                 log.Println("Error saving computation result:", err)
                 return
             }
 
             log.Println("Computation result retrieved and saved successfully!")
-            log.Println("Response:", string(result))
         },
     }
 }
diff --git a/go.mod b/go.mod
index d7da793..d29a98a 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
     github.com/caarlos0/env/v7 v7.1.0
     github.com/go-kit/kit v0.12.0
     github.com/go-zoo/bone v1.3.0
+    github.com/golang/protobuf v1.5.3
     github.com/mainflux/mainflux v0.0.0-20230726142711-2b78902e0170
     github.com/prometheus/client_golang v1.16.0
     github.com/spf13/cobra v1.7.0
@@ -32,7 +33,6 @@ require (
     github.com/go-logr/logr v1.2.4 // indirect
     github.com/go-logr/stdr v1.2.2 // indirect
     github.com/gofrs/uuid v4.4.0+incompatible // indirect
-    github.com/golang/protobuf v1.5.3 // indirect
     github.com/inconshreveable/mousetrap v1.1.0 // indirect
     github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
     github.com/prometheus/client_model v0.4.0 // indirect
diff --git a/pkg/sdk/agent.go b/pkg/sdk/agent.go
index 3982e9c..0e9c089 100644
--- a/pkg/sdk/agent.go
+++ b/pkg/sdk/agent.go
@@ -9,7 +9,14 @@ import (
     "github.com/ultravioletrs/agent/agent"
 )
 
-type AgentSDK struct {
+type SDK interface {
+    Run(computation Computation) (string, error)
+    UploadAlgorithm(algorithm []byte) (string, error)
+    UploadDataset(dataset []byte) (string, error)
+    Result() ([]byte, error)
+}
+
+type agentSDK struct {
     client agent.AgentServiceClient
     logger logger.Logger
 }
@@ -33,14 +40,14 @@ type Computation struct {
 
 type Metadata map[string]interface{}
 
-func NewAgentSDK(log logger.Logger, agentClient agent.AgentServiceClient) *AgentSDK {
-    return &AgentSDK{
+func NewAgentSDK(log logger.Logger, agentClient agent.AgentServiceClient) *agentSDK {
+    return &agentSDK{
         client: agentClient,
         logger: log,
     }
 }
 
-func (sdk *AgentSDK) Run(computation Computation) (string, error) {
+func (sdk *agentSDK) Run(computation Computation) (string, error) {
     computationBytes, err := json.Marshal(computation)
     if err != nil {
         sdk.logger.Error("Failed to marshal computation")
@@ -59,7 +66,7 @@ func (sdk *AgentSDK) Run(computation Computation) (string, error) {
     return response.Computation, nil
 }
 
-func (sdk *AgentSDK) UploadAlgorithm(algorithm []byte) (string, error) {
+func (sdk *agentSDK) UploadAlgorithm(algorithm []byte) (string, error) {
     request := &agent.AlgoRequest{
         Algorithm: algorithm,
     }
@@ -73,7 +80,7 @@ func (sdk *AgentSDK) UploadAlgorithm(algorithm []byte) (string, error) {
     return response.AlgorithmID, nil
 }
 
-func (sdk *AgentSDK) UploadDataset(dataset string) (string, error) {
+func (sdk *agentSDK) UploadDataset(dataset []byte) (string, error) {
     request := &agent.DataRequest{
         Dataset: dataset,
     }
@@ -87,12 +94,10 @@ func (sdk *AgentSDK) UploadDataset(dataset string) (string, error) {
     return response.DatasetID, nil
 }
 
-func (sdk *AgentSDK) Result() ([]byte, error) {
+func (sdk *agentSDK) Result() ([]byte, error) {
     request := &agent.ResultRequest{}
 
-    ctx, cancel := context.WithTimeout(context.Background(), time.Second*100)
-    defer cancel()
-    response, err := sdk.client.Result(ctx, request)
+    response, err := sdk.client.Result(context.Background(), request)
     if err != nil {
         sdk.logger.Error("Failed to call Result RPC")
         return nil, err
diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go
deleted file mode 100644
index 0b945b5..0000000
--- a/pkg/sdk/sdk.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package sdk
-
-type SDK interface {
-    Run(computation Computation) (string, error)
-    UploadAlgorithm(algorithm []byte) (string, error)
-    UploadDataset(dataset string) (string, error)
-    Result() ([]byte, error)
-}
diff --git a/pkg/socket.go b/pkg/socket.go
new file mode 100644
index 0000000..ae78194
--- /dev/null
+++ b/pkg/socket.go
@@ -0,0 +1,56 @@
+package socket
+
+import (
+    "fmt"
+    "io"
+    "net"
+    "os"
+)
+
+func StartUnixSocketServer(socketPath string) (net.Listener, error) {
+    // Remove any existing socket file
+    _ = os.Remove(socketPath)
+
+    // Create a Unix domain socket listener
+    listener, err := net.Listen("unix", socketPath)
+    if err != nil {
+        return nil, fmt.Errorf("error creating socket listener: %v", err)
+    }
+
+    fmt.Println("Unix domain socket server is listening on", socketPath)
+
+    return listener, nil
+}
+
+func AcceptConnection(listener net.Listener, dataChannel chan []byte, errorChannel chan error) {
+    conn, err := listener.Accept()
+    if err != nil {
+        errorChannel <- fmt.Errorf("error accepting connection: %v", err)
+        return
+    }
+
+    handleConnection(conn, dataChannel, errorChannel)
+}
+
+func handleConnection(conn net.Conn, dataChannel chan []byte, errorChannel chan error) {
+    defer conn.Close()
+
+    // Create a dynamic buffer to store incoming data
+    var buffer []byte
+    tmp := make([]byte, 1024)
+
+    for {
+        // Read data into the temporary buffer
+        n, err := conn.Read(tmp)
+        if err != nil {
+            if err == io.EOF {
+                break
+            }
+            errorChannel <- err
+            return
+        }
+        buffer = append(buffer, tmp[:n]...)
+    }
+
+    dataChannel <- buffer
+}
diff --git a/test/manual/README.md b/test/manual/README.md
new file mode 100644
index 0000000..6d4162b
--- /dev/null
+++ b/test/manual/README.md
@@ -0,0 +1,56 @@
+# Manual tests
+
+## CLI
+
+Open a console and start `agent`:
+
+```sh
+AGENT_LOG_LEVEL=info go run cmd/agent/main.go
+```
+
+Open another console and run:
+
+```sh
+export AGENT_GRPC_URL=localhost:7002
+
+# Run the CLI program with algorithm input
+go run cmd/cli/main.go algo test/manual/algo/lin_reg.py
+# 2023/09/21 10:43:53 Uploading algorithm binary: test/manual/algo/lin_reg.py
+
+# Run the CLI program with dataset input
+go run cmd/cli/main.go data test/manual/data/iris.csv
+# 2023/09/21 10:45:25 Uploading dataset CSV: test/manual/data/iris.csv
+
+# Run the CLI program to fetch computation result
+go run cmd/cli/main.go result
+# 2023/09/21 10:45:39 Retrieving computation result file
+# 2023/09/21 10:45:40 Computation result retrieved and saved successfully!
+```
+
+Now there is a `result.bin` file in the current working directory. The file holds the trained logistic regression model.
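+
+Since `lin_reg.py` serializes the trained scikit-learn `LogisticRegression` with
+`joblib`, `result.bin` can also be loaded directly as a quick sanity check. A
+minimal sketch (it assumes `joblib` is installed locally and that you run it from
+the directory containing `result.bin`):
+
+```python
+import joblib
+
+# lin_reg.py trains on two features: petal length and petal width (in cm)
+model = joblib.load("result.bin")
+print(model.predict([[1.4, 0.2]]))  # classify one sample flower
+```
+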
+To test the model, run:
+
+```sh
+python3 test/manual/algo/lin_reg_test.py test/manual/data/iris.csv result.bin
+```
+
+You should get an output (truncated for the sake of brevity):
+
+```sh
+     Id  SepalLengthCm  SepalWidthCm  PetalLengthCm  PetalWidthCm      Species
+0     1            5.1           3.5            1.4           0.2  Iris-setosa
+1     2            4.9           3.0            1.4           0.2  Iris-setosa
+2     3            4.7           3.2            1.3           0.2  Iris-setosa
+3     4            4.6           3.1            1.5           0.2  Iris-setosa
+4     5            5.0           3.6            1.4           0.2  Iris-setosa
+Precision, Recall, Confusion matrix, in training
+
+                 precision    recall  f1-score   support
+
+    Iris-setosa      1.000     1.000     1.000        21
+Iris-versicolor      0.923     0.889     0.906        27
+ Iris-virginica      0.893     0.926     0.909        27
+
+       accuracy                          0.933        75
+      macro avg      0.939     0.938     0.938        75
+   weighted avg      0.934     0.933     0.933        75
+```
\ No newline at end of file
diff --git a/test/manual/algo/lin_reg.py b/test/manual/algo/lin_reg.py
new file mode 100644
index 0000000..92f0ffe
--- /dev/null
+++ b/test/manual/algo/lin_reg.py
@@ -0,0 +1,47 @@
+import sys, io
+import joblib
+import socket
+
+import pandas as pd
+from sklearn.model_selection import train_test_split
+from sklearn.linear_model import LogisticRegression
+
+dataset = sys.argv[1]
+iris = pd.read_csv(io.StringIO(dataset))
+
+# Dropping the Species since we only need the measurements
+X = iris.drop(['Species'], axis=1)
+
+# converting into numpy array and assigning petal length and petal width
+X = X.to_numpy()[:, (3,4)]
+y = iris['Species']
+
+# Splitting into train and test
+X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.5, random_state=42)
+
+log_reg = LogisticRegression()
+log_reg.fit(X_train,y_train)
+
+# Serialize the trained model to a byte buffer
+model_buffer = io.BytesIO()
+joblib.dump(log_reg, model_buffer)
+
+# Get the serialized model as a bytes object
+model_bytes = model_buffer.getvalue()
+
+# Define the path for the Unix domain socket
+socket_path = sys.argv[2]
+
+# Create a Unix domain socket client
+client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+try:
+    # Connect to the server
+    client.connect(socket_path)
+
+    # Send the serialized model over the socket
+    client.send(model_bytes)
+
+finally:
+    # Close the socket
+    client.close()