Skip to content

Commit

Permalink
Merge pull request #45 from ultravioletrs/algo_data
Browse files Browse the repository at this point in the history
Add basic business logic of training a model on a dataset
  • Loading branch information
drasko authored and darkodraskovic committed Sep 21, 2023
2 parents a3c4664 + b0b22ae commit 5171882
Show file tree
Hide file tree
Showing 20 changed files with 309 additions and 74 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
build
bin
17 changes: 11 additions & 6 deletions agent/agent.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions agent/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ message AlgoRequest { bytes algorithm = 1; }

message AlgoResponse { string algorithmID = 1; }

message DataRequest { string dataset = 1; }
message DataRequest { bytes dataset = 1; }

message DataResponse { string datasetID = 1; }

message ResultRequest {}

message ResultResponse {
bytes file = 1;
}
message ResultResponse { bytes file = 1; }
27 changes: 8 additions & 19 deletions agent/agent_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/api/grpc/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (req algoReq) validate() error {
}

type dataReq struct {
Dataset string `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
Dataset []byte `protobuf:"bytes,1,opt,name=dataset,proto3" json:"dataset,omitempty"`
}

func (req dataReq) validate() error {
Expand Down
2 changes: 1 addition & 1 deletion agent/api/grpc/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type dataRes struct {
}

type resultRes struct {
File []byte `json:"-"`
File []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
}
2 changes: 1 addition & 1 deletion agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (lm *loggingMiddleware) Algo(ctx context.Context, algorithm []byte) (respon
return lm.svc.Algo(ctx, algorithm)
}

func (lm *loggingMiddleware) Data(ctx context.Context, dataset string) (response string, err error) {
func (lm *loggingMiddleware) Data(ctx context.Context, dataset []byte) (response string, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Data took %s to complete", time.Since(begin))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ms *metricsMiddleware) Algo(ctx context.Context, algorithm []byte) (string
return ms.svc.Algo(ctx, algorithm)
}

func (ms *metricsMiddleware) Data(ctx context.Context, dataset string) (string, error) {
func (ms *metricsMiddleware) Data(ctx context.Context, dataset []byte) (string, error) {
defer func(begin time.Time) {
ms.counter.With("method", "data").Add(1)
ms.latency.With("method", "data").Observe(time.Since(begin).Seconds())
Expand Down
67 changes: 60 additions & 7 deletions agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"os/exec"

socket "github.com/ultravioletrs/agent/pkg"
)

var (
Expand All @@ -26,33 +30,43 @@ type Metadata map[string]interface{}
type Service interface {
Run(ctx context.Context, cmp Computation) (string, error)
Algo(ctx context.Context, algorithm []byte) (string, error)
Data(ctx context.Context, dataset string) (string, error)
Data(ctx context.Context, dataset []byte) (string, error)
Result(ctx context.Context) ([]byte, error)
}

type agentService struct {
computation Computation
algorithms [][]byte
datasets [][]byte
result []byte
}

const socketPath = "unix_socket"

var _ Service = (*agentService)(nil)

// New instantiates the agent service implementation.
func New() Service {
return &agentService{}
}

func (ks *agentService) Run(ctx context.Context, cmp Computation) (string, error) {
func (as *agentService) Run(ctx context.Context, cmp Computation) (string, error) {
cmpJSON, err := json.Marshal(cmp)
if err != nil {
return "", err
}

as.computation = cmp

return string(cmpJSON), nil // return the JSON string as the function's string return value
}

func (as *agentService) Algo(ctx context.Context, algorithm []byte) (string, error) {
// Implement the logic for the Algo method based on your requirements
// Use the provided ctx and algorithm parameters as needed

as.algorithms = append(as.algorithms, algorithm)

// Perform some processing on the algorithm byte array
// For example, generate a unique ID for the algorithm
algorithmID := "algo123"
Expand All @@ -61,10 +75,12 @@ func (as *agentService) Algo(ctx context.Context, algorithm []byte) (string, err
return algorithmID, nil
}

func (as *agentService) Data(ctx context.Context, dataset string) (string, error) {
func (as *agentService) Data(ctx context.Context, dataset []byte) (string, error) {
// Implement the logic for the Data method based on your requirements
// Use the provided ctx and dataset parameters as needed

as.datasets = append(as.datasets, dataset)

// Perform some processing on the dataset string
// For example, generate a unique ID for the dataset
datasetID := "dataset456"
Expand All @@ -77,10 +93,47 @@ func (as *agentService) Result(ctx context.Context) ([]byte, error) {
// Implement the logic for the Result method based on your requirements
// Use the provided ctx parameter as needed

// Perform some processing to retrieve the computation result file
// For example, read the file from storage or generate a dummy result
result := []byte("This is the computation result file.")
result, err := run(as.algorithms[0], as.datasets[0])
if err != nil {
return nil, fmt.Errorf("error performing computation: %v", err)
}
as.result = result

// Return the result file or an error
return result, nil
return as.result, nil
}

func run(algoContent []byte, dataContent []byte) ([]byte, error) {
listener, err := socket.StartUnixSocketServer(socketPath)
if err != nil {
return nil, fmt.Errorf("error creating stdout pipe: %v", err)
}
defer listener.Close()

// Create channels for received data and errors
dataChannel := make(chan []byte)
errorChannel := make(chan error)
go socket.AcceptConnection(listener, dataChannel, errorChannel)

// Construct the Python script content with CSV data as a command-line argument
script := string(algoContent)
data := string(dataContent)
cmd := exec.Command("python3", "-c", script, data, socketPath)

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting Python script: %v", err)
}

var receivedData []byte
select {
case receivedData = <-dataChannel:
case err = <-errorChannel:
return nil, fmt.Errorf("error receiving data: %v", err)
}

if err := cmd.Wait(); err != nil {
return nil, fmt.Errorf("python script execution error: %v", err)
}

return receivedData, nil
}
6 changes: 2 additions & 4 deletions agent/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ func (tm *tracingMiddleware) Algo(ctx context.Context, algorithm []byte) (string
return tm.svc.Algo(ctx, algorithm)
}

func (tm *tracingMiddleware) Data(ctx context.Context, dataset string) (string, error) {
ctx, span := tm.tracer.Start(ctx, "data", trace.WithAttributes(
attribute.String("dataset", dataset),
))
func (tm *tracingMiddleware) Data(ctx context.Context, dataset []byte) (string, error) {
ctx, span := tm.tracer.Start(ctx, "data")
defer span.End()

return tm.svc.Data(ctx, dataset)
Expand Down
11 changes: 9 additions & 2 deletions cli/algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"log"
"os"

"github.com/spf13/cobra"
agentsdk "github.com/ultravioletrs/agent/pkg/sdk"
Expand All @@ -18,13 +19,19 @@ func NewAlgorithmsCmd(sdk agentsdk.SDK) *cobra.Command {

log.Println("Uploading algorithm binary:", algorithmFile)

response, err := sdk.UploadAlgorithm([]byte(algorithmFile))
algorithm, err := os.ReadFile(algorithmFile)
if err != nil {
log.Println("Error reading dataset file:", err)
return
}

response, err := sdk.UploadAlgorithm(algorithm)
if err != nil {
log.Println("Error uploading algorithm:", err)
return
}

log.Println("Response:", response)
log.Println("Succesfully uploaded algorithm:", response)
},
}
}
9 changes: 8 additions & 1 deletion cli/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"log"
"os"

"github.com/spf13/cobra"
agentsdk "github.com/ultravioletrs/agent/pkg/sdk"
Expand All @@ -18,7 +19,13 @@ func NewDatasetsCmd(sdk agentsdk.SDK) *cobra.Command {

log.Println("Uploading dataset CSV:", datasetFile)

response, err := sdk.UploadDataset(datasetFile)
dataset, err := os.ReadFile(datasetFile)
if err != nil {
log.Println("Error reading dataset file:", err)
return
}

response, err := sdk.UploadDataset(dataset)
if err != nil {
log.Println("Error uploading dataset:", err)
return
Expand Down
Loading

0 comments on commit 5171882

Please sign in to comment.