Skip to content

Commit

Permalink
Merge pull request #5 from newmo-oss/add-gogrouptest
Browse files Browse the repository at this point in the history
Add gogrouptest
  • Loading branch information
tenntenn authored Dec 6, 2024
2 parents 30bd929 + a7bd322 commit 9d219b4
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 31 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
module github.com/newmo-oss/gogroup

go 1.22.7
go 1.22.10

require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/newmo-oss/gotestingmock v0.1.1
github.com/newmo-oss/testid v0.1.0
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8
go.uber.org/goleak v1.3.0
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/newmo-oss/gotestingmock v0.1.1 h1:EtZrif5qSsVrJ4w944pToOL3L9ux0WKwrtKXnzv+Mj8=
github.com/newmo-oss/gotestingmock v0.1.1/go.mod h1:ee64ZPEODG1GK+c4fHzxRzE9WbMi9VuIgEItMA0yXjI=
github.com/newmo-oss/testid v0.1.0 h1:NHe0FfS1b4/z+r2sgcLdHNrRRFZNWR7vEZhBJ4j3Hdk=
github.com/newmo-oss/testid v0.1.0/go.mod h1:Z2jzbNSa3gGZTBTj7fQs4n0Ke5gw3Oa9vYgySN3IZ00=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw=
Expand Down
89 changes: 89 additions & 0 deletions gogrouptest/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package gogrouptest

import (
"context"
"sync"
"testing"

"github.com/newmo-oss/testid"
"github.com/sourcegraph/conc/panics"
"github.com/sourcegraph/conc/pool"

"github.com/newmo-oss/gogroup/internal"
)

var withoutParallels sync.Map

func init() {
if testing.Testing() {
internal.Start = startForTest
}
}

// WithoutParallel sets the parallel mode of [gogroup.Group] to off.
// It makes (*gogroup.Group).Run will execute synchronously.
// The parallel mode is set each test id which get from [testid.FromContext].
// If any test id cannot obtain from the context, the test will be fail with t.Fatal.
// The mode will be remove by t.Cleanup.
func WithoutParallel(t testing.TB, ctx context.Context) {
tid, ok := testid.FromContext(ctx)
if !ok {
t.Fatal("failed to get test ID from the context")
}

t.Cleanup(func() {
withoutParallels.Delete(tid)
})

withoutParallels.Store(tid, true)
}

func noParallel(ctx context.Context) bool {
tid, ok := testid.FromContext(ctx)
if !ok {
return false
}

v, ok := withoutParallels.Load(tid)
if !ok {
return false
}

noparallel, ok := v.(bool)
if !ok {
return false
}

return noparallel
}

func startForTest(ctx context.Context, funcs []func(context.Context) error) func() error {
if !noParallel(ctx) {
return internal.DefaultStart(ctx, funcs)
}

p := pool.New().WithContext(ctx).WithCancelOnError()
doneCh := make([]chan struct{}, len(funcs))

for i, f := range funcs {
doneCh[i] = make(chan struct{})
p.Go(func(ctx context.Context) (rerr error) {
defer func() {
close(doneCh[i])
}()

// wait before function call
if i > 0 {
<-doneCh[i-1]
}

if r := panics.Try(func() { rerr = f(ctx) }); r != nil {
return r.AsError()
}

return rerr
})
}

return p.Wait // return method value
}
81 changes: 81 additions & 0 deletions gogrouptest/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package gogrouptest_test

import (
"context"
"runtime"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/newmo-oss/testid"
"github.com/newmo-oss/gotestingmock"

"github.com/newmo-oss/gogroup"
"github.com/newmo-oss/gogroup/gogrouptest"
)

func TestWithoutParallel(t *testing.T) {
t.Parallel()

cases := map[string]struct {
noParallel bool
}{
"no parallel": {true},
"parallel": {false},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
t.Parallel()
var cleanedup bool
t.Cleanup(func() {
if tt.noParallel && !cleanedup {
t.Error("t.Cleanup must be called")
}
})

var want, got []int
var mu sync.Mutex
var g gogroup.Group
for i := range 100 {
want = append(want, i)
g.Add(func(ctx context.Context) error {
if i%2 == 0 {
runtime.Gosched()
}
mu.Lock()
got = append(got, i)
mu.Unlock()
return nil
})
}

tid := t.Name() + "/" + uuid.NewString()
ctx := testid.WithValue(context.Background(), tid)

if tt.noParallel {
tb := &gotestingmock.TB{
TB: t,
CleanupFunc: func(f func()) {
cleanedup = true
t.Cleanup(f)
},
}
gogrouptest.WithoutParallel(tb, ctx)
}

if err := g.Run(ctx); err != nil {
t.Fatal("unexpected error:", err)
}

diff := cmp.Diff(got, want)
switch {
case tt.noParallel && diff != "":
t.Error("executing order does not match:", diff)
case !tt.noParallel && diff == "":
t.Error("executing order may be randodm")
}
})
}
}
16 changes: 2 additions & 14 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import (
"slices"
"sync"

"github.com/sourcegraph/conc/panics"
"github.com/sourcegraph/conc/pool"
"github.com/newmo-oss/gogroup/internal"
)

// Group is a group of goroutines used to run functions concurrently.
Expand All @@ -27,22 +26,11 @@ func (g *Group) Add(f func(context.Context) error) {
}

func (g *Group) start(ctx context.Context) func() error {
p := pool.New().WithContext(ctx).WithCancelOnError()

g.mu.Lock()
funcs := slices.Clone(g.funcs)
g.mu.Unlock()

for _, f := range funcs {
p.Go(func(ctx context.Context) (rerr error) {
if r := panics.Try(func() { rerr = f(ctx) }); r != nil {
return r.AsError()
}
return rerr
})
}

return p.Wait // return method value
return internal.Start(ctx, funcs)
}

// Run calls all registered functions in different goroutines.
Expand Down
23 changes: 7 additions & 16 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/newmo-oss/testid"
"go.uber.org/goleak"

"github.com/newmo-oss/gogroup"
"github.com/newmo-oss/gogroup/gogrouptest"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -47,36 +50,24 @@ func TestGroup(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

var (
doneCh = make([]chan struct{}, len(tt.funcs))
canceled atomic.Bool
)
var canceled atomic.Bool
var g gogroup.Group
for i, f := range tt.funcs {
doneCh[i] = make(chan struct{})
g.Add(func(ctx context.Context) error {
defer func() {
close(doneCh[i])
}()

// wait before function call
if i > 0 {
<-doneCh[i-1]
}

if tt.wantCanceledFunc >= 0 && tt.wantCanceledFunc < i {
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
canceled.Store(true)
}
}

return f(ctx)
})
}

ctx := withCancel(context.Background())
tid := t.Name() + "/" + uuid.NewString()
ctx := withCancel(testid.WithValue(context.Background(), tid))
gogrouptest.WithoutParallel(t, ctx)
err := g.Run(ctx)

switch {
Expand Down
23 changes: 23 additions & 0 deletions internal/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package internal

import (
"context"

"github.com/sourcegraph/conc/panics"
"github.com/sourcegraph/conc/pool"
)

var Start = DefaultStart

func DefaultStart(ctx context.Context, funcs []func(context.Context) error) func() error {
p := pool.New().WithContext(ctx).WithCancelOnError()
for _, f := range funcs {
p.Go(func(ctx context.Context) (rerr error) {
if r := panics.Try(func() { rerr = f(ctx) }); r != nil {
return r.AsError()
}
return rerr
})
}
return p.Wait // return method value
}

0 comments on commit 9d219b4

Please sign in to comment.