并发
什么是并发?我的理解是并发是关于如何让一个系统在面对多个任务时,既能高效地利用资源,又能正确地协调它们之间的交互。在《戴森球计划》等游戏运行时,CPU满载运行时渲染的蓝色洪流,我认为是对并发/并行最完美的具象化呈现。
先说进程。 进程是一个正在运行的程序实例,Chrome浏览器和Word文档,它们就是两个不同的进程。操作系统会为每个进程分配彼此隔离的用户空间,并把它们交给 CPU 调度执行。一个应用可以只开一个进程,也可以开很多个,这取决于应用自身;对操作系统来说,进程之间没有啥区别,它只负责把要跑的活儿交给 CPU。
那一颗 CPU 怎么“同时”跑这么多东西? 关键在于“时间片”。单个 CPU 核心同一时刻只能执行一个任务,但操作系统会把时间切成很细的片段,轮流把这些时间片分给不同的可运行实体。每个时间片到了就切换到下一个,如此快速轮转,让我们主观上感觉“所有程序都在同时跑”。这就是并发,宏观上的并行,微观上的交替。
再说线程。线程是操作系统调度的最小单位,线程通常对应一个功能。一个进程至少包含一个线程;所谓“调度进程”本质上是在调度进程里的线程。同一进程中的线程共享进程的资源,但各自拥有独立的栈(栈主要是负责存局部变量和函数调用栈等数据的)。
线程大致分两类:
- 内核级线程:创建、销毁、调度与上下文切换由内核完成,需要系统调用,开销相对更高。
- 用户级线程(协程):在线程库/运行时于用户态完成调度与切换,通常只在必要时与少量内核线程关联,因而切换开销更小。
goroutine 就是一种用户级线程 它由 Go 运行时在用户态调度,创建成本低、切换轻量,还配有按需增长的栈。运行时采用 M
Goroutine
Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
我们想要实现一下用 10 个协程去累加一个数字,每个协程将这个变量自增 10w 次,我们期望得到 100w 的结果
package main
import ( "fmt" "time")
func main() { sum := 0 for range 10 { go func() { for range 100000 { sum += 1 } }() } time.Sleep(1 * time.Second) fmt.Println(sum)}结果呢?
root@RINAI-SWORD:/project/lanshan/count# go run ./569738root@RINAI-SWORD:/project/lanshan/count# go run ./503786root@RINAI-SWORD:/project/lanshan/count# go run ./595125得到是不确定的数字,可能有人觉得奇怪,但是在现实中,这才是符合预期的,你如果把数值调小,让多个线程累加到 100 或者 1000,你的结果可能正是你期望的,但是仅仅是因为样本数量太少而没有引发多线程出现的问题。
那么为什么会出现这种结果?
这涉及到我们 cpu 的工作原理,我们的一条自增 ++ 代码,在 cpu 层面并不是原子执行的,实际上,一个简单的自增在计算机层面会是这样的顺序:
1. cpu 从指定内存读取 sum 的值2. cpu 将 sum + 1。3. cpu 将 sum 写回指定内存。大概是这三个步骤,那么在多线程场景下思考,只要在 1 -> 2 的间隙发生线程切换,这里线程切换会将当前线程中的某些上下文数据保存下来到内存中,等到切换回线程1的时候就会从内存中恢复这些数据到 cpu 中,在另一个线程中执行了 sum + 1,就会导致我们当前 cpu 读取的数值是旧的,比如:
线程1:cpu 读取到 sum = 10。线程2:cpu 读取到 sum = 10。线程2:cpu 将 sum + 1。线程2:cpu 回写 sum,此时 sum = 11。线程1:cpu 根据读取到的 sum + 1。线程1:cpu 回写 sum,此时 sum = 11.由此可见,多线程虽然利用多核的优势,但是也给我们带来了一些麻烦。因此,我们需要同步原语,以消除这种不确定性,在并发环境中强制实现“顺序”和“互斥”
原子性和原子操作
原子性的核心目标是保证数据在并发访问下的一致性。
在多线程/多核环境中,操作系统会随时在不同的线程之间进行切换,如果一个非原子操作执行到一半被中断,而另一个线程又来读取或修改相同的数据,结果将是不可预测的。
通常是原子的(在主流32/64位CPU上):
- 读取或写入一个对齐的、原生数据类型(如在Go中,
int32、int64、bool等变量的单次读或写)通常是原子的。这意味着你不会读到半个写操作的结果。
通常不是原子的(需要额外保护):
- 任何需要多个步骤的操作:
对结构体、切片、映射等复杂数据结构的修改。
在32位系统上对64位变量的读写(如 int64),可能需要两条指令来完成。
**原子操作:**Go中 sync/atomic 包,封装了底层的硬件原子指令,提供了如 atomic.AddInt32、atomic.LoadPointer、atomic.CompareAndSwapUint64 等函数。
Channel
“不要通过共享内存来通信,而要通过通信来共享内存”,这句话便是 Go 语言的并发哲学,什么意思?就是说,如果两个 goroutine 希望共享一个变量,不应该通过一个外部的全局变量来进行加锁读写,而是应该通过 channel 将 goroutine A 中的变量传递给 goroutine B。
下面是一个任务调度器:
package main
import ( "fmt" "time")
type Task struct { // 函数也可以是结构体的成员 Runnable func(workerId int)}
func main() { // 一个负责任务分发的管道 ch := make(chan Task, 10)
// 启动几个 worker 负责处理任务 for id := range 10 { go func(workerId int) { for t := range ch { t.Runnable(workerId) } }(id) }
// 任务分发 for i := range 20 { j := i t1 := Task{ Runnable: func(workerId int) { fmt.Printf("workerId%v:task%v做一件事情\n", workerId, j) }, } ch <- t1 } time.Sleep(1 * time.Second) close(ch)}select 关键字
用于处理多个通道的读写操作,类似于switch,每个case都是通道操作,但是最终只会执行一个(如果多个通道同时接收到数值,会随机选择一个进行接收)。
package main
import ( "fmt" "time")
func main() { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) for { select { case <-tick: fmt.Println("tick.") case <-boom: fmt.Println("BOOM!") return default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } }}同步原语(Sync包)
1 .WaitGroup
它提供了一种简单、同步的方式,让一个 Goroutine(通常是主 Goroutine)能够阻塞,直到其他多个 Goroutine 都完成它们的工作。它只有三个方法Add(delta int)、Done()和Wait(),功能正如字面意思一样。
package main
import ( "fmt" "sync")
func main() { wg := sync.WaitGroup{} for range 10 { // wg.Add 计数器+1,代表添加一个执行任务 wg.Add(1) go func() { fmt.Println(1) // wg.Done() 计数器-1,代表执行完成 wg.Done() }() } // 等待 Add 的任务全部 Done wg.Wait()}2 .Mutex互斥锁
对于同一把互斥锁:同一时间只能有一个协程获得锁,当锁被持有时,其他协程尝试加锁会被阻塞,直到锁被释放。
var mu sync.Mutexvar counter int
func increment() { mu.Lock() // 加锁 defer mu.Unlock() // 函数返回时解锁 counter++}除了 sync.Mutex 还有一种另一种锁 sync.RWmutex 也就是读写锁,因为在只有并发读,没有并发写的时候,再加锁并没有什么用,因为并发读始终是安全的,因此读写锁就是针对于互斥锁的一个优化,在读多写少的场景下我们可以选择读写锁来替代互斥锁。
3.sync.Once
程序运行时,懒加载初始化常用(其实平时也不咋用这个),保证了整个生命周期,一个 sync.Once 对象仅会执行一次 Do 方法,多余的都会跳过。
比如:
package main
import ( "fmt" "sync")
func main() { init := sync.Once{} num := 0 for range 10 { init.Do(func() { num ++ }) } fmt.Println(num)}结果为 1,无需多言。
4.sync.Map
众所不周知,我们 go 中原生的 map 并不是并发安全的,在并发读写的情况下会 panic,导致程序崩溃,于是 sync 标准库推出了 sync.Map ,这是一个并发安全的 map,严格意义来说,我们直接在原生 map 的基础上加互斥锁或者读写锁也能够解决问题,但是 sync.Map 是做了一定基础的优化的,go 1.24 之前的版本和之后的版本有不同的实现方式,感兴趣可以去了解一下。总之,有了它,我们就不需要为 map 维护一个互斥锁了。
5. sync.Cond
它可以实现一种信号通知的功能
package main
import ( "sync" "time")
func main() { l := sync.Mutex{} c := sync.NewCond(&l)
go func() { // 调用 wait 前需要先加锁 c.L.Lock() c.Wait() println("Hello, world!") // 调用 wait 后需要解锁 c.L.Unlock() }()
time.Sleep(time.Second) println("唤醒") // Signal 方法唤醒等待的 goroutine c.Signal()
time.Sleep(time.Second) println("end")}剩下的不常用。
参考链接:https://draven.co/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/
Context 上下文
context 可以在函数传播链中用于存储一些 kv 键值对信息用于下游获取,也可以在并发控制中发挥作用,比如我们可以为这个函数调用设计一个超时时间,我们的 context 就可以通过这个超时时间来取消这个函数调用链,除了超时控制,我们还可以手动地去 cancel 这个上下文,取消这次调用,但是需要注意的是,你即便取消了这个 context,已经执行的代码并不能撤回,谨慎设置超时时间。 例子:
package main
import ( "context" "fmt" "time")
func doSomething(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("doSomething stopped:", ctx.Err()) return // 退出 goroutine default: fmt.Println("hello") time.Sleep(time.Second) } }}
func main() { ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
doSomething(ctx)
}详细讲解 https://draven.co/golang/docs/part3-runtime/ch06-concurrency/golang-context/