Connection pool + distributed lock = deadlock

Sometimes you need strong consistency guarantees that transactions cannot provide. Sure, it’s a code smell, but in this situation you usually don’t have many choices.

The easy choice is a PostgreSQL advisory lock. But it should be used carefully, especially if you use a database connection pool.

Imagine a type LockStorage that lets you acquire, wait for, and release advisory locks.

package main

import (
	"context"
	"database/sql"
	"fmt"
)

const pgLockKey = 13

type LockStorage struct {
	db *sql.DB
}

func (s *LockStorage) Lock(ctx context.Context, lockID int) (Releaser, error) {
	// Get connection from connection pool
	// https://golang.org/pkg/database/sql/#DB.Conn
	conn, err := s.db.Conn(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot get database connection: %w", err)
	}
	// Obtain exclusive session level advisory lock
	// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
	_, err = conn.ExecContext(
		ctx, "SELECT pg_advisory_lock($1, $2)", pgLockKey, lockID)
	if err != nil {
		// Return the connection to the pool, otherwise it leaks
		conn.Close()
		return nil, fmt.Errorf("cannot obtain advisory lock: %w", err)
	}
	return &lock{conn: conn, lockID: lockID}, nil
}

// The rest is for completeness

type Releaser interface {
	Release(ctx context.Context) error
}

type lock struct {
	conn   *sql.Conn
	lockID int
}

func (l lock) Release(ctx context.Context) error {
	// Return connection to the connection pool
	defer l.conn.Close()
	// Release the advisory lock
	_, err := l.conn.ExecContext(
		ctx, "SELECT pg_advisory_unlock($1, $2)", pgLockKey, l.lockID)
	return err
}

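SELECT pg_advisory_lock() waits until the lock is free, and the waiting goroutine holds a database connection the whole time. The pool of connections is usually limited, so several concurrent calls to Lock() can exhaust it. If the goroutine holding the advisory lock then tries to get one more connection from the pool, all callers hang in a deadlock: the waiters cannot proceed until the lock is released, and the holder cannot release it until it gets a connection. Strictly speaking, it is not a permanent deadlock when timeouts are in place (for example a context deadline, or PostgreSQL’s lock_timeout or statement_timeout if they are configured), but the wait can last minutes and it breaks the code that was trying to preserve consistency.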

The fix is to use a mutex before obtaining a connection from the pool:

// Requires "sync" in the import block
type LockStorage struct {
	db *sql.DB
	mu sync.Mutex
}

func (s *LockStorage) Lock(ctx context.Context, lockID int) (Releaser, error) {
	// (!) Protect connection pool from running out of connections
	s.mu.Lock()
	defer s.mu.Unlock()

	conn, err := s.db.Conn(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot get database connection: %w", err)
	}
	_, err = conn.ExecContext(
		ctx, "SELECT pg_advisory_lock($1, $2)", pgLockKey, lockID)
	if err != nil {
		// Return the connection to the pool, otherwise it leaks
		conn.Close()
		return nil, fmt.Errorf("cannot obtain advisory lock: %w", err)
	}
	return &lock{conn: conn, lockID: lockID}, nil
}

Usually we use mutexes to protect a resource from concurrent access. The connection pool is thread-safe, so it looks like we’re using a local mutex to protect a distributed mutex. That reading is wrong: the distributed mutex needs no such protection. Here the mutex only limits how many goroutines in this particular process wait for the advisory lock at once, so at most one waiter occupies a pooled connection. The advisory lock limits the number of threads running in parallel across all processes. It’s nice to see how they work together, isn’t it?