Article

Go(七)你说你不会并发?

Go(七)你说你不会并发?

作者:lomtom

个人网站:lomtom.cn 🔗

个人公众号:博思奥园 🔗

你的支持就是我最大的动力。

Go 系列:

  1. Go(一)基础入门
  2. Go(二)结构体
  3. Go(三)Go 配置文件
  4. Go(四)Redis 还不会使用?
  5. Go(五)Go 不知道怎么用 Gorm?
  6. Go(六)来来来,教你怎么远程调用
  7. Go(七)你说你不会并发?
  8. Go(八)还不知道函数式选项模式?

不要通过共享内存来通信,而应通过通信来共享内存。

协程(goroutine)

Go 协程具有简单的模型:它是与其它 Go 协程并发运行在同一地址空间的函数。它是轻量级的, 所有消耗几乎就只有栈空间的分配。而且栈最开始是非常小的,所以它们很廉价, 仅在需要时才会随着堆空间的分配(和释放)而变化。

Go 协程在多线程操作系统上可实现多路复用,因此若一个线程阻塞,比如说等待 I/O, 那么其它的线程就会运行。

Go 协程的设计隐藏了线程创建和管理的诸多复杂性。

在函数或方法前添加 go 关键字能够在新的 Go 协程中调用它。当调用完成后, 该 Go 协程也会安静地退出。(效果有点像 Unix Shell 中的 & 符号,它能让命令在后台运行。)

go myFunc()  // 同时运行 myFunc 不需要等待

匿名函数在协程中调用非常方便:

func TestGo(t *testing.T) {
	s := "你好吗"
	go func() {
		fmt.Println(s)
	}()
	ss := "小道科不好"
	fmt.Println(ss)
}

结果输出:

小道科不好
你好吗

在 Go 中,匿名函数都是闭包:其实现在保证了函数内引用变量的生命周期与函数的活动时间相同。

所以,值得注意的是,如果主函数执行完,而 go 后面的方法未执行完,程序同样停止。 例如,我们在 go 后面的方法中,让函数睡眠 1 秒钟,这样主程序运行完就会退出,而不会输出s

func TestGo(t *testing.T) {
	s := "你好吗"
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println(s)
	}()
	ss := "小道科不好"
	fmt.Println(ss)
}

输出:

小道科不好

那么如何避免这样的情况呢?后续会讲到

这些函数没什么实用性,因为它们没有实现完成时的信号处理。因此,我们需要信道。

管道(channel)

为什么需要 channel? 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

Go 语言中的通道(channel)是一种特殊的类型。

管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。

管道与映射一样,也需要通过 make 来分配内存。

其结果值充当了对底层数据结构的引用。 若提供了一个可选的整数形参,它就会为该信道设置缓冲区大小。

默认值是零,表示不带缓冲的或同步的信道。

  1. 创建(关键词 chan)
c := make(chan int)            // 整数无缓冲信道
c := make(chan int, 0)         // 整数无缓冲信道
c := make(chan *os.File, 100)  // 指向文件的指针的缓冲信道
  1. 插入
c <- a
  1. 读取
num := <- c
  1. 声明读写管道
var c chan int
  1. 声明只写管道
var c chan<- int
  1. 声明只读管道
var c <-chan int

例子:

func TestGo(t *testing.T) {
	// 创建一个无缓冲的类型为整型的 channel
	c := make(chan int)
	// 执行自定义方法;方法结束时,会在信道上发信号
	go func() {
		// doSomething
		for i := 0; i < 5; i++ {
			// 发送一个信号
			c <- i
		}
		// 关闭管道
		close(c)
	}()
	// doSomething
	// 等待自定义方法执行完成,然后从 channel 取值
	for i := range c{
		fmt.Println(i)
	}
}

接收者在收到数据前会一直阻塞。

  1. 若信道是不带缓冲的,那么在接收者收到值前, 发送者会一直阻塞;
  2. 若信道是带缓冲的,则发送者仅在值被复制到缓冲区前阻塞;
  3. 若缓冲区已满,发送者会一直等待直到某个接收者取出一个值为止。
  4. 给一个 nil channel 发送数据,造成永远阻塞
  5. 从一个 nil channel 接收数据,造成永远阻塞
  6. 给一个已经关闭的 channel 发送数据,引起 panic
  7. 从一个已经关闭的 channel 接收数据,如果缓冲区中为空,则返回一个零值
  8. 无缓冲的 channel 是同步的,而有缓冲的 channel 是非同步的

sync

sync.WaitGroup

go中可以使用chan来实现通信,同样,Go 也提供了WaitGroup来实现同步。

上面的例子很好说明了没有使用WaitGroup存在的同步问题

func TestGo(t *testing.T) {
	s := "你好,世界"
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println(s)
	}()
	ss := "小道科不好"
	fmt.Println(ss)
}

这里重新举一个例子,我在主函数里输出十次,在myFunc里也输出十次,理论情况,都会输出。

