1608 字
8 分钟
一些并发的模型
1. 生产者-消费者模型
这是最基础、最常用、也是最实用的模型。
- 有两个或多个生产者线程,它们生产数据(或任务)并放入一个共享的缓冲区。
- 有一个或多个消费者线程,它们从缓冲区中取出数据(或任务)并进行处理。
- 生产者和消费者之间通过这个缓冲区进行解耦。
核心挑战:
- 缓冲区为空:消费者不能从空缓冲区取数据。
- 缓冲区已满:生产者不能向已满的缓冲区放数据。
- 竞态条件:对缓冲区的访问必须是互斥的,防止多个线程同时修改导致数据不一致。
package main
import ( "fmt" "time")
type Task struct { Runnable func(workerId int)}
func main() { // 一个负责任务分发的管道 ch := make(chan Task, 10)
// 启动几个 worker 负责处理任务 for id := range 10 { go func(workerId int) { for t := range ch { t.Runnable(workerId) } }(id) }
// 任务分发 for i := range 20 { j := i t1 := Task{ Runnable: func(workerId int) { fmt.Printf("workerId%v:task%v做一件事情\n", workerId, j) }, } ch <- t1 } time.Sleep(1 * time.Second) close(ch)}现实应用:
- 消息队列(如Kafka, RabbitMQ)。
- 线程池任务调度。
- Golang中的Channel,Java中的
BlockingQueue其底层思想就是生产者-消费者模型。
2. 读者-写者问题
核心思想:在读多写少的问题中,对读写锁的使用,写锁类似于互斥锁,读锁类似于共享锁(能开多个)。
读者只读取共享数据,不修改,写者会修改共享数据。允许多个读者同时读,但写者必须独占访问(即写时不能有读者或其他写者)。
核心挑战:
- 在保证数据一致性的前提下,最大限度地提高并发性(特别是对读者)。
- 可能导致写者饥饿:如果读者源源不断,写者可能永远无法获得访问权。
package main
import ( "fmt" "sync" "time")
func main() { var ( data int rwMutex sync.RWMutex // 读写锁 wg sync.WaitGroup )
// 写者 wg.Add(1) go func() { defer wg.Done() for i := 0; i < 5; i++ { rwMutex.Lock() // 写锁 data = i fmt.Printf("写入: %d\n", i) time.Sleep(time.Millisecond * 200) rwMutex.Unlock() time.Sleep(time.Millisecond * 100) } }()
// 读者(多个) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { rwMutex.RLock() // 读锁 fmt.Printf("读者%d 读取: %d\n", id, data) time.Sleep(time.Millisecond * 50) rwMutex.RUnlock() time.Sleep(time.Millisecond * 100) } }(i) }
wg.Wait()}现实应用:
- 数据库的并发控制。
- 文件系统、缓存的访问。
3. 哲学家就餐问题
核心思想:避免多个进程(或线程)竞争有限资源时的死锁问题,通过特定的策略(破坏循环)或限制并发度(筷子数量)等方法,解决死锁问题。
- 五个哲学家围坐在一张圆桌旁,他们的一生只在做两件事:思考和吃饭。
- 桌上有五根筷子(不是五双),每两个哲学家之间有一根。
- 哲学家吃饭时需要同时拿起他左边和右边的筷子。
- 如果筷子在别人手里,他就必须等待。
核心挑战:
死锁:如果所有哲学家同时感到饥饿,并同时拿起自己左边的筷子,那么所有人都会永远等待右边的筷子被释放,导致系统僵住。
package main
import ( "fmt" "sync" "time")
func main() { // 5根筷子(互斥锁) chopsticks := make([]sync.Mutex, 5) var wg sync.WaitGroup
// 哲学家函数 philosopher := func(id int) { defer wg.Done()
left := id right := (id + 1) % 5
// 避免死锁:编号为偶数的哲学家先拿左边,奇数先拿右边 for i := 0; i < 3; i++ { // 吃3次 if id%2 == 0 { chopsticks[left].Lock() chopsticks[right].Lock() } else { chopsticks[right].Lock() chopsticks[left].Lock() }
// 拿到两根筷子,开始吃饭 fmt.Printf("哲学家%d 正在吃饭...\n", id) time.Sleep(time.Millisecond * 100)
// 放下筷子 chopsticks[left].Unlock() chopsticks[right].Unlock()
// 思考 fmt.Printf("哲学家%d 正在思考...\n", id) time.Sleep(time.Millisecond * 100) } }
// 启动5个哲学家 for i := 0; i < 5; i++ { wg.Add(1) go philosopher(i) }
wg.Wait()}现实应用:
任何需要获取多个互斥资源才能工作的场景,比如进程需要同时锁定多个数据库记录进行更新。
4. 理发师问题
核心思想:模拟有容量限制的服务系统,通过空结构体(似乎叫信号量)来实现请求者与服务者的同步,并实现超容量时的拒绝和服务者的空闲/工作状态转换。
- 一个理发店有一个理发师(服务线程)、一把理发椅(临界资源)和N把供顾客等待的椅子(等待队列)。
- 如果没有顾客,理发师就在理发椅上睡觉。
- 顾客到来时,如果理发师在睡觉,就叫醒他理发。
- 如果理发师在忙,顾客就坐在等待椅上等待。
- 如果等待椅也坐满了,顾客就会离开。
核心挑战:
- 协调理发师(服务者)和顾客(请求者)的睡眠与唤醒。
- 管理有限的等待队列。
package main
import ( "fmt" "math/rand" "sync" "time")
func main() { // 理发店参数 const ( waitingChairs = 3 // 等待椅数量 totalCustomers = 10 // 总顾客数 )
var ( waitingRoom = make(chan struct{}, waitingChairs) // 等待室(缓冲channel) wg sync.WaitGroup barberReady = make(chan struct{}, 1) // 理发师就绪信号 )
// 理发师 wg.Add(1) go func() { defer wg.Done() for { select { case <-waitingRoom: // 有顾客,开始理发 barberReady <- struct{}{} // 理发师准备就绪 fmt.Println("理发师正在理发...") time.Sleep(time.Millisecond * 200) // 理发时间 fmt.Println("理发完成!") default: // 没有顾客,睡觉 fmt.Println("理发师在睡觉...") time.Sleep(time.Millisecond * 500) } } }()
// 顾客 for i := 0; i < totalCustomers; i++ { wg.Add(1) go func(id int) { defer wg.Done() time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
select { case waitingRoom <- struct{}{}: // 成功进入等待室 fmt.Printf("顾客%d 在等待理发\n", id) <-barberReady // 等待理发师就绪 fmt.Printf("顾客%d 正在理发\n", id) default: // 等待室满了,离开 fmt.Printf("顾客%d 看到等待室满了,离开了\n", id) } }(i) }
// 等待所有顾客完成 time.Sleep(time.Second * 3) fmt.Println("营业结束")}现实应用:
线程池或服务器模型。例如,一个Web服务器有固定数量的工作线程(理发师)和一个最大连接队列(等待椅)。当新请求到达时,如果有空闲线程则立即处理,否则进入队列等待,如果队列也满了,则返回“服务器繁忙”错误。