Skip to content

Commit

Permalink
NOISSUE - Enable WASM Support and FileSystem Support (ultravioletrs#189)
Browse files Browse the repository at this point in the history
* feat(algorithm): Add wasm as an algo type

Signed-off-by: Rodney Osodo <[email protected]>

* feat(algorithm): Use filesystem to store results

Move from unix socket for results storage to filesystem

* test: test new filesystem changes

Signed-off-by: Rodney Osodo <[email protected]>

* refactor(files): rename resultFile to resultsFilePath

* feat(wasm-runtime): change from wasmtime to wasmedge

Wasmedge enables easier directory mapping to get results

Signed-off-by: Rodney Osodo <[email protected]>

* feat(algorithm): send results as zipped directory

Create a new function to zip the results directory and send it back to the user

* fix(wasm): runtime argument

Fix the directory mapping for wasm runtime arguments

Signed-off-by: Rodney Osodo <[email protected]>

* fix(errors): provide useful error message

* chore(gitignore): add results zip to gitignore

* feat(filesystem): Enable storing results on filesystem for python algos

* refactor: revert to upstream cocos repo

Signed-off-by: Rodney Osodo <[email protected]>

* fix: remove AddDataset from algorithm interface

* fix: agent to handle results zipping

* test: test zipping directories

* refactor(agent): Handle file operations from agent

* test: run test inside eos

Signed-off-by: Rodney Osodo <[email protected]>

* refactor(test): Document and test algos are running

Document steps on running the 2 python exampls and ensure they are running on eos

Signed-off-by: Rodney Osodo <[email protected]>

* fix: remove witheDataset option

* test: test without dataset argument

Signed-off-by: Rodney Osodo <[email protected]>

---------

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo authored Aug 6, 2024
1 parent 3c855e3 commit afc306a
Show file tree
Hide file tree
Showing 23 changed files with 517 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checkproto.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:

- name: Set up protoc
run: |
PROTOC_VERSION=25.3
PROTOC_VERSION=27.2
PROTOC_GEN_VERSION=v1.34.2
PROTOC_GRPC_VERSION=v1.4.0
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ cmd/manager/tmp
*.pem

dist/
result.bin
result.zip
*.spec
2 changes: 1 addition & 1 deletion agent/agent.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/agent_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions agent/algorithm/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ type AlgorithType string
const (
AlgoTypeBin AlgorithType = "bin"
AlgoTypePython AlgorithType = "python"
AlgoTypeWasm AlgorithType = "wasm"
AlgoTypeKey = "algo_type"

ResultsDir = "results"
DatasetsDir = "datasets"
)

func AlgorithmTypeToContext(ctx context.Context, algoType string) context.Context {
Expand All @@ -27,8 +31,5 @@ func AlgorithmTypeFromContext(ctx context.Context) string {
// Algorithm is an interface that specifies the API for an algorithm.
type Algorithm interface {
// Run executes the algorithm and returns the result.
Run() ([]byte, error)

// Add dataset to algorithm.
AddDataset(dataset string)
Run() error
}
49 changes: 6 additions & 43 deletions agent/algorithm/binary/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,77 +6,40 @@ import (
"fmt"
"io"
"log/slog"
"os"
"os/exec"

"github.com/ultravioletrs/cocos/agent/algorithm"
"github.com/ultravioletrs/cocos/agent/events"
"github.com/ultravioletrs/cocos/pkg/socket"
)

const socketPath = "unix_socket"

var _ algorithm.Algorithm = (*binary)(nil)

type binary struct {
algoFile string
datasets []string
logger *slog.Logger
stderr io.Writer
stdout io.Writer
}

func New(logger *slog.Logger, eventsSvc events.Service, algoFile string) algorithm.Algorithm {
func NewAlgorithm(logger *slog.Logger, eventsSvc events.Service, algoFile string) algorithm.Algorithm {
return &binary{
algoFile: algoFile,
logger: logger,
stderr: &algorithm.Stderr{Logger: logger, EventSvc: eventsSvc},
stdout: &algorithm.Stdout{Logger: logger},
}
}

func (b *binary) AddDataset(dataset string) {
b.datasets = append(b.datasets, dataset)
}

func (b *binary) Run() ([]byte, error) {
defer os.Remove(b.algoFile)
defer func() {
for _, file := range b.datasets {
os.Remove(file)
}
}()
listener, err := socket.StartUnixSocketServer(socketPath)
if err != nil {
return nil, fmt.Errorf("error creating stdout pipe: %v", err)
}
defer listener.Close()

// Create channels for received data and errors
dataChannel := make(chan []byte)
errorChannel := make(chan error)

var result []byte

go socket.AcceptConnection(listener, dataChannel, errorChannel)

args := append([]string{socketPath}, b.datasets...)
cmd := exec.Command(b.algoFile, args...)
func (b *binary) Run() error {
cmd := exec.Command(b.algoFile)
cmd.Stderr = b.stderr
cmd.Stdout = b.stdout

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting algorithm: %v", err)
return fmt.Errorf("error starting algorithm: %v", err)
}

if err := cmd.Wait(); err != nil {
return nil, fmt.Errorf("algorithm execution error: %v", err)
return fmt.Errorf("algorithm execution error: %v", err)
}

select {
case result = <-dataChannel:
return result, nil
case err = <-errorChannel:
return nil, fmt.Errorf("error receiving data: %v", err)
}
return nil
}
54 changes: 11 additions & 43 deletions agent/algorithm/python/python.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (

"github.com/ultravioletrs/cocos/agent/algorithm"
"github.com/ultravioletrs/cocos/agent/events"
"github.com/ultravioletrs/cocos/pkg/socket"
"google.golang.org/grpc/metadata"
)

const (
socketPath = "unix_socket"
PyRuntime = "python3"
pyRuntimeKey = "python_runtime"
)
Expand All @@ -35,18 +33,15 @@ var _ algorithm.Algorithm = (*python)(nil)

type python struct {
algoFile string
datasets []string
logger *slog.Logger
stderr io.Writer
stdout io.Writer
runtime string
requirementsFile string
}

func New(logger *slog.Logger, eventsSvc events.Service, runtime, requirementsFile, algoFile string) algorithm.Algorithm {
func NewAlgorithm(logger *slog.Logger, eventsSvc events.Service, runtime, requirementsFile, algoFile string) algorithm.Algorithm {
p := &python{
algoFile: algoFile,
logger: logger,
stderr: &algorithm.Stderr{Logger: logger, EventSvc: eventsSvc},
stdout: &algorithm.Stdout{Logger: logger},
requirementsFile: requirementsFile,
Expand All @@ -59,17 +54,13 @@ func New(logger *slog.Logger, eventsSvc events.Service, runtime, requirementsFil
return p
}

func (p *python) AddDataset(dataset string) {
p.datasets = append(p.datasets, dataset)
}

func (p *python) Run() ([]byte, error) {
func (p *python) Run() error {
venvPath := "venv"
createVenvCmd := exec.Command(p.runtime, "-m", "venv", venvPath)
createVenvCmd.Stderr = p.stderr
createVenvCmd.Stdout = p.stdout
if err := createVenvCmd.Run(); err != nil {
return nil, fmt.Errorf("error creating virtual environment: %v", err)
return fmt.Errorf("error creating virtual environment: %v", err)
}

pythonPath := filepath.Join(venvPath, "bin", "python")
Expand All @@ -79,48 +70,25 @@ func (p *python) Run() ([]byte, error) {
rcmd.Stderr = p.stderr
rcmd.Stdout = p.stdout
if err := rcmd.Run(); err != nil {
return nil, fmt.Errorf("error installing requirements: %v", err)
}
}

defer os.Remove(p.algoFile)
defer func() {
for _, file := range p.datasets {
os.Remove(file)
return fmt.Errorf("error installing requirements: %v", err)
}
}()
defer os.RemoveAll(venvPath)

listener, err := socket.StartUnixSocketServer(socketPath)
if err != nil {
return nil, fmt.Errorf("error creating stdout pipe: %v", err)
}
defer listener.Close()

dataChannel := make(chan []byte)
errorChannel := make(chan error)

var result []byte

go socket.AcceptConnection(listener, dataChannel, errorChannel)

args := append([]string{p.algoFile, socketPath}, p.datasets...)
cmd := exec.Command(pythonPath, args...)
cmd := exec.Command(pythonPath, p.algoFile)
cmd.Stderr = p.stderr
cmd.Stdout = p.stdout

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting algorithm: %v", err)
return fmt.Errorf("error starting algorithm: %v", err)
}

if err := cmd.Wait(); err != nil {
return nil, fmt.Errorf("algorithm execution error: %v", err)
return fmt.Errorf("algorithm execution error: %v", err)
}

select {
case result = <-dataChannel:
return result, nil
case err = <-errorChannel:
return nil, fmt.Errorf("error receiving data: %v", err)
if err := os.RemoveAll(venvPath); err != nil {
return fmt.Errorf("error removing virtual environment: %v", err)
}

return nil
}
59 changes: 59 additions & 0 deletions agent/algorithm/results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package algorithm

import (
"archive/zip"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
)

// ZipDirectory zips a directory and returns the zipped bytes.
func ZipDirectory() ([]byte, error) {
buf := new(bytes.Buffer)
zipWriter := zip.NewWriter(buf)

err := filepath.Walk(ResultsDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error walking the path %q: %v", path, err)
}

if info.IsDir() {
return nil
}

relPath, err := filepath.Rel(ResultsDir, path)
if err != nil {
return fmt.Errorf("error getting relative path for %q: %v", path, err)
}

file, err := os.Open(path)
if err != nil {
return fmt.Errorf("error opening file %q: %v", path, err)
}
defer file.Close()

zipFile, err := zipWriter.Create(relPath)
if err != nil {
return fmt.Errorf("error creating zip file for %q: %v", path, err)
}

if _, err = io.Copy(zipFile, file); err != nil {
return fmt.Errorf("error copying file %q to zip: %v", path, err)
}

return err
})
if err != nil {
return nil, err
}

if err = zipWriter.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}
81 changes: 81 additions & 0 deletions agent/algorithm/results_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package algorithm_test

import (
"os"
"testing"

"github.com/ultravioletrs/cocos/agent/algorithm"
)

func TestZipDirectory(t *testing.T) {
cases := []struct {
name string
directories []string
files []string
expected []string
}{
{
name: "empty directory",
directories: []string{"testdata"},
},
{
name: "single file",
files: []string{"file1.txt"},
},
{
name: "directory with single file",
directories: []string{"testdata"},
expected: []string{"testdata/file1.txt"},
},
{
name: "directory with multiple files",
directories: []string{"testdata"},
expected: []string{
"testdata/file1.txt",
"testdata/file2.txt",
"testdata/file3.txt",
},
},
{
name: "nested directories",
directories: []string{"testdata", "testdata/nested"},
expected: []string{
"testdata/nested/file1.txt",
"testdata/nested/file2.txt",
"testdata/nested/file3.txt",
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if err := os.Mkdir(algorithm.ResultsDir, 0o755); err != nil {
t.Fatalf("error creating results directory: %s", err.Error())
}
defer func() {
if err := os.RemoveAll(algorithm.ResultsDir); err != nil {
t.Fatalf("error removing results directory and its contents: %s", err.Error())
}
}()

for _, dir := range tc.directories {
if dir != "" {
if err := os.Mkdir(algorithm.ResultsDir+"/"+dir, 0o755); err != nil {
t.Fatalf("error creating test directory: %s", err.Error())
}
}
}
for _, file := range tc.files {
if _, err := os.Create(algorithm.ResultsDir + "/" + file); err != nil {
t.Fatalf("error creating test file: %s", err.Error())
}
}

if _, err := algorithm.ZipDirectory(); err != nil {
t.Errorf("ZipDirectory() error = %v", err)
}
})
}
}
Loading

0 comments on commit afc306a

Please sign in to comment.