Skip to content

二十二、CSP并发机制

wenjianzhang edited this page Nov 12, 2019 · 1 revision

CSP

Communcating sequential processes (CSP)

CSP vs. Actor

  • 和 Actor 的直接通讯不同, CSP 模式则是通过 Channel 进行通讯的, 更松耦合一些。
  • Go 中 channel 是有容量限制并且独立于处理Groutine,而如 Erlang, Actor 模式中的 mailbox 的容量是无限的,接收进程也总是被动地处理消息。

Channel

先看一段代码

func service() string {
	time.Sleep(time.Millisecond * 50)
	return "Done"
}

func otherTask() {
	fmt.Println("working on something else")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("task is done.")
}

func TestService(t *testing.T) {
	fmt.Println(service())
	otherTask()
}

输出

=== RUN   TestService
Done
working on something else
task is done.
--- PASS: TestService (0.15s)
PASS

Process finished with exit code 0

可以看出上述打印结果是按照程序执行的顺序 下边是用 Channel

func service() string {
	time.Sleep(time.Millisecond * 50)
	return "Done"
}

func otherTask() {
	fmt.Println("working on something else")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("task is done.")
}

func AsyncService() chan string {
	retCh := make(chan string)
	go func() {
		ret := service()
		fmt.Println("returned result.")
		retCh <- ret
		fmt.Println("service exited.")
	}()
	return retCh
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()

	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}

输出

=== RUN   TestAsyncService
working on something else
returned result.
task is done.
Done
service exited.
--- PASS: TestAsyncService (1.11s)
PASS

Process finished with exit code 0

使用了channel ,可以看到 service exited. 消息打印在了最后,所以,即便我们channel里边放进了消息,其他的task没有结束之前,service的那个协程都会被阻塞住,直到channel 消息被接收掉; 下边使用buffer channel

func service() string {
	time.Sleep(time.Millisecond * 50)
	return "Done"
}

func otherTask() {
	fmt.Println("working on something else")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("task is done.")
}

func AsyncService() chan string {
	//retCh := make(chan string)
	retCh := make(chan string,1)
	go func() {
		ret := service()
		fmt.Println("returned result.")
		retCh <- ret
		fmt.Println("service exited.")
	}()
	return retCh
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()

	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}

输出

=== RUN   TestAsyncService
working on something else
returned result.
service exited.
task is done.
Done
--- PASS: TestAsyncService (1.11s)
PASS

Process finished with exit code 0

这个时候我们能看到 Done 这个消息打印到了 service exited. 后边,说明那个service 协程没有被阻塞,执行完就释放掉了, channel 里的消息在 otherTask 执行完以后, 正确的打印了出来;

Clone this wiki locally