Skip to content

Commit

Permalink
Implement mutex locks for LSP
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Sep 20, 2024
1 parent 66575fa commit 7249207
Showing 1 changed file with 69 additions and 83 deletions.
152 changes: 69 additions & 83 deletions private/buf/buflsp/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync/atomic"
)

const poison = ^uint64(0)

var nextRequestID atomic.Uint64

// withReentrancy assigns a unique request ID to the given context, which can be retrieved
Expand Down Expand Up @@ -56,116 +54,104 @@ func getRequestID(ctx context.Context) uint64 {
return id + 1
}

// mutexPool represents a group of reentrant muteces that cannot be acquired simultaneously.
// mutexPool represents a group of reentrant mutexes that cannot be acquired
// simultaneously.
//
// A zero mutexPool is ready to use.
// The zero value is ready to use.
type mutexPool struct {
lock sync.Mutex
held map[uint64]*mutex
cond sync.Cond
// locks is a set of locked mutexes.
locks map[*mutex]struct{}
// owners is a set of request ids that requested a lock.
owners map[uint64]struct{}
}

// NewMutex creates a new mutex in this pool.
func (mp *mutexPool) NewMutex() mutex {
return mutex{pool: mp}
return mutex{mutexPool: mp}
}

func (m *mutexPool) initWithLock() {
if m.cond.L == nil {
m.cond.L = &m.lock
m.owners = make(map[uint64]struct{})
m.locks = make(map[*mutex]struct{})
}
}

// mutex is a sync.Mutex with some extra features.
// mutex is a mutual exlusion lock derived from a mutexPool.
// It must be created with [mutexPool.NewMutex].
//
// The main feature is reentrancy-checking. Within the LSP, we need to lock-protect many structures,
// and it is very easy to deadlock if the same request tries to lock something multiple times.
// To achieve this, Lock() takes a context, which must be modified by withRequestID().
type mutex struct {
lock sync.Mutex
// This is the id of the context currently holding the lock.
who atomic.Uint64
pool *mutexPool
*mutexPool
}

// Lock attempts to acquire this mutex or blocks.
//
// Unlike [sync.Mutex.Lock], this takes a Context. If that context was updated with withRequestID,
// this function will panic when attempting to lock the mutex while it is already held by a
// goroutine using this same context.
//
// NOTE: to Lock() and Unlock() with the same context DO NOT synchronize with each other. For example,
// attempting to lock this mutex from two different goroutines with the same context will
// result in undefined behavior.
// Unlike [sync.Mutex.Lock], this takes a Context. If that context was updated
// with withRequestID, this function will error when attempting to lock the
// mutex while it is already held by a goroutine using the same context.
//
// Also unlike [sync.Mutex.Lock], it returns an idempotent unlocker function. This can be used like
// defer mu.Lock()(). Note that only the outer function call is deferred: this is part of the
// definition of defer. See https://go.dev/play/p/RJNKRcoQRo1. This unlocker can also be used to
// defer unlocking but also unlock before the function returns.
// Unlike [sync.Mutex.Lock], this takes a Context. The context must be updated
// with withRequestID to ensure that the same request cannot lock multiple
// mutexes from the same mutexPool.
//
// The returned unlocker is not thread-safe.
func (mu *mutex) Lock(ctx context.Context) (unlocker func()) {
var unlocked bool
unlocker = func() {
if unlocked {
return
}
mu.Unlock(ctx)
unlocked = true
}

// It returns an error if the context is canceled before the lock is acquired
// or if the request ID has already locked another mutex.
func (m *mutex) Lock(ctx context.Context) error {
id := getRequestID(ctx)

if mu.who.Load() == id && id > 0 {
// We seem to have tried to lock this lock twice. Panic, and poison the lock.
mu.who.Store(poison)
panic("buflsp/mutex.go: non-reentrant lock locked twice by the same request")
m.lock.Lock()
defer m.lock.Unlock()
m.initWithLock()
// Owner must be unique.
if _, ok := m.owners[id]; ok || id == 0 {
return fmt.Errorf("owner id %d already locked", id)
}

if mu.pool != nil {
mu.pool.lock.Lock()
defer mu.pool.lock.Unlock()
if mu.pool.held == nil {
mu.pool.held = make(map[uint64]*mutex)
m.owners[id] = struct{}{}
// AfterFunc is used to ensure waiters are woken up when the context is done.
stop := context.AfterFunc(ctx, func() {
// The lock must be held to broadcast.
m.lock.Lock()
m.cond.Broadcast()
m.lock.Unlock()
})
defer stop()
// Wait for the lock to be available.
for {
// Check for context cancellation before acquiring the lock.
if err := ctx.Err(); err != nil {
delete(m.owners, id)
return err
}
if held := mu.pool.held[id]; held != nil {
panic(fmt.Sprintf("buflsp/mutex.go: attempted to acquire two non-reentrant locks at once: %p -> %p", mu, held))
// Acquire the lock if it is not already locked.
if _, ok := m.locks[m]; !ok {
m.locks[m] = struct{}{}
break
}
mu.pool.held[id] = mu
m.cond.Wait()
}

// Ok, we're definitely not holding a lock, so we can block until we acquire the lock.
mu.lock.Lock()
mu.storeWho(id)

return unlocker
// Lock acquired.
return nil
}

// Unlock releases this mutex.
//
// Unlock must be called with the same context that locked it, otherwise this function panics.
func (mu *mutex) Unlock(ctx context.Context) {
// It must be called with the same context that locked it, otherwise this
// function panics.
func (m *mutex) Unlock(ctx context.Context) {
m.lock.Lock()
defer m.lock.Unlock()
id := getRequestID(ctx)
if mu.who.Load() != id {
panic("buflsp/mutex.go: lock was locked by one request and unlocked by another")
}

mu.storeWho(0)

if mu.pool != nil {
mu.pool.lock.Lock()
defer mu.pool.lock.Unlock()
if held := mu.pool.held[id]; held != mu {
panic(fmt.Sprintf("buflsp/mutex.go: attempted to unlock the wrong lock: %p -> %p", mu, held))
}
delete(mu.pool.held, id)
}

mu.lock.Unlock()
}

func (mu *mutex) storeWho(id uint64) {
for {
// This has to be a CAS loop to avoid races with a poisoning p.
old := mu.who.Load()
if old == poison {
panic("buflsp/mutex.go: non-reentrant lock locked twice by the same request")
}
if mu.who.CompareAndSwap(old, id) {
break
}
_, isLocked := m.locks[m]
_, isOwner := m.owners[id]
if !isLocked || !isOwner {
panic(fmt.Sprintf("unlock of unlocked mutex id %d", id))
}
delete(m.owners, id)
delete(m.locks, m)
m.cond.Broadcast()
}

0 comments on commit 7249207

Please sign in to comment.