package atomic

原子操作 atomic operation

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

原子操作(atomic operation)是指不会被线程调度机制(不会被其他操作)打断的操作,并且这样的操作一旦开始,就一直运行到结束,之间不会有任何的中断。通俗地说,原子操作就如同一个原子一样,不可分割。

一个或者多个操作在 CPU 执行的过程中不被中断的特性,称为原子性(atomicity)。这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。

在 Go(甚至是大部分语言)中,一条普通的赋值语句其实不是一个原子操作。例如,在32位机器上写int64类型的变量就会有中间状态,因为它会被拆成两次写操作(MOV)——写低 32 位和写高 32 位

atomic包

方法 解释
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr*uint32) (val uint32)
func LoadUint64(addr*uint64) (val uint64)
func LoadUintptr(addr*uintptr) (val uintptr)
func LoadPointer(addr*unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 写入操作
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比较并交换操作

eg

互斥锁和原子操作的性能比较

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
// x = x + 1
x++ // 等价于上面的操作
wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
l.Lock()
x++
l.Unlock()
wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}

func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
// go add() // 普通版add函数 不是并发安全的
// go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

Go 的原子操作

Go语言的sync/atomic提供了对原子操作的支持,用于同步访问整数和指针。

原子操作主要是两类:

  • 修改:即重新赋值。
  • 存储:即读写。

sync/atomic 提供AddXXXCompareAndSwapXXXSwapXXXLoadXXXStoreXXX等方法。原子操作支持的类型包括int32、int64、uint32、uint64、uintptr、unsafe.Pointer

竞争条件是由于异步的访问共享资源,并试图同时读写该资源而导致的,使用互斥锁和通道的思路都是在线程获得到访问权后阻塞其他线程对共享内存的访问,而使用原子操作解决数据竞争问题则是利用了其不可被打断的特性。

Add

Add 方法很好理解,就是对addr指向的值加deltadelta可以为整数,也可以为负数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)

// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
func AddUint32(addr *uint32, delta uint32) (new uint32)

// AddInt64 atomically adds delta to *addr and returns the new value.
func AddInt64(addr *int64, delta int64) (new int64)

// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
func AddUint64(addr *uint64, delta uint64) (new uint64)

// AddUintptr atomically adds delta to *addr and returns the new value.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

从注释中可以看出,对于无符号的 uint整型,如果要做减法,可以利用计算机补码的规则,把减法变成加法:

AddUint32(&x, ^uint32(c-1)) // 相当于 x - c
AddUint64(&x, ^uint64(c-1)) // 相当于 x - c

CAS(CompareAndSwap)

go 中的 CAS 操作,是借用了CPU提供的原子性指令来实现。CAS 操作修改共享变量时候不需要对共享变量加锁,而是通过类似乐观锁的方式进行检查,本质还是不断的占用 CPU 资源换取加锁带来的开销(比如上下文切换开销)。CAS 支持的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

CompareAndSwapX函数会先判断参数addr指向的操作值与参数old的值是否相等,仅当此判断得到的结果是true之后,才会用参数new代表的新值替换掉原先的旧值,否则操作就会被忽略。

如下是一个比较和交换变量ab的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func TestCAS(t *testing.T) {
var a int64 = 9
t.Log("a before:", a)
t.Log("a交换是否成功", atomic.CompareAndSwapInt64(&a, 10, 11))
t.Log("a after:", a)

var b int64 = 10
t.Log("b before:", b)
t.Log("b交换是否成功", atomic.CompareAndSwapInt64(&b, 10, 11))
t.Log("b after:", b)
}

/*
a before: 9
a交换是否成功 false
a after: 9
b before: 10
b交换是否成功 true
b after: 11
*/

sync.MutexLock使用CompareAndSwapInt32实现自旋锁:

1
2
3
4
5
6
7
8
9
10
11
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 自旋锁
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

Swap

