sync Package Internals
Пакет sync предоставляет низкоуровневые примитивы синхронизации. Под капотом — это комбинация atomic операций, futex'ов и тесная интеграция с runtime.
sync.Mutex
Структура
go
// sync/mutex.go
type Mutex struct {
state int32 // Состояние мьютекса (locked, woken, starving, waiters count)
sema uint32 // Семафор для блокировки горутин
}
// Биты в state:
const (
mutexLocked = 1 << iota // bit 0: мьютекс захвачен
mutexWoken // bit 1: есть проснувшийся waiter
mutexStarving // bit 2: режим starvation
mutexWaiterShift = iota // bits 3+: количество ожидающих
)State Encoding
┌─────────────────────────────────────────────────────────────────────────────┐
│ Mutex State (int32) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────┬──────────┬────────┬────────┐ │
│ │ Waiters Count │ Starving │ Woken │ Locked │ │
│ │ (29 bits) │ (1 bit) │(1 bit) │(1 bit) │ │
│ └───────────────────────────┴──────────┴────────┴────────┘ │
│ MSB LSB │
│ │
│ Примеры: │
│ • 0b00000001 = locked, no waiters │
│ • 0b00001001 = locked, 1 waiter (8 = 1 << 3) │
│ • 0b00000101 = locked, starving mode │
│ • 0b00000010 = unlocked, woken (someone spinning) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Normal Mode vs Starvation Mode
┌─────────────────────────────────────────────────────────────────────────────┐
│ Mutex Modes (Go 1.9+) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ NORMAL MODE STARVATION MODE │
│ ════════════ ═══════════════ │
│ │
│ • Новые горутины спинятся • FIFO строго │
│ • Конкурируют с waiters • Новые горутины сразу в очередь │
│ • Быстрее для low contention • Гарантирует fairness │
│ │
│ Переход в starvation: Возврат в normal: │
│ waiter ждёт > 1ms • Waiter получил lock И │
│ • Это последний waiter ИЛИ │
│ • Ждал < 1ms │
│ │
│ ┌──────────┐ spin ┌──────────┐ ┌──────────┐ FIFO ┌──────────┐ │
│ │ New │─────────▶│ Compete │ │ New │────────▶│ Queue │ │
│ │ Goroutine│ │ (fair) │ │ Goroutine│ │ (FIFO) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Mutex State Visualizer
Визуализация состояний Mutex: Normal Mode vs Starvation Mode
Открыть
Lock алгоритм
go
func (m *Mutex) Lock() {
// Fast path: захватить unlocked mutex
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Spin в normal mode если mutex locked но не starving
if old&(mutexLocked|mutexStarving) == mutexLocked &&
runtime_canSpin(iter) {
// Попробовать установить woken bit
if !awoke && old&mutexWoken == 0 &&
old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Не ставить locked в starvation mode (сразу в очередь)
if old&mutexStarving == 0 {
new |= mutexLocked
}
// Увеличить счётчик waiters
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// Переключить в starvation mode если ждём долго
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// Сбросить woken если мы были разбужены
if awoke {
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // Захватили lock!
}
// Ждать на семафоре
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// Проверить starvation
starving = starving ||
runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// В starvation mode — мы владеем мьютексом
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // Выйти из starvation
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}sync.RWMutex
Структура
go
// sync/rwmutex.go
type RWMutex struct {
w Mutex // Для exclusive lock между writers
writerSem uint32 // Семафор для writers
readerSem uint32 // Семафор для readers
readerCount atomic.Int32 // Количество readers (отрицательное = pending writer)
readerWait atomic.Int32 // Readers которых ждёт writer
}
const rwmutexMaxReaders = 1 << 30 // Максимум readersWriter Priority
┌─────────────────────────────────────────────────────────────────────────────┐
│ RWMutex Writer Priority │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ readerCount: │
│ • Положительное: количество активных readers │
│ • Отрицательное: есть pending writer (-rwmutexMaxReaders + readers) │
│ │
│ Сценарий: 3 readers, затем приходит writer │
│ │
│ 1. readerCount = 3 (3 активных readers) │
│ 2. Writer вызывает Lock() │
│ readerCount = 3 - rwmutexMaxReaders = -1073741821 │
│ 3. Writer ждёт завершения 3 readers (readerWait = 3) │
│ 4. Новые readers видят отрицательный readerCount → блокируются │
│ 5. Последний reader уменьшает readerWait до 0 → будит writer │
│ 6. Writer работает │
│ 7. Writer вызывает Unlock() │
│ readerCount = -1073741821 + rwmutexMaxReaders = 3 blocked readers │
│ 8. Blocked readers просыпаются │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Magic Number -1073741821
go
// Откуда берётся это число?
const rwmutexMaxReaders = 1 << 30 // 1073741824
// Когда writer хочет захватить lock:
// readerCount.Add(-rwmutexMaxReaders)
// Если было 3 читателя: 3 - 1073741824 = -1073741821
// Почему именно 1 << 30?
// 1. Достаточно большое, чтобы вместить любое реальное количество readers
// 2. Оставляет место для overflow detection (int32 max = 2^31 - 1)
// 3. При сложении/вычитании не вызывает overflow
// Проверка на overflow в RLock:
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// readerCount отрицательный → есть pending writer
// Заблокироваться на readerSem
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}RLock/RUnlock
go
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// Writer ждёт или держит lock
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
// Writer ждёт
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if rw.readerWait.Add(-1) == 0 {
// Последний reader — разбудить writer
runtime_Semrelease(&rw.writerSem, false, 1)
}
}sync.WaitGroup
WaitGroup Simulator
Симуляция работы sync.WaitGroup с несколькими workers
Открыть
Структура
go
// sync/waitgroup.go
type WaitGroup struct {
noCopy noCopy // go vet проверка на копирование
state atomic.Uint64 // high 32 bits: counter, low 32 bits: waiters
sema uint32 // семафор
}State Encoding
┌─────────────────────────────────────────────────────────────────────────────┐
│ WaitGroup State (uint64) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────┬───────────────────────────────┐ │
│ │ Counter (32 bits) │ Waiters (32 bits) │ │
│ │ (Add/Done) │ (Wait calls) │ │
│ └───────────────────────────────┴───────────────────────────────┘ │
│ MSB LSB │
│ │
│ Пример: Add(3), потом 2 Wait() │
│ state = 0x0000000300000002 (counter=3, waiters=2) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Add/Done/Wait
go
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32) // counter
w := uint32(state) // waiters
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v == 0 && w != 0 {
// Counter стал 0, есть waiters → разбудить всех
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
if v == 0 {
return // Counter уже 0
}
// Увеличить waiters
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait returned")
}
return
}
}
}sync.Once
Структура
go
// sync/once.go
type Once struct {
done atomic.Uint32 // 1 если уже выполнено
m Mutex // для slow path
}Double-Checked Locking
go
func (o *Once) Do(f func()) {
// Fast path: уже выполнено
if o.done.Load() == 1 {
return
}
o.doSlow(f)
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
// Double-check под lock
if o.done.Load() == 0 {
defer o.done.Store(1)
f()
}
}Почему defer для done.Store?
go
// Если f() паникует, done НЕ устанавливается
// Следующий Do() снова попытается выполнить f()
var once sync.Once
once.Do(func() {
panic("oops") // done остаётся 0
})
once.Do(func() {
// Будет вызван снова!
})sync.Cond
Структура
go
// sync/cond.go
type Cond struct {
noCopy noCopy
L Locker // Внешний lock (обычно *Mutex или *RWMutex)
notify notifyList // Список ожидающих
checker copyChecker // Проверка на копирование
}
// runtime/sema.go
type notifyList struct {
wait atomic.Uint32 // Ticket для следующего waiter
notify uint32 // Ticket следующего для уведомления
lock mutex // Защита очереди
head *sudog // Голова очереди
tail *sudog // Хвост очереди
}Wait/Signal/Broadcast
go
func (c *Cond) Wait() {
// Получить ticket
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// Заблокироваться до Signal/Broadcast
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func (c *Cond) Signal() {
// Разбудить одного waiter
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
// Разбудить всех waiters
runtime_notifyListNotifyAll(&c.notify)
}Паттерн использования
go
var (
mu sync.Mutex
cond = sync.NewCond(&mu)
ready bool
)
// Waiter
mu.Lock()
for !ready { // Цикл обязателен!
cond.Wait()
}
// ... do work ...
mu.Unlock()
// Signaler
mu.Lock()
ready = true
mu.Unlock()
cond.Signal() // или Broadcast()Цикл обязателен
Wait() может вернуться спонтанно (spurious wakeup). Всегда проверяйте условие в цикле.
Spurious Wakeup: подробнее
Почему Wait() может вернуться без Signal()?
go
// 1. OS-level spurious wakeup
// Некоторые OS могут разбудить горутину без явного сигнала
// Это особенность futex/semaphore реализации
// 2. Race condition при broadcast
// Между проверкой условия и засыпанием может произойти Signal()
// и условие может измениться обратно
// 3. Множественные waiters
// При Broadcast() все просыпаются, но только один получит ресурс
// Остальные должны снова уснутьПример проблемы без цикла:
go
// ❌ НЕПРАВИЛЬНО: без цикла
mu.Lock()
if !ready { // if вместо for
cond.Wait()
}
// ready может быть false здесь из-за spurious wakeup!
doWork() // BUG: работаем с неготовыми данными
mu.Unlock()
// ✅ ПРАВИЛЬНО: с циклом
mu.Lock()
for !ready { // for вместо if
cond.Wait()
}
// ready гарантированно true
doWork()
mu.Unlock()Практический пример с очередью:
go
type Queue struct {
mu sync.Mutex
cond *sync.Cond
items []int
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Get() int {
q.mu.Lock()
defer q.mu.Unlock()
// ВАЖНО: for, не if!
// Если два consumer'а проснулись от одного Signal(),
// только первый получит элемент, второй должен ждать дальше
for len(q.items) == 0 {
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func (q *Queue) Put(item int) {
q.mu.Lock()
q.items = append(q.items, item)
q.mu.Unlock()
q.cond.Signal() // Разбудить одного waiter
}sync.Pool
Структура
go
// sync/pool.go
type Pool struct {
noCopy noCopy
local unsafe.Pointer // [P]poolLocal, per-P pools
localSize uintptr
victim unsafe.Pointer // victim cache от предыдущего GC
victimSize uintptr
New func() any // Функция создания нового объекта
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // False sharing prevention
}
type poolLocalInternal struct {
private any // Приватный объект для P (быстрый доступ)
shared poolChain // Lock-free очередь для sharing
}Per-P Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ sync.Pool Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Pool │ │
│ │ local: [P0, P1, P2, P3] victim: [prev GC pools] │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ poolLocal │ │ poolLocal │ │ poolLocal │ │ poolLocal │ │
│ │ P0 │ │ P1 │ │ P2 │ │ P3 │ │
│ ├────────────┤ ├────────────┤ ├────────────┤ ├────────────┤ │
│ │ private:obj│ │ private:nil│ │ private:obj│ │ private:nil│ │
│ ├────────────┤ ├────────────┤ ├────────────┤ ├────────────┤ │
│ │ shared: │ │ shared: │ │ shared: │ │ shared: │ │
│ │ [obj,obj] │ │ [obj] │ │ [] │ │ [obj,obj] │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ Get() algorithm: │
│ 1. Попробовать взять private своего P (lock-free) │
│ 2. Попробовать взять из shared своего P │
│ 3. Попробовать украсть из shared других P │
│ 4. Попробовать взять из victim cache │
│ 5. Вызвать New() │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Get/Put
go
func (p *Pool) Get() any {
l, pid := p.pin() // Привязаться к P, отключить preemption
// 1. Попробовать private
x := l.private
l.private = nil
if x == nil {
// 2. Попробовать shared своего P
x, _ = l.shared.popHead()
if x == nil {
// 3. Попробовать другие P или victim
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) Put(x any) {
if x == nil {
return
}
l, _ := p.pin()
if l.private == nil {
l.private = x // Положить в private (самый быстрый путь)
} else {
l.shared.pushHead(x) // Положить в shared
}
runtime_procUnpin()
}
// getSlow вызывается когда private и shared текущего P пусты
func (p *Pool) getSlow(pid int) any {
size := runtime_LoadAcquintptr(&p.localSize)
locals := p.local
// ───────────────────────────────────────────────────────────────
// Попытка 1: украсть из shared других P
// ───────────────────────────────────────────────────────────────
for i := 0; i < int(size); i++ {
// Начинаем с pid+1, чтобы не проверять себя снова
l := indexLocal(locals, (pid+i+1)%int(size))
// popTail — lock-free операция, безопасна для stealing
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// ───────────────────────────────────────────────────────────────
// Попытка 2: взять из victim cache (объекты от прошлого GC)
// ───────────────────────────────────────────────────────────────
size = runtime_LoadAcquintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// Сначала private, потом shared
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Victim cache исчерпан для этого P
runtime_StoreReluintptr(&p.victimSize, 0)
return nil
}Victim Cache и GC
go
// runtime вызывает poolCleanup перед каждым GC
func poolCleanup() {
// 1. Очистить victim cache (objects от предыдущего GC)
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 2. Переместить local → victim
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
}Victim Cache
Объекты живут максимум 2 GC цикла:
- Первый GC: local → victim
- Второй GC: victim очищается
Это предотвращает накопление мусора, но даёт шанс переиспользовать объекты.
sync.Map
Структура
go
// sync/map.go
type Map struct {
mu Mutex
read atomic.Pointer[readOnly] // read-only копия (lock-free доступ)
dirty map[any]*entry // dirty копия (под lock)
misses int // промахи по read
}
type readOnly struct {
m map[any]*entry
amended bool // true если dirty содержит ключи, которых нет в read
}
type entry struct {
p atomic.Pointer[any] // nil, expunged, или указатель на значение
}
// Специальное значение для удалённых из read, но ещё в dirty
var expunged = new(any)
// ┌─────────────────────────────────────────────────────────────────────────┐
// │ Жизненный цикл entry.p │
// ├─────────────────────────────────────────────────────────────────────────┤
// │ │
// │ nil → Значение удалено, ключ ещё в dirty │
// │ expunged → Значение удалено, ключа НЕТ в dirty │
// │ &value → Актуальное значение │
// │ │
// │ Зачем нужен expunged? │
// │ ───────────────────── │
// │ Когда dirty = nil и мы хотим создать новый dirty: │
// │ 1. Копируем все entry из read │
// │ 2. Но НЕ копируем nil entry (они удалены) │
// │ 3. Помечаем их как expunged, чтобы: │
// │ - Load знал: ключа точно нет, не смотри в dirty │
// │ - Store знал: нужно добавить ключ в dirty заново │
// │ │
// │ Пример: │
// │ 1. Store("a", 1) → read["a"]=&1, dirty["a"]=&1 │
// │ 2. Delete("a") → read["a"]=nil, dirty["a"]=nil │
// │ 3. miss++ >= len(dirty) → dirty promote │
// │ 4. Store("b", 2) → dirty создаётся заново │
// │ read["a"] меняется nil→expunged (не копируется в dirty) │
// │ 5. Store("a", 3) → видит expunged → добавляет в dirty │
// │ │
// └─────────────────────────────────────────────────────────────────────────┘Read/Dirty Promotion
┌─────────────────────────────────────────────────────────────────────────────┐
│ sync.Map Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ LOAD (Read-heavy path) │
│ ═══════════════════════ │
│ 1. Прочитать read atomically │
│ 2. Если ключ есть → вернуть (без lock!) │
│ 3. Если amended → взять lock, проверить dirty │
│ │
│ STORE │
│ ═════ │
│ 1. Если ключ есть в read → CAS значение (без lock!) │
│ 2. Иначе → взять lock, добавить в dirty │
│ │
│ PROMOTION (dirty → read) │
│ ═══════════════════════ │
│ Когда misses >= len(dirty): │
│ 1. read = dirty │
│ 2. dirty = nil │
│ 3. misses = 0 │
│ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ read │ │ dirty │ │
│ │ (atomic, fast) │ │ (mutex, slow) │ │
│ ├────────────────┤ ├────────────────┤ │
│ │ "key1": val1 │ │ "key1": val1 │ │
│ │ "key2": val2 │ ──promotion──▶ │ "key2": val2 │ │
│ │ │ │ "key3": val3 │ ← новые ключи │
│ └────────────────┘ └────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Load/Store
go
func (m *Map) Load(key any) (value any, ok bool) {
read := m.read.Load()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Double-check
read = m.read.Load()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked() // Инкремент misses, возможно promote
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (m *Map) Store(key, value any) {
read := m.read.Load()
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return // Обновили существующий ключ без lock
}
m.mu.Lock()
read = m.read.Load()
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked() // Копировать read → dirty
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}Когда использовать sync.Map
| Сценарий | sync.Map | map + Mutex |
|---|---|---|
| Read-heavy (99% reads) | ✅ Лучше | Ок |
| Write-heavy | ❌ Хуже | ✅ Лучше |
| Disjoint keys (разные горутины пишут в разные ключи) | ✅ Лучше | Ок |
| Частые Range() | ❌ Копирует всё | ✅ Быстрее |
| Известный набор ключей | ❌ Overhead | ✅ Проще |
Производительность примитивов
go
// Benchmark результаты (примерные)
// BenchmarkMutexLock-8 50000000 25 ns/op
// BenchmarkRWMutexRLock-8 100000000 10 ns/op
// BenchmarkRWMutexLock-8 30000000 45 ns/op
// BenchmarkWaitGroupAdd-8 100000000 15 ns/op
// BenchmarkOnce-8 2000000000 0.3 ns/op (после первого)
// BenchmarkPoolGet-8 50000000 30 ns/op
// BenchmarkMapLoad-8 100000000 15 ns/opРекомендации
- Mutex vs RWMutex: RWMutex выгоден при >10x reads vs writes
- sync.Pool: использовать для hot path аллокаций (buffers, temp objects)
- sync.Map: только для read-heavy + disjoint writes
- sync.Once: идиоматично для lazy initialization
- sync.Cond: редко нужен, обычно channels лучше