Skip to content

Commit

Permalink
pipeline: add local weighted semaphores
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Jan 9, 2020
1 parent 8e3ddb4 commit e644ce5
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 11 deletions.
1 change: 1 addition & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ processingDir = ""
processingConfig = "automated"
# transferLocationID = "88f6b517-c0cc-411b-8abf-79544ce96f54"
storageServiceURL = "http://test:[email protected]:62081"
capacity = 5

[[hooks."hari"]]
baseURL = "" # E.g.: "https://192.168.1.50:8080/api"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
gocloud.dev v0.17.0
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20191210221141-98df12377212 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
6 changes: 6 additions & 0 deletions hack/test-dpj/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ content-creation:
err: FIXITY_ERR = yes
err: push-tar

TIMES ?= 10
many-ok:
@for run in {1..$(TIMES)}; do \
make push-tar; \
done;

TIMES ?= 10
many:
@for run in {1..$(TIMES)}; do \
Expand Down
17 changes: 17 additions & 0 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/artefactual-labs/enduro/internal/amclient"
"golang.org/x/sync/semaphore"
)

type Config struct {
Expand All @@ -28,14 +29,21 @@ type Config struct {
ProcessingDir string
ProcessingConfig string
StorageServiceURL string
Capacity uint64
}

type Pipeline struct {
// ID (UUID) of the pipeline. This is not provided by the user but loaded
// on demand once we have access to the pipeline API.
ID string

// A weighted semaphore to limit concurrent use of this pipeline.
sem *semaphore.Weighted

// Configuration attributes.
config *Config

// The underlying HTTP client used by amclient.
client *http.Client
}

Expand All @@ -44,6 +52,7 @@ func NewPipeline(config Config) (*Pipeline, error) {
config.ProcessingDir = expandPath(config.ProcessingDir)

p := &Pipeline{
sem: semaphore.NewWeighted(int64(config.Capacity)),
config: &config,
client: httpClient(),
}
Expand Down Expand Up @@ -129,6 +138,14 @@ func (p Pipeline) Config() *Config {
return p.config
}

func (p *Pipeline) Acquire(ctx context.Context) error {
return p.sem.Acquire(ctx, 1)
}

func (p *Pipeline) Release() {
p.sem.Release(1)
}

func httpClient() *http.Client {
const (
dialTimeout = 5 * time.Second
Expand Down
40 changes: 40 additions & 0 deletions internal/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pipeline

import (
"context"
"testing"
"time"

"gotest.tools/v3/assert"
)

func TestPipelineSemaphore(t *testing.T) {
t.Parallel()

ctx := context.Background()

p, err := NewPipeline(Config{Capacity: 3})
assert.ErrorContains(t, err, "error during pipeline identification")

tryAcquire := func(n int64) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
return p.Acquire(ctx) == nil
}

tries := []bool{}

// These three should succeed right away.
tries = append(tries, tryAcquire(1))
tries = append(tries, tryAcquire(1))
tries = append(tries, tryAcquire(1))

// And the one too because we've released once.
p.Release()
tries = append(tries, tryAcquire(1))

// But this will fail because all the slots are taken.
tries = append(tries, tryAcquire(1))

assert.DeepEqual(t, tries, []bool{true, true, true, true, false})
}
26 changes: 26 additions & 0 deletions internal/workflow/activities/acquire_pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package activities

import (
"context"

"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// AcquirePipelineActivity acquires a lock in the weighted semaphore associated
// to a particular pipeline.
type AcquirePipelineActivity struct {
manager *manager.Manager
}

func NewAcquirePipelineActivity(m *manager.Manager) *AcquirePipelineActivity {
return &AcquirePipelineActivity{manager: m}
}

func (a *AcquirePipelineActivity) Execute(ctx context.Context, name string) error {
p, err := a.manager.Pipelines.ByName(name)
if err != nil {
return err
}

return p.Acquire(ctx)
}
17 changes: 9 additions & 8 deletions internal/workflow/activities/activities.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package activities

const (
DownloadActivityName = "download-activity"
BundleActivityName = "bundle-activity"
TransferActivityName = "transfer-activity"
PollTransferActivityName = "poll-transfer-activity"
PollIngestActivityName = "poll-ingest-activity"
CleanUpActivityName = "clean-up-activity"
HidePackageActivityName = "hide-package-activity"
DeleteOriginalActivityName = "delete-original-activity"
AcquirePipelineActivityName = "acquire-pipeline-activity"
DownloadActivityName = "download-activity"
BundleActivityName = "bundle-activity"
TransferActivityName = "transfer-activity"
PollTransferActivityName = "poll-transfer-activity"
PollIngestActivityName = "poll-ingest-activity"
CleanUpActivityName = "clean-up-activity"
HidePackageActivityName = "hide-package-activity"
DeleteOriginalActivityName = "delete-original-activity"
)
13 changes: 13 additions & 0 deletions internal/workflow/local_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,23 @@ import (
"context"

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/workflow/manager"

"go.uber.org/cadence/activity"
)

func releasePipelineLocalActivity(ctx context.Context, registry *pipeline.Registry, name string) error {
p, err := registry.ByName(name)
if err != nil {
return err
}

p.Release()

return nil
}

func createPackageLocalActivity(ctx context.Context, colsvc collection.Service, tinfo *TransferInfo) (*TransferInfo, error) {
info := activity.GetInfo(ctx)

Expand Down
9 changes: 9 additions & 0 deletions internal/workflow/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func withActivityOptsForLongLivedRequest(ctx workflow.Context) workflow.Context
})
}

// withActivityOptsForUnlimitedTime returns a workflow context with activity
// options suited for activities that can take a long time to run.
func withActivityOptsForUnlimitedTime(ctx workflow.Context) workflow.Context {
return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Hour * 1,
StartToCloseTimeout: forever,
})
}

