Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
leaxoy committed Mar 4, 2020
1 parent 9930f21 commit 44bf803
Show file tree
Hide file tree
Showing 146 changed files with 9,939 additions and 1 deletion.
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,23 @@
# x-go
# x-go

go标准库的拓展,提供各种便利的工具

[xcontainer](xcontainer/README.md) 集合库

[xcrypt](xcrypt/README.md) 加解密拓展

[xfs](xfs/README.md) 文件系统抽象

[xhash](xhash/README.md) hash拓展

[xnet](xnet/README.md) net拓展

[xsort](xsort/README.md) sort拓展

[xstrings](xstrings/README.md) string拓展

[xsync/xatomic](xsync/xatomic/README.md) 原子量拓展

[xtime](xtime/README.md) time拓展

[xunsafe](xunsafe/README.md) unsafe拓展
60 changes: 60 additions & 0 deletions concurrent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Concurrent 并发工具

## Concurrent 并发请求并等待的工具
```go
type Concurrent struct {}
func NewConcurrent(ctx context.Context) (*Concurrent, context.Context)
// 安排一个异步任务并返回
func (c *Concurrent) SpawnContext(ctx context.Context, fn func(ctx context.Context) error)
// 安排一个异步任务并返回
func (c *Concurrent) Spawn(fn func() error)
// 安排一些任务并等待结果返回
func (c *Concurrent) SpawnAndWait(ctx context.Context, fns ...func(context.Context) error) error
// 等待任务的执行结束,和SpawnContext与Spawn结合使用
func (c *Concurrent) Wait(ctx context.Context) error
```

## Pool 协程池
```go
type Task struct {
ctx context.Context
cancel context.CancelFunc

fn func(ctx context.Context) error
done chan error
}
// 取消一个任务,目前仅限于任务还未进行调度之前
func (t *Task) Cancel()
// 同步等待一个任务的完成
func (t *Task) Wait() error
// 带有超时的等待一个任务的完成
func (t *Task) WaitTimeout(timeout time.Duration) error
type Pool struct {
// TODO: 使用无锁数据结构来提高性能
taskCh chan *Task
workerNum int
done chan struct{}
}
// 创建一个协程池,worker数量为workerNum,任务队列大小为taskNum
func NewPool(workerNum int, taskNum int) *Pool
// 当前任务队列大小
func (p *Pool) JobQueueSize() int
// 调度一个任务并返回任务的token,用来进行控制
func (p *Pool) Spawn(ctx context.Context, fn func(ctx context.Context) error) *Task
// 关闭协程池
func (p *Pool) Shutdown()
```
## Semaphore 信号量,并发量
```go
type Semaphore struct {...}
// 创建一个并发度为n的信号量
func NewSemaphore(n int64) *Semaphore
// 获取token,并执行,如果没有可用的token,会等待
func (s *Semaphore) SpawnContext(ctx context.Context, fn func(ctx context.Context) error) error
// 获取token,并执行,如果没有可用的token,会等待
func (s *Semaphore) Spawn(fn func() error) error
// 获取token,并执行,如果没有可用的token,会立即返回
func (s *Semaphore) TrySpawnContext(ctx context.Context, fn func(ctx context.Context) error) error
// 获取token,并执行,如果没有可用的token,会立即返回
func (s *Semaphore) TrySpawn(fn func() error) error
```
54 changes: 54 additions & 0 deletions concurrent/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package concurrent

import (
"context"

"golang.org/x/sync/errgroup"
)

type Concurrent struct {
group *errgroup.Group
panicHandle func(x interface{})
}

func (c *Concurrent) SpawnContext(ctx context.Context, fn func(ctx context.Context) error) {
c.group.Go(func() error {
defer func() {
if x := recover(); x != nil {
if c.panicHandle != nil {
c.panicHandle(x)
}
}
}()
return fn(ctx)
})
}

func (c *Concurrent) Spawn(fn func() error) {
c.group.Go(func() error {
defer func() {
if x := recover(); x != nil {
if c.panicHandle != nil {
c.panicHandle(x)
}
}
}()
return fn()
})
}

func (c *Concurrent) Wait(ctx context.Context) error {
return c.group.Wait()
}

func (c *Concurrent) SpawnAndWait(ctx context.Context, fns ...func(context.Context) error) error {
for _, fn := range fns {
c.SpawnContext(ctx, fn)
}
return c.Wait(ctx)
}

func NewConcurrent(ctx context.Context) (*Concurrent, context.Context) {
group, ctx := errgroup.WithContext(ctx)
return &Concurrent{group: group}, ctx
}
71 changes: 71 additions & 0 deletions concurrent/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package concurrent_test

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/go-board/x-go/concurrent"
)