为了模拟在myFunc未执行完,而主程序执行完的情况下,在myFunc中加入time.Sleep(time.Second*1)

func TestGo1(t *testing.T) {
	go myFunc()
	for i := 0; i < 10; i++ {
		fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
	}
}

func myFunc() {
	for i := 0; i < 10; i++ {
		fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
		time.Sleep(time.Second*1)
	}
}

输出如下:

main()测试,这是第0次
main()测试,这是第1次
main()测试,这是第2次
main()测试,这是第3次
main()测试,这是第4次
main()测试,这是第5次
main()测试,这是第6次
main()测试,这是第7次
main()测试,这是第8次
main()测试,这是第9次
test()测试,这是第0次

这显然不是我们所要的结果,那么最简单的就是在主函数的循环中也加入time.Sleep(time.Second*1)

func TestGo1(t *testing.T) {
	go myFunc()
	for i := 0; i < 10; i++ {
		fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
		time.Sleep(time.Second*1)
	}
}

func myFunc() {
	for i := 0; i < 10; i++ {
		fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
		time.Sleep(time.Second*1)
	}
}

输出如下:

main()测试,这是第0次
test()测试,这是第0次
main()测试,这是第1次
test()测试,这是第1次
main()测试,这是第2次
test()测试,这是第2次
test()测试,这是第3次
main()测试,这是第3次
main()测试,这是第4次
test()测试,这是第4次
test()测试,这是第5次
main()测试,这是第5次
main()测试,这是第6次
test()测试,这是第6次
test()测试,这是第7次
main()测试,这是第7次
main()测试,这是第8次
test()测试,这是第8次
test()测试,这是第9次
main()测试,这是第9次

这虽然达到了我们的预想的效果,但在正式情况下,我们并不会知道代码的执行速度与时间,所以这个一秒,理论可行,实际却很拉垮。

那么就可以使用WaitGroup来控制。

  1. 只要开启一个协程,就 Add(1),表示开启一个协程
  2. 协程执行完毕,则需要 Done(),表示从协程序等待组里删除
  3. 只有当所有的协程都 Done()后,才会继续执行 Wait()后续代码。
  4. Add 的协程数量,需要和 Done 的协程数对应,否则死锁报错
func TestGo1(t *testing.T) {
	var wg sync.WaitGroup
	go myFunc(&wg)
	for i := 0; i < 10; i++ {
		fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
		//time.Sleep(time.Second*1)
	}
	wg.Wait()
}

func myFunc(wg *sync.WaitGroup) {
	wg.Add(1)
	for i := 0; i < 10; i++ {
		fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
		time.Sleep(time.Second*1)
	}
	wg.Done()
}

输出:

main()测试,这是第0次
main()测试,这是第1次
main()测试,这是第2次
main()测试,这是第3次
main()测试,这是第4次
main()测试,这是第5次
test()测试,这是第0次
main()测试,这是第6次
main()测试,这是第7次
main()测试,这是第8次
main()测试,这是第9次
test()测试,这是第1次
test()测试,这是第2次
test()测试,这是第3次
test()测试,这是第4次
test()测试,这是第5次
test()测试,这是第6次
test()测试,这是第7次
test()测试,这是第8次
test()测试,这是第9次

sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、 只关闭一次通道等。

Go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案:sync.Once

sync.Once 对外提供的操作只有一个 Do 方法:

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

在使用上,我们只需要将需要执行的方法传入即可。

var db *gorm.DB

var loadDbConf sync.Once

// GetDb 获取连接
func GetDb() *gorm.DB {
	loadDbConf.Do(DbInit)
	return db
}

// DbInit 数据库连接池初始化
func DbInit() {
	newLogger := logger.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
		logger.Config{
			SlowThreshold:             time.Second, // Slow SQL threshold
			LogLevel:                  logger.Info, // Log level
			IgnoreRecordNotFoundError: true,        // Ignore ErrRecordNotFound error for logger
			Colorful:                  true,        // Disable color
		},
	)
	conn, err1 := gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{
		Logger: newLogger,
	})
	if err1 != nil {
		log.Printf("mysql connect get failed.%v", err1)
		return
	}
	db = conn
	log.Printf("mysql init success")
}
type Once struct {
	done uint32
	m    Mutex
}

sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记 录初始化是否完成。

这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go 语言中内置的 map 不是并发安全的。

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func TestGo5(t *testing.T) {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

上面的代码开启少量几个 goroutine 的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

k=:0,v:=0
k=:2,v:=2
k=:4,v:=4
k=:5,v:=5
k=:6,v:=6
k=:7,v:=7
k=:1,v:=1
k=:8,v:=8
k=:3,v:=3
k=:11,v:=11
k=:10,v:=10
k=:12,v:=12
fatal error: concurrent map writes
k=:13,v:=13
k=:14,v:=14

goroutine 38 [running]:
runtime.throw(0xe6715c, 0x15)
	E:/program/go/src/runtime/panic.go:1117 +0x79 fp=0xc000337ec8 sp=0xc000337e98 pc=0x7ac6f9
runtime.mapassign_faststr(0xd9b800, 0xc00003c2a0, 0xe8046a, 0x2, 0x0)

像这种场景下就需要为 map 加锁来保证并发的安全性了,Go 语言的 sync 包中提供了一个开箱即用的并发安全版 map–sync.Map。

开箱即用表示不用像内置的 map 一样使用 make 函数初始化就能直接使用。同时 sync.Map 内置了诸如 Store、Load、LoadOrStore、Delete、Range 等操作方法。

var m1 = sync.Map{}

func TestGo6(t *testing.T) {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			k := strconv.Itoa(n)
			m1.Store(k,n)
			v, _ := m1.Load(k)
			fmt.Printf("k=:%v,v:=%v\n", k, v)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

并行化

1 - n 的和

这些设计的另一个应用是在多 CPU 核心上实现并行计算。

如果计算过程能够被分为几块 可独立执行的过程,它就可以在每块计算结束时向信道发送信号,从而实现并行处理。

在下面这个例子,我需要计算 1 到 n 的和,一般来说简单的直接一个循环搞定。

func TestGo(t *testing.T){
	n := 100000
	var s int
	for i := 0;i < n;i++{
		s += i
	}
	log.Println(s)
}

但是对于数据量比较大的,这样显然不适合出现在我们的代码中,那么就可以采用并行计算来实现。

func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	for i := 0;i < num;i++ {
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
		}()
	}
	var s int
	for i := 0;i < num;i++ {
		s += <- c
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

当然也可以借用WaitGroup

func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}()
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

我们在循环中启动了独立的处理块,每个 CPU 将执行一个处理。

它们有可能以乱序的形式完成并结束,但这没有关系; 我们只需在所有 Go 协程开始后接收,并统计信道中的完成信号即可。

除了直接设置 num 常量值以外,我们还可以向 runtime 询问一个合理的值。

函数 runtime.NumCPU 🔗 可以返回硬件 CPU 上的核心数量,如此使用:

var num = runtime.NumCPU()

另外一个需要知道的函数是 runtime.GOMAXPROCS,会返回用户设置可用 CPU 数量。默认情况下使用 runtime.NumCPU 的值,但是可以被命令行环境变量,或者调用此函数并传参正整数。传参 0 的话会返回值,假如说我们尊重用户对资源的分配,

就应该这么写:

var numCPU = runtime.GOMAXPROCS(0)

注意不要混淆并发(concurrency)和并行(parallelism)的概念: 并发是用可独立执行组件构造程序的方法, 而并行则是为了效率在多 CPU 上平行地进行计算。

尽管 Go 的并发特性能够让某些问题更易构造成并行计算, 但 Go 仍然是种并发而非并行的语言,且 Go 的模型并不适合所有的并行问题。

问题

但是,如果你跑过了前面的代码,就会发现一个问题,在后面的计算其实是不准确的。

1 - 100的值应该为4950,而他每次输出的值基本都不会相同,更不可能是4950.

问题出现在 Go 的 for 循环中,该循环变量在每次迭代时会被重用,因此 i 变量会在所有的 Go 协程间共享,这不是我们想要的。

我们需要确保 i 对于每个 Go 协程来说都是唯一的。

这里有几种方法来实现:

  1. 第一种:多写一层调用 将下面的匿名方法抽成一个一般方法,将 i 作为参数传入。
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		go myFunc1(n,num,i,c,&wg)
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

func myFunc1(n,num,i int,c chan int,wg *sync.WaitGroup) {
	start := n / num * i
	end := n / num * (i + 1)
	var s int
	for j := start; j < end;j++ {
		s += j
	}
	c <- s
	wg.Done()
}
  1. 第二种:传入参数的闭包(以 i 为参数传入)
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		// 以i为参数传入
		go func(i int) {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}(i)
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}
  1. 第三种:重新申明
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		// 重新申明
		i := i
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}()
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

i:= i的写法看起来有点奇怪,但在 Go 中这样做是合法且常见的。

你用相同的名字获得了该变量的一个新的版本, 以此来局部地刻意屏蔽循环变量,使它对每个 Go 协程保持唯一。

性能对比

传统方法与并行计算:

这里不考虑越界情况(即不考虑计算正确性),因为他每次结果还是会计算的。

n 值传统并行
100000003.7146ms1.058ms
10000000039.9925ms7.1645ms
1000000000344.0599ms49.9023ms
100000000003.4797346s501.5713ms
10000000000034.5406926s4.650136s

结果还是挺明显的。

Copyright

本文为原创内容,欢迎分享与引用,请保留作者与原文链接。

文章标题

Go(七)你说你不会并发?

作者

lomtom

发布方式

原创发布

原文链接 https://lomtom.cn/db04deed
More Reads