// withActivityOptsForHeartbeatedRequest returns a workflow context with
// activity options suited for long-lived activities implementing heartbeats.
//
Expand Down
21 changes: 18 additions & 3 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,16 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
}

defer func() {
// Update package status, using a workflow-disconnected context to
// ensure that it runs even after cancellation.
var status = tinfo.Status
if status == collection.StatusInProgress {
status = collection.StatusError
}

// Update package status, using a workflow-disconnected context to
// ensure that it runs even after cancellation.
var dctx, _ = workflow.NewDisconnectedContext(ctx)
_ = workflow.ExecuteLocalActivity(withLocalActivityOpts(dctx), updatePackageStatusLocalActivity, w.manager.Collection, tinfo, status).Get(activityOpts, nil)
var activityOpts = withLocalActivityOpts(dctx)
_ = workflow.ExecuteLocalActivity(activityOpts, updatePackageStatusLocalActivity, w.manager.Collection, tinfo, status).Get(activityOpts, nil)
}()

// Load pipeline configuration and hooks.
Expand All @@ -149,6 +151,13 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
return wferrors.NonRetryableError(fmt.Errorf("error loading configuration: %v", err))
}

// Acquire pipeline.
activityOpts = withActivityOptsForUnlimitedTime(ctx)
err = workflow.ExecuteActivity(activityOpts, activities.AcquirePipelineActivityName, req.Event.PipelineName).Get(activityOpts, nil)
if err != nil {
return wferrors.NonRetryableError(fmt.Errorf("error acquiring pipeline: %v", err))
}

// A session guarantees that activities within it are scheduled on the same
// worker.
var sessCtx workflow.Context
Expand Down Expand Up @@ -271,6 +280,12 @@ func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workfl
err error
)

defer func() {
var dctx, _ = workflow.NewDisconnectedContext(ctx)
var activityOpts = withLocalActivityOpts(dctx)
_ = workflow.ExecuteLocalActivity(activityOpts, releasePipelineLocalActivity, w.manager.Pipelines, tinfo.Event.PipelineName).Get(activityOpts, nil)
}()

// Download.
activityOpts = withActivityOptsForLongLivedRequest(sessCtx)
err = workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, tinfo.Event).Get(activityOpts, &tinfo.TempFile)
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func main() {

cadence.RegisterWorkflow(workflow.NewProcessingWorkflow(m).Execute, collection.ProcessingWorkflowName)

cadence.RegisterActivity(activities.NewAcquirePipelineActivity(m).Execute, activities.AcquirePipelineActivityName)
cadence.RegisterActivity(activities.NewDownloadActivity(m).Execute, activities.DownloadActivityName)
cadence.RegisterActivity(activities.NewBundleActivity().Execute, activities.BundleActivityName)
cadence.RegisterActivity(activities.NewTransferActivity(m).Execute, activities.TransferActivityName)
Expand Down

0 comments on commit e644ce5

Please sign in to comment.