前言
Go的goroutine和channel解决了大部分并发问题,但有些场景下,sync包提供的原语更简洁高效。比如保护共享变量、等待一组goroutine完成、确保初始化只执行一次等。
本文整理sync包中常用类型的使用方法和注意事项,配合实际代码示例。
1. Mutex:互斥锁
最基础的锁,同一时刻只有一个goroutine能持有。
1.1 基本用法
packagemainimport("fmt""sync")typeCounterstruct{mu sync.Mutex valueint}func(c*Counter)Inc(){c.mu.Lock()deferc.mu.Unlock()c.value++}func(c*Counter)Value()int{c.mu.Lock()deferc.mu.Unlock()returnc.value}funcmain(){varwg sync.WaitGroup counter:=&Counter{}fori:=0;i<1000;i++{wg.Add(1)gofunc(){deferwg.Done()counter.Inc()}()}wg.Wait()fmt.Println("Final value:",counter.Value())// 1000}1.2 常见错误
错误1:忘记Unlock
func(c*Counter)Inc(){c.mu.Lock()c.value++// 忘记 Unlock,其他goroutine会永远阻塞}用defer可以避免这个问题,即使函数panic也能正常解锁。
错误2:复制带锁的结构体
funcmain(){c1:=Counter{}c2:=c1// 错误:复制了mutex// c1和c2共享同一把锁的状态,行为不可预期}解决方案:传递指针,或者使用noCopy模式。
typeCounterstruct{mu sync.Mutex valueint_noCopy// go vet会检查复制}typenoCopystruct{}func(*noCopy)Lock(){}func(*noCopy)Unlock(){}错误3:重复Lock(死锁)
func(c*Counter)Double(){c.mu.Lock()deferc.mu.Unlock()c.Inc()// Inc里面也会Lock,死锁}解决方案:拆分内部方法,不加锁的版本供内部调用。
func(c*Counter)inc(){c.value++// 不加锁,仅内部使用}func(c*Counter)Inc(){c.mu.Lock()deferc.mu.Unlock()c.inc()}func(c*Counter)Double(){c.mu.Lock()deferc.mu.Unlock()c.inc()c.inc()}2. RWMutex:读写锁
读多写少的场景下,用读写锁比互斥锁性能好。
- 多个goroutine可以同时持有读锁
- 写锁是排他的,持有写锁时不能有其他读锁或写锁
2.1 基本用法
typeConfigstruct{mu sync.RWMutex datamap[string]string}func(c*Config)Get(keystring)string{c.mu.RLock()deferc.mu.RUnlock()returnc.data[key]}func(c*Config)Set(key,valuestring){c.mu.Lock()deferc.mu.Unlock()c.data[key]=value}func(c*Config)GetAll()map[string]string{c.mu.RLock()deferc.mu.RUnlock()// 返回副本,防止外部修改result:=make(map[string]string,len(c.data))fork,v:=rangec.data{result[k]=v}returnresult}2.2 性能对比
funcBenchmarkMutex(b*testing.B){varmu sync.Mutexvarvalueintb.RunParallel(func(pb*testing.PB){forpb.Next(){mu.Lock()_=value mu.Unlock()}})}funcBenchmarkRWMutex(b*testing.B){varmu sync.RWMutexvarvalueintb.RunParallel(func(pb*testing.PB){forpb.Next(){mu.RLock()_=value mu.RUnlock()}})}读多写少时,RWMutex明显更快;但如果写操作频繁,RWMutex开销反而更大。
2.3 注意事项
- 读锁内不要调用写锁方法(会死锁)
- RWMutex是写优先的,有goroutine等待写锁时,后续读锁请求会阻塞
- 不要在热点路径滥用RWMutex,如果读写比例接近,用Mutex更简单
3. WaitGroup:等待一组goroutine
3.1 基本用法
funcmain(){varwg sync.WaitGroup urls:=[]string{"https://example.com","https://example.org","https://example.net",}for_,url:=rangeurls{wg.Add(1)gofunc(urlstring){deferwg.Done()resp,err:=http.Get(url)iferr!=nil{fmt.Println("Error:",err)return}deferresp.Body.Close()fmt.Println(url,resp.Status)}(url)}wg.Wait()fmt.Println("All done")}3.2 常见错误
错误1:Add在goroutine内部调用
// 错误for_,url:=rangeurls{gofunc(urlstring){wg.Add(1)// 可能在Wait之后执行deferwg.Done()// ...}(url)}wg.Wait()// 可能提前返回错误2:Done调用次数不匹配
wg.Add(1)gofunc(){ifsomeCondition{return// 忘记Done}wg.Done()}()用defer可以确保一定执行。
3.3 带超时的等待
WaitGroup本身不支持超时,可以配合channel实现:
funcwaitWithTimeout(wg*sync.WaitGroup,timeout time.Duration)bool{done:=make(chanstruct{})gofunc(){wg.Wait()close(done)}()select{case<-done:returntruecase<-time.After(timeout):returnfalse}}funcmain(){varwg sync.WaitGroup wg.Add(1)gofunc(){deferwg.Done()time.Sleep(2*time.Second)}()ifwaitWithTimeout(&wg,1*time.Second){fmt.Println("Completed")}else{fmt.Println("Timeout")}}4. Once:确保只执行一次
4.1 典型场景:单例初始化
typeDatabasestruct{conn*sql.DB}var(dbInstance*Database dbOnce sync.Once)funcGetDB()*Database{dbOnce.Do(func(){conn,err:=sql.Open("mysql","dsn")iferr!=nil{panic(err)}dbInstance=&Database{conn:conn}})returndbInstance}4.2 注意事项
Once.Do只执行一次,即使panic了也不会重试
varonce sync.Oncevarconfig*ConfigfuncGetConfig()*Config{once.Do(func(){data,err:=ioutil.ReadFile("config.json")iferr!=nil{panic(err)// panic后,once.Do不会再执行}json.Unmarshal(data,&config)})returnconfig// 如果上面panic了,这里返回nil}如果需要重试,不能用sync.Once:
typeLazyConfigstruct{mu sync.Mutex config*Config}func(l*LazyConfig)Get()(*Config,error){l.mu.Lock()deferl.mu.Unlock()ifl.config!=nil{returnl.config,nil}// 可重试的初始化data,err:=ioutil.ReadFile("config.json")iferr!=nil{returnnil,err}varcfg Configiferr:=json.Unmarshal(data,&cfg);err!=nil{returnnil,err}l.config=&cfgreturnl.config,nil}5. Cond:条件变量
用于goroutine间的信号通知,比channel更底层。
5.1 生产者消费者模式
typeQueuestruct{mu sync.Mutex cond*sync.Cond items[]int}funcNewQueue()*Queue{q:=&Queue{}q.cond=sync.NewCond(&q.mu)returnq}func(q*Queue)Put(itemint){q.mu.Lock()deferq.mu.Unlock()q.items=append(q.items,item)q.cond.Signal()// 通知一个等待的goroutine}func(q*Queue)Get()int{q.mu.Lock()deferq.mu.Unlock()forlen(q.items)==0{q.cond.Wait()// 释放锁并等待}item:=q.items[0]q.items=q.items[1:]returnitem}5.2 Broadcast:通知所有等待者
typeBarrierstruct{mu sync.Mutex cond*sync.Cond countinttargetint}funcNewBarrier(nint)*Barrier{b:=&Barrier{target:n}b.cond=sync.NewCond(&b.mu)returnb}func(b*Barrier)Wait(){b.mu.Lock()deferb.mu.Unlock()b.count++ifb.count==b.target{b.cond.Broadcast()// 所有人到齐,通知全部return}forb.count<b.target{b.cond.Wait()}}实际项目中,大部分场景用channel就够了,Cond用得比较少。
6. Pool:对象复用池
减少内存分配和GC压力。
6.1 基本用法
varbufferPool=sync.Pool{New:func()interface{}{returnnew(bytes.Buffer)},}funcprocess(data[]byte)string{buf:=bufferPool.Get().(*bytes.Buffer)deferfunc(){buf.Reset()bufferPool.Put(buf)}()buf.Write(data)// 处理...returnbuf.String()}6.2 实际案例:JSON编码
varjsonEncoderPool=sync.Pool{New:func()interface{}{return&bytes.Buffer{}},}funcToJSON(vinterface{})([]byte,error){buf:=jsonEncoderPool.Get().(*bytes.Buffer)deferfunc(){buf.Reset()jsonEncoderPool.Put(buf)}()encoder:=json.NewEncoder(buf)iferr:=encoder.Encode(v);err!=nil{returnnil,err}// 返回副本,因为buf会被复用result:=make([]byte,buf.Len())copy(result,buf.Bytes())returnresult,nil}6.3 注意事项
- Pool中的对象可能随时被回收(GC时),不要存储重要状态
- Get返回的对象可能是复用的,使用前要Reset
- Pool不是缓存,不保证对象一定存在
- 要确保Put回去的对象是干净的
7. Map:并发安全的map
7.1 基本用法
varcache sync.MapfuncGet(keystring)(interface{},bool){returncache.Load(key)}funcSet(keystring,valueinterface{}){cache.Store(key,value)}funcGetOrSet(keystring,valueinterface{})interface{}{actual,_:=cache.LoadOrStore(key,value)returnactual}funcDelete(keystring){cache.Delete(key)}funcRange(){cache.Range(func(key,valueinterface{})bool{fmt.Println(key,value)returntrue// 返回false停止遍历})}7.2 适用场景
sync.Map针对以下两种场景优化:
- key只写一次但读很多次(缓存)
- 多个goroutine读写不同的key
其他场景下,用普通map+Mutex可能更好。
7.3 性能对比
// sync.MapfuncBenchmarkSyncMap(b*testing.B){varm sync.Map b.RunParallel(func(pb*testing.PB){i:=0forpb.Next(){m.Store(i,i)m.Load(i)i++}})}// map + MutexfuncBenchmarkMapMutex(b*testing.B){m:=make(map[int]int)varmu sync.Mutex b.RunParallel(func(pb*testing.PB){i:=0forpb.Next(){mu.Lock()m[i]=i mu.Unlock()mu.Lock()_=m[i]mu.Unlock()i++}})}读多写少时sync.Map更快,写多时普通map+Mutex更好。
8. 原子操作:sync/atomic
比锁更轻量,适合简单的数值操作。
8.1 基本用法
import"sync/atomic"typeCounterstruct{valueint64}func(c*Counter)Inc(){atomic.AddInt64(&c.value,1)}func(c*Counter)Dec(){atomic.AddInt64(&c.value,-1)}func(c*Counter)Value()int64{returnatomic.LoadInt64(&c.value)}func(c*Counter)Reset(){atomic.StoreInt64(&c.value,0)}8.2 CAS操作
func(c*Counter)CompareAndSwap(old,newint64)bool{returnatomic.CompareAndSwapInt64(&c.value,old,new)}// 无锁更新func(c*Counter)Update(fnfunc(int64)int64){for{old:=atomic.LoadInt64(&c.value)new:=fn(old)ifatomic.CompareAndSwapInt64(&c.value,old,new){return}}}8.3 atomic.Value:存储任意类型
varconfig atomic.ValuefuncUpdateConfig(cfg*Config){config.Store(cfg)}funcGetConfig()*Config{returnconfig.Load().(*Config)}注意:atomic.Value存储的类型必须一致,第一次Store什么类型,后续就只能Store相同类型。
总结
| 原语 | 适用场景 | 注意事项 |
|---|---|---|
| Mutex | 保护共享变量 | 用defer确保Unlock,不要复制 |
| RWMutex | 读多写少 | 写优先,读锁内不要写 |
| WaitGroup | 等待一组goroutine | Add在goroutine外调用 |
| Once | 单例初始化 | panic不会重试 |
| Cond | 条件等待 | 大部分场景用channel更好 |
| Pool | 对象复用 | 不是缓存,对象可能被回收 |
| Map | 并发安全map | 只在特定场景有优势 |
| atomic | 简单数值操作 | 比锁更轻量 |
选择建议:
- 能用channel就用channel,更符合Go的设计哲学
- 保护简单变量用Mutex,不要过度优化
- 读多写少考虑RWMutex,但要实测确认有收益
- 单例初始化用Once,简单可靠
- 热点路径的简单计数用atomic,避免锁竞争
这些并发原语各有适用场景,关键是理解其语义和限制,根据实际需求选择。