func TestConcurrent(t *testing.T) {
t.Run("wait success", func(t *testing.T) {
now := time.Now()
c, _ := concurrent.NewConcurrent(context.Background())
c.Spawn(func() error {
time.Sleep(time.Second)
return nil
})
c.Spawn(func() error {
time.Sleep(time.Second * 2)
return nil
})
err := c.Wait(context.Background())
require.Nil(t, err, "errCh must be nil")
require.Equal(t, 2, int(time.Since(now).Seconds()), "longest elapsed time is two seconds")
})
t.Run("wait error", func(t *testing.T) {
now := time.Now()
c, _ := concurrent.NewConcurrent(context.Background())
c.Spawn(func() error {
time.Sleep(time.Second)
return nil
})
c.Spawn(func() error {
time.Sleep(time.Second * 2)
return errors.New("not found")
})
c.Spawn(func() error {
time.Sleep(time.Second * 3)
return nil
})
err := c.Wait(context.Background())
require.EqualError(t, err, "not found", "errCh must be not found")
require.Equal(t, 3, int(time.Since(now).Seconds()), "longest elapsed time is two seconds")
})
t.Run("wait error last", func(t *testing.T) {
now := time.Now()
c, _ := concurrent.NewConcurrent(context.Background())
var a int
c.Spawn(func() error {
a = 123
time.Sleep(time.Second)
return nil
})
c.Spawn(func() error {
time.Sleep(time.Second * 2)
return nil
})
c.Spawn(func() error {
time.Sleep(time.Second * 3)
return errors.New("timeout")
})
err := c.Wait(context.Background())
t.Logf("%+v\n", a)
require.EqualError(t, err, "timeout", "errCh must be timeout")
require.Equal(t, 3, int(time.Since(now).Seconds()), "longest elapsed time is two seconds")
})
}
100 changes: 100 additions & 0 deletions concurrent/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package concurrent

import (
"context"
"errors"
"runtime"
"time"
)

type TaskHandle struct {
ctx context.Context
cancel context.CancelFunc

fn func(ctx context.Context) error
done chan error
}

func (t *TaskHandle) Cancel() {
if t.cancel != nil {
t.cancel()
}
}

func (t *TaskHandle) Wait() error {
return <-t.done
}

func (t *TaskHandle) WaitTimeout(timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-timer.C:
return errors.New("context deadline exceeded")
case err := <-t.done:
return err
}
}

type Pool struct {
// TODO: 使用无锁数据结构来提高性能
taskCh chan *TaskHandle
workerNum int
done chan struct{}
}

func (p *Pool) poll() {
for i := 0; i < p.workerNum; i++ {
go func() {
for {
select {
case <-p.done:
return
case task := <-p.taskCh:
select {
case <-task.ctx.Done():
task.done <- task.ctx.Err()
default:
task.done <- task.fn(task.ctx)
}
}
}
}()
}
}

func (p *Pool) JobQueueSize() int {
return len(p.taskCh)
}

func (p *Pool) Spawn(ctx context.Context, fn func(ctx context.Context) error) *TaskHandle {
ctx, cancel := context.WithCancel(ctx)
t := &TaskHandle{
ctx: ctx,
cancel: cancel,
fn: fn,
done: make(chan error, 1),
}
p.taskCh <- t
return t
}

func (p *Pool) Shutdown() {
close(p.done)
}

func NewPool(workerNum int, taskNum int) *Pool {
pool := &Pool{
taskCh: make(chan *TaskHandle, taskNum),
workerNum: workerNum,
done: make(chan struct{}),
}
pool.poll()
return pool
}

var globalPool = NewPool(runtime.NumCPU(), 10000)

func Spawn(ctx context.Context, fn func(ctx context.Context) error) *TaskHandle {
return globalPool.Spawn(ctx, fn)
}
36 changes: 36 additions & 0 deletions concurrent/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package concurrent_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/go-board/x-go/concurrent"
)

func TestPool(t *testing.T) {
t.Run("spawn", func(t *testing.T) {
p := concurrent.NewPool(1, 10)
now := time.Now()
task := p.Spawn(context.Background(), func(ctx context.Context) error {
time.Sleep(time.Second)
return nil
})
err := task.Wait()
require.Nil(t, err, "err must be nil")
require.Equal(t, 1, int(time.Since(now).Seconds()), "task spent 1 second")
})
t.Run("spawn timeout", func(t *testing.T) {
p := concurrent.NewPool(1, 10)
now := time.Now()
task := p.Spawn(context.Background(), func(ctx context.Context) error {
time.Sleep(time.Second * 2)
return nil
})
err := task.WaitTimeout(time.Second)
require.NotNil(t, err, "err must be not nil")
require.Equal(t, 1, int(time.Since(now).Seconds()), "task spent 1 second")
})
}
Loading

0 comments on commit 44bf803

Please sign in to comment.