Swap 与 CompareAndSwap 方法相比,少了 Compare,即不需要进行比较就交换的原子操作。支持的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)

// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)

// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)

// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
func SwapUint64(addr *uint64, new uint64) (old uint64)

// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// SwapPointer atomically stores new into *addr and returns the previous *addr value.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

Load(原子读取)和Store(原子写入)

Load 和 Store 方法可以说是成对使用的。Store 方法将一个值存到指定的addr地址中,Load 方法从指定的addr地址读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)

// LoadInt64 atomically loads *addr.
func LoadInt64(addr *int64) (val int64)

// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)

// LoadUint64 atomically loads *addr.
func LoadUint64(addr *uint64) (val uint64)

// LoadUintptr atomically loads *addr.
func LoadUintptr(addr *uintptr) (val uintptr)

// LoadPointer atomically loads *addr.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

// StoreInt32 atomically stores val into *addr.
func StoreInt32(addr *int32, val int32)

// StoreInt64 atomically stores val into *addr.
func StoreInt64(addr *int64, val int64)

// StoreUint32 atomically stores val into *addr.
func StoreUint32(addr *uint32, val uint32)

// StoreUint64 atomically stores val into *addr.
func StoreUint64(addr *uint64, val uint64)

// StoreUintptr atomically stores val into *addr.
func StoreUintptr(addr *uintptr, val uintptr)

// StorePointer atomically stores val into *addr.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

往变量a存储一个值,然后再从a读取值:

1
2
3
4
5
6
7
func TestStoreAndLoad(t *testing.T) {
var a int32

atomic.StoreInt32(&a, 10)
res := atomic.LoadInt32(&a)
t.Log(res) // 10
}

atomic.Value

atomic.Value是Go语言1.4版本的时候加入的,它相当于一个容器,可以原子的StoreLoad任意类型的值。是对int32int64uint32uint64uintptrunsafe.Pointer类型原子操作的补充。但它的实现不是通过汇编来完成,而是基于已有的atomic包。

atomic.Value 实现了Load() (val any)Store(val any)Swap(new any) (old any)CompareAndSwap(old any, new any) (swapped bool)四个方法

使用时需要遵循原则:

  1. 不能存储nil;
  2. 对于同一个atomic.Value不能存入类型不同的值。存储第一个值后,就只能存储这个类型的值。
  3. atomic.Value可以实现对自定义类型的原子操作
  4. 最好不要使用atomic.Value存储引用类型的值,可能导致数据不是并发安全的

使用示例:

1
2
3
4
5
6
7
8
9
10
func TestValue(t *testing.T) {
var a atomic.Value
a.Store(int32(10))
res := a.Load().(int32)
t.Log(res) // 10

t.Log(a.CompareAndSwap(int32(10), int32(11))) // true
res = a.Load().(int32)
t.Log(res) // 11
}

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v interface{}
}

// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}

atom.Value的实现可以发现一个内部的结构ifaceWords,使用unsafe.Pointer存储数据类型及内容的指针

其实atomic.Value的实现原理就是将interface{}类型分解,得到类型和数据这两个unsafe.Pointer类型字段,在针对它们进行原子操作来达到interface{}类型原子操作的目的。

引用类型带来的坑点

因为atom.Value内部实际上维护的是存储值的指针,而这个指针因为不对外暴露,所以认为是并发安全的。然而如果尝试用它来存储引用类型,维护的就是这个引用类型的指针,则不能保证实际的数据是并发安全的。举个例子:

uint32是值类型,切片[]uint32是引用类型,使用两个函数来尝试修改值。

使用Store存入uint32的值a后,无论怎么在外部修改a,使用 Load 都可以获取到 Store 的值。

若使用Store存入引用类型的切片,在外部修改值,Load出来的值也会收到影响。这是因为对于一个引用类型,实际上只是Store了一个指针,只是对一个指针的原子操作,而这个指针实际指向的地址的值,并不在atomic.Value的维护下,所以并不是并发安全的。

