Concurrency Patterns
Production-ready паттерны конкурентного программирования в Go. Каждый паттерн проверен в реальных системах и решает конкретную проблему.
Pipelines
Pipeline — цепочка стадий обработки, где каждая стадия получает данные из входного канала и отправляет в выходной.
Pipeline Visualizer
Визуализация Pipeline и Fan-Out паттернов
Открыть
Базовый Pipeline
go
// Generator — источник данных
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Square — стадия обработки
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Использование
func main() {
// Построение pipeline
c := generator(2, 3, 4)
out := square(square(c))
for n := range out {
fmt.Println(n) // 16, 81, 256
}
}Pipeline с Context
go
func generator(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case n, ok := <-in:
if !ok {
return
}
select {
case out <- n * n:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}Fan-Out / Fan-In
Fan-Out: распределение работы
go
// Запустить несколько workers на одном input channel
func fanOut(ctx context.Context, in <-chan Job, workers int) []<-chan Result {
outs := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
outs[i] = worker(ctx, in)
}
return outs
}
func worker(ctx context.Context, in <-chan Job) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for job := range in {
select {
case out <- process(job):
case <-ctx.Done():
return
}
}
}()
return out
}Fan-In: объединение результатов
go
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
var wg sync.WaitGroup
out := make(chan Result)
// Для каждого входного канала запустить горутину
output := func(c <-chan Result) {
defer wg.Done()
for result := range c {
select {
case out <- result:
case <-ctx.Done():
return
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// Закрыть out когда все горутины завершены
go func() {
wg.Wait()
close(out)
}()
return out
}Полный пример Fan-Out/Fan-In
go
func processJobs(ctx context.Context, jobs []Job, workers int) []Result {
// Input channel
in := make(chan Job)
go func() {
defer close(in)
for _, job := range jobs {
select {
case in <- job:
case <-ctx.Done():
return
}
}
}()
// Fan-out
outs := fanOut(ctx, in, workers)
// Fan-in
results := fanIn(ctx, outs...)
// Собрать результаты
var out []Result
for result := range results {
out = append(out, result)
}
return out
}Deadlock Prevention
Deadlock — ситуация когда горутины взаимно блокируют друг друга. Понимание классических deadlock сценариев критично для их предотвращения.
Deadlock Scenarios
Демонстрация различных сценариев deadlock
Открыть
Правила предотвращения
- Lock ordering — всегда захватывать мьютексы в одинаковом глобальном порядке
- Avoid nested locks — минимизировать количество одновременно удерживаемых locks
- Use channels — предпочитать channels вместо shared state где возможно
- Timeouts — использовать select с timeout для операций с каналами
- Context cancellation — всегда проверять ctx.Done() в долгих операциях
Error Handling
errgroup
go
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // Capture
g.Go(func() error {
body, err := fetch(ctx, url)
if err != nil {
return err
}
results[i] = body
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err // Первая ошибка, остальные отменены через ctx
}
return results, nil
}errgroup с лимитом параллелизма
go
func fetchAllLimited(ctx context.Context, urls []string, limit int) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(limit) // Максимум limit горутин одновременно
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
body, err := fetch(ctx, url)
if err != nil {
return err
}
results[i] = body
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}Retry with backoff
go
func retry(ctx context.Context, maxAttempts int, fn func() error) error {
var err error
for attempt := 0; attempt < maxAttempts; attempt++ {
err = fn()
if err == nil {
return nil
}
// Exponential backoff
backoff := time.Duration(1<<attempt) * 100 * time.Millisecond
if backoff > 10*time.Second {
backoff = 10 * time.Second
}
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("after %d attempts: %w", maxAttempts, err)
}
// Использование
err := retry(ctx, 5, func() error {
return sendRequest()
})Circuit Breaker
go
type CircuitBreaker struct {
mu sync.Mutex
failures int
threshold int
state string // "closed", "open", "half-open"
openUntil time.Time
cooldown time.Duration
}
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
if cb.state == "open" {
if time.Now().Before(cb.openUntil) {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
cb.state = "half-open"
}
cb.mu.Unlock()
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
if cb.failures >= cb.threshold {
cb.state = "open"
cb.openUntil = time.Now().Add(cb.cooldown)
}
return err
}
cb.failures = 0
cb.state = "closed"
return nil
}Rate Limiting
Token Bucket
go
import "golang.org/x/time/rate"
// Limiter: 10 requests/sec, burst 5
limiter := rate.NewLimiter(10, 5)
func handleRequest(ctx context.Context) error {
// Блокирующее ожидание токена
if err := limiter.Wait(ctx); err != nil {
return err
}
// Обработка запроса
return nil
}
// Non-blocking вариант
func tryHandle() bool {
if !limiter.Allow() {
return false // Rate limited
}
// Обработка
return true
}
// Зарезервировать на будущее
func reserveHandle() {
r := limiter.Reserve()
if !r.OK() {
// Не хватает burst capacity
return
}
time.Sleep(r.Delay()) // Подождать до своего времени
// Обработка
}Per-client rate limiting
go
type ClientLimiter struct {
mu sync.Mutex
limiters map[string]*rate.Limiter
limit rate.Limit
burst int
}
func (cl *ClientLimiter) GetLimiter(clientID string) *rate.Limiter {
cl.mu.Lock()
defer cl.mu.Unlock()
limiter, exists := cl.limiters[clientID]
if !exists {
limiter = rate.NewLimiter(cl.limit, cl.burst)
cl.limiters[clientID] = limiter
}
return limiter
}
func (cl *ClientLimiter) Allow(clientID string) bool {
return cl.GetLimiter(clientID).Allow()
}Cancellation Patterns
Or-Channel: первый сигнал отменяет всех
go
func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan struct{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
// Использование
done := or(
signalAfter(1*time.Second),
signalAfter(2*time.Second),
ctx.Done(),
)
<-done // Закроется через 1 секундуGraceful Shutdown
go
func main() {
ctx, cancel := context.WithCancel(context.Background())
// Ловим сигналы
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Запускаем сервер
server := &http.Server{Addr: ":8080"}
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
// Запускаем workers
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(ctx)
}()
}
// Ждём сигнал
<-sigCh
log.Println("Shutting down...")
// Фаза 1: остановить приём новых запросов
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
server.Shutdown(shutdownCtx)
// Фаза 2: отменить workers
cancel()
// Фаза 3: дождаться завершения workers
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
log.Println("Graceful shutdown complete")
case <-time.After(10 * time.Second):
log.Println("Forced shutdown")
}
}Resource Management
Semaphore
go
import "golang.org/x/sync/semaphore"
// Ограничить до 100 concurrent операций
sem := semaphore.NewWeighted(100)
func doWork(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
// Работа
return nil
}
// Для тяжёлых операций можно захватывать больше
func heavyWork(ctx context.Context) error {
if err := sem.Acquire(ctx, 10); err != nil {
return err
}
defer sem.Release(10)
// ...
}Bounded Parallelism
go
func processItems(ctx context.Context, items []Item, maxParallel int) error {
sem := make(chan struct{}, maxParallel)
errCh := make(chan error, 1)
var wg sync.WaitGroup
for _, item := range items {
select {
case sem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
return err
}
wg.Add(1)
go func(item Item) {
defer wg.Done()
defer func() { <-sem }()
if err := process(item); err != nil {
select {
case errCh <- err:
default:
}
}
}(item)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}Worker Pool
go
type Pool struct {
jobs chan Job
results chan Result
workers int
wg sync.WaitGroup
}
func NewPool(workers int) *Pool {
p := &Pool{
jobs: make(chan Job, workers*2),
results: make(chan Result, workers*2),
workers: workers,
}
p.wg.Add(workers)
for i := 0; i < workers; i++ {
go p.worker()
}
return p
}
func (p *Pool) worker() {
defer p.wg.Done()
for job := range p.jobs {
p.results <- process(job)
}
}
func (p *Pool) Submit(job Job) {
p.jobs <- job
}
func (p *Pool) Results() <-chan Result {
return p.results
}
func (p *Pool) Shutdown() {
close(p.jobs)
p.wg.Wait()
close(p.results)
}Pub/Sub
Simple Broker
go
type Broker struct {
mu sync.RWMutex
subscribers map[string][]chan Event
}
func NewBroker() *Broker {
return &Broker{
subscribers: make(map[string][]chan Event),
}
}
func (b *Broker) Subscribe(topic string) <-chan Event {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan Event, 10)
b.subscribers[topic] = append(b.subscribers[topic], ch)
return ch
}
func (b *Broker) Publish(topic string, event Event) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.subscribers[topic] {
select {
case ch <- event:
default:
// Drop если subscriber медленный
}
}
}
func (b *Broker) Unsubscribe(topic string, ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subscribers[topic]
for i, sub := range subs {
if sub == ch {
b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
close(sub)
break
}
}
}Best Practices Summary
Do
go
// ✅ Всегда используйте context для cancellation
func doWork(ctx context.Context) error
// ✅ Закрывайте channels только на стороне producer
func producer() <-chan T {
ch := make(chan T)
go func() {
defer close(ch)
// ...
}()
return ch
}
// ✅ Используйте select с default для non-blocking
select {
case msg := <-ch:
handle(msg)
default:
// Не блокируемся
}
// ✅ Используйте buffered channels для известного количества
results := make(chan Result, len(items))Don't
go
// ❌ Не закрывайте channel на стороне receiver
func consumer(ch chan T) {
close(ch) // Плохо! Producer может паниковать
}
// ❌ Не используйте goroutine leak patterns
func leak() {
ch := make(chan int)
go func() {
ch <- 1 // Заблокирован навсегда!
}()
// Нет receiver
}
// ❌ Не игнорируйте errors из goroutines
go func() {
err := doSomething()
// err потерян!
}()
// ❌ Не используйте shared state без синхронизации
var counter int
go func() { counter++ }() // DATA RACE
go func() { counter++ }()