Skip to Content

Read-and-Write-in-High-Concurrency

Posted on

问题

今天遇到一个高并发读写的问题。

具体的场景是,有一个“策略”的集合,然后每秒有很多消息进来,每一条消息都要匹配有没有对应的策略,如果有的话就应用策略(更改消息的某个属性),没有的话就返回。

抽象来看,就是在N多读的同时,怎样去写数据。

一开始我的方法是策略存在数组里,消息来了就去遍历数组,如果匹配到了就返回对应的规则。这种方法最笨,因为每一条消息过来,我都要去循环遍历整个数组,如果数组很长的话(有很多规则),那么带来的无谓开销会很大,复杂度为O(n)。

而且还一个问题,如果在range数组的时候,数组发生了变化,那么就会读到错误的值,或者崩溃。

一种解决的方法是,在遍历之前,首先拷贝一份新的,遍历新的策略数组,而不是原有的全局变量。这种方法的问题在于,每次匹配规则的时候,都要进行一次拷贝,虽然简单,也能解决问题,但,太浪费资源,而且很丑。

最终的思路是,用哈希表的方式去匹配策略,从复杂度上来看是O(1)的操作,但问题在于并发读写哈希表。

复现

为了更容易的表示问题,用了大量的并发读写(实际情况没有下面那么频繁,写操作要比读操作少得多得多):

package main

import (
	"fmt"
	"time"
)

var biu map[int]int

func read() {
	for i := 0; i < len(biu); i++ {
		if biu[i] != i {
			fmt.Printf("%d != %d\n", biu[i], i)
			panic("!")
		}
	}
}

func write() {
	for i := 0; i < time.Now().Second(); i++ {
		biu[i] = i
	}
}

func main() {
	biu = make(map[int]int)

	go func() {
		for {
			go write()
			time.Sleep(time.Millisecond)
		}
	}()

	for {
		go read()
		time.Sleep(time.Millisecond)
	}

}

会遇到下面的错误:

➜  rw-cocurrency go run main.go 
fatal error: concurrent map read and map write

goroutine 146 [running]:
runtime.throw(0x4bd628, 0x21)
	/opt/go/src/runtime/panic.go:605 +0x95 fp=0xc42002bf30 sp=0xc42002bf10 pc=0x427415
runtime.mapaccess1_fast64(0x4a0e40, 0xc4200740f0, 0x1, 0xc4200900d8)
	/opt/go/src/runtime/hashmap_fast.go:119 +0x1a8 fp=0xc42002bf58 sp=0xc42002bf30 pc=0x40aa98
...
...

尝试方案

互斥锁(Mutex)

首先想到的是在读写的时候加锁,确保读和写不会同时发生。

var m sync.Mutex

func read() {
	m.Lock()
	defer m.Unlock()
	for i := 0; i < len(biu); i++ {
		if biu[i] != i {
			fmt.Printf("%d != %d\n", biu[i], i)
			panic("!")
		}
	}
}

func write() {
	m.Lock()
	defer m.Unlock()
	for i := 0; i < time.Now().Second(); i++ {
		biu[i] = i
	}
}

但随之而来了另一个问题,如果每秒有1w的读,每分钟有1个写,那么我这1w/s的LockUnLock岂不是很尴尬?而且读的时候并不关心别人,只关心是否有写就行了。

所以,互斥锁就变成了读写锁。

读写锁(RWMutex)

这种锁的好处在于,读是可以并发的,Read Locker并不会影响其他的读操作,但Write Locker又是优先于Read Locker,而且读和写互斥,即,如果有写操作发生,那么读操作会等待写完成,然后再读。

var m sync.RWMutex

func read() {
	m.RLock()
	defer m.RUnlock()
	for i := 0; i < len(biu); i++ {
		if biu[i] != i {
			fmt.Printf("%d != %d\n", biu[i], i)
			panic("!")
		}
	}
}

func write() {
	m.Lock()
	defer m.Unlock()
	for i := 0; i < time.Now().Second(); i++ {
		biu[i] = i
	}
}

但是(凡是总有个但是),这样还是不够“高性能”,因为读的时候加锁还是有点丑。有没有更好的方案?

很高兴,golang在1.9中发布了官方的sync.Map

有位大佬写了详细的介绍:http://colobu.com/2017/07/11/dive-into-sync-Map/

sync.Map

// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
// It is safe for multiple goroutines to call a Map's methods concurrently.
//
// It is optimized for use in concurrent loops with keys that are
// stable over time, and either few steady-state stores, or stores
// localized to one goroutine per key.
//
// For use cases that do not share these attributes, it will likely have
// comparable or worse performance and worse type safety than an ordinary
// map paired with a read-write mutex.
//
// The zero Map is valid and empty.
//

It is optimized for use in concurrent loops with keys that are stable over time, and either few steady-state stores, or stores localized to one goroutine per key.

这句话我的理解是,如果map中的key很稳定,不会经常变动,或者并发操作不同的key,那这个map是比较厉害的。(不理解 few steady-state stores 是啥意思,如果有朋友看到了希望能帮忙解释一下)

所以,在以上我的情境中(读多写少),使用sync.Map是没差的。

最终的代码就变成了:

package main

import (
	"fmt"
	"sync"
	"time"
)

var biu sync.Map

func read() {
	biu.Range(func(k, v interface{}) bool {
		if k.(int) != v.(int) {
			fmt.Printf("%d != %d\n", k.(int), v.(int))
			panic("!")
		}
		return true
	})
}

func write() {
	for i := 0; i < time.Now().Second(); i++ {
		biu.Store(i, i)
	}
}

func main() {
	go func() {
		for {
			go write()
			time.Sleep(time.Millisecond)
		}
	}()

	for {
		go read()
		time.Sleep(time.Millisecond)
	}

}

总结

  1. 匹配不要用数组,用哈希表效率会更高
  2. 哈希表不能并发读写
  3. MetuxRWMutex要根据场景选用
  4. sync.Map很牛,但也不要滥用
  5. 不要着急,凡事总会有办法
comments powered by Disqus