go 指针类型

在进行源码分析前思考一下go的指针类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import "fmt"

func double(x *int) {
*x += *x
x = nil
}

func main() {
var a = 3
double(&a)
fmt.Println(a) // 6

p := &a
double(p)
fmt.Println(a, p == nil) // 12 false
}

x=nil并不会对数据 a 产生影响,x 的内存地址从指向 a 转变成指向 nil.如果对 a 的值进行修改,通过*x方式。 go 语言中指针不能进行数学运算,不同类型间指针不能进行转换。

unsafe.Pointer

Go语言的编译器会使用静态类型检查来保证程序运行的类型安全。但它的标准库中又提供了unsafe.Pointer,可以让程序灵活的操作内存并且可以绕过Go语言的类型检查,从而可以跟任意的指针类型相互转换。

例如 字符串和byte切片之前的零拷贝转换

1
2
3
4
5
6
7
8
9
10
type StringHeader struct {
Data uintptr
Len int
}

type SliceHeader struct {
Data uintptr
Len int
Cap int
}

slice 和 string 的底层数据结构基本一样,虽然Go语言的类型检查禁止了它们之间相互转换。但是拥有了unsafe.Pointer这个黑魔法,就可以零拷贝实现[]byte 和string之间的转换,只需共享底层的Data和Len即可

1
2
3
4
5
6
func string2bytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}
func bytes2string(b []byte) string{
return *(*string)(unsafe.Pointer(&b))
}

修改结构体变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type A struct{
 i string
 j int64
}

func main(){
 n := A{i: "a", j: 1}
 nPointer := unsafe.Pointer(&n)

 niPointer := (*string)(unsafe.Pointer(nPointer))
 *niPointer = "a1"

 njPointer := (*int64)(unsafe.Pointer(uintptr(nPointer) + unsafe.Offsetof(n.j)))
 *njPointer = 2

 fmt.Printf("n.i: %s, n.j: %d", n.i, n.j)
}

结果为a1 2.

结构体的成员变量在内存存储上是一段连续的内存。结构体的初始地址就是第一个成员变量的内存地址。基于结构体的成员地址去计算偏移量。就能够得出其他成员变量的内存地址。

i 为第一个成员变量。因此不需要进行偏移量计算,直接取出指针后转换为 Pointer,再强制转换为字符串类型的指针值即可。

j 为第二个成员变量。需要进行偏移量计算,才可以对其内存地址进行修改.(uintptr 是 Go 的内置类型。返回无符号整数,可存储一个完整的地址。后续常用于指针运算)。

unsafe.Offsetof:func Offsetof(x ArbitraryType) uintptr返回成员变量 x 在结构体当中的偏移量。更具体的讲,就是返回结构体初始位置到 x 之间的字节数。

atomic.Value 的 Store

在 Store 内,只涉及到指针的原子操作,不涉及到数据拷贝,所以Value.Store() 的参数必须是个局部变量(或者说是一块全新的内存)。

Store大致逻辑:

  1. 首先判断待存储值是否为nil,若为nil会直接panic
  2. 将待存储的数据和当前的值分别转换为ifaceWords
  3. 进入一个无限for循环
    2.1 先检查现有值的typ,如果为nil表示这是第一次存储,则先调用runtime_procPin()通过修改当前g关联m的locks属性来禁止P被抢占
    2.2 尝试使用CompareAndSwapPointer将现有值的typ设置为unsafe.Pointer(^uintptr(0))方便Load 操作时判断当前状态,如果失败则解除抢占回到for循环开始位置继续执行
    2.3 如果设置成功,则可以完成第一次的数据存储
  4. 自旋等待中的gorountine如果发现uintptr(typ) == ^uintptr(0)表明第一次存储尚未完成则继续自旋等待
  5. 到这里说明第一次存储已经完成,则检查Value从始至终是否都是保存同一类型数据,不是则panic
  6. 非第一次存储,则更新数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Store 将Value的值设置为x
// 给定值的所有Store调用都必须使用相同的具体类型否则会像存储nil值一样会发生panic
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// 尝试开始第一次存储
// 禁止P被抢占 以便其他goroutine可以使用主动自旋等待来等待完成
// GC也不会偶然看到伪造的类型
runtime_procPin()
// 加了个乐观锁 通过cas操作来检查并设置tpy为特殊的标志位
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
// 没抢到机会 则退出
runtime_procUnpin()
continue
}
// 如果成功表示获得设置存储的权利,执行第一次存储
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// 第一次存储正在进行 继续等待
// 因为在第一次存储前后已经禁止了抢占所以可以继续自旋等待
continue
}
// 第一次存储完成 检查类型并且覆盖数据
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}

atomic.Value 的 Load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Load 返回最近一次Store存储的数据
// 如果没有调用Store存储数据则返回nil
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
// 一定要用LoadPointer,以保证原子性,比如读到的不是cpu cache
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// 未调用Store或第一次存储尚未完成 直接返回nil
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}

bug

atomic包中有一段这样的注释

BUG(rsc): On 386, the 64-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. On ARM, 386, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

不同硬件平台并不是都可以在任意地址上访问任意数据;而且如果数据没有内存对齐可能会导致CPU访问两次内存才能拿到数据,如果内存对齐一次就能完成数据读取。

这里大概是说在ARM,386,和32位MIPS,调用者有责任安排原子访问的64位字按照8字节对齐,否则程序会panic。因为不同平台上的编译器有自己的对齐系数,32bit平台上一般是4字节对齐,而在64bit平台上一般是8字节对齐。所以32bit平台上8字节数字可能会因为内存对齐拆分成2个4字节分布。

eg:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync/atomic"
)

type M struct {
x int64
u uint32
v int64
}

func main() {
m := M{}
result := atomic.AddInt64(&m.v, 1)
fmt.Println(result)
}

GOARCH=amd64 go build pointer.go && ./pointer 执行正常

但是在386上GOARCH=386 go build pointer.go && ./pointer 程序发生panic

1
2
3
4
5
6
7
8
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x2f7c]

goroutine 1 [running]:
runtime/internal/atomic.Xadd64(0x1141612c, 0x1, 0x0, 0x53f8, 0x1141a230)
/usr/local/go/src/runtime/internal/atomic/asm_386.s:105 +0xc
main.main()
/Users/mywork/workspace/workspace_go/godemo/pointer/pointer.go:16 +0x40

原子操作与互斥锁的区别

互斥锁是一种数据结构,使你可以执行一系列互斥操作。而原子操作是互斥的单个操作,这意味着没有其他线程可以打断它

原子锁的优缺点:

  • 优势:更轻量。比如 CAS 可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作。这可以大大减少同步对程序性能的损耗。
  • 劣势:使用 CAS 操作的做法趋于乐观,总是假设被操作值未曾被改变(即与旧值相等),并一旦确认这个假设的真实性就立即进行值替换,那么在被操作值被频繁变更的情况下,CAS 操作并不那么容易成功。而使用互斥锁的做法则趋于悲观,我们总假设会有并发的操作要修改被操作的值,并使用锁将相关操作放入临界区中加以保护。

原子操作与互斥锁的区别:

  • 互斥锁是一种数据结构,用来让一个线程(或 goroutine)执行程序的关键部分,完成互斥的多个操作。
  • 原子操作是针对某个值的单个互斥操作
  • 可以把互斥锁理解为悲观锁,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

atomic包提供了底层的原子性内存原语,这对于同步算法的实现很有用。这些函数一定要非常小心地使用,使用不当反而会增加系统资源的开销,对于应用层来说,最好使用通道或sync包中提供的功能来完成同步操作。