diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..be6c9b6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +**/.DS_Store +.idea +.vscode +.vs diff --git a/concurrency/atomicmap.go b/concurrency/atomicmap.go new file mode 100644 index 0000000..f18ede5 --- /dev/null +++ b/concurrency/atomicmap.go @@ -0,0 +1,111 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package concurrency + +import ( + "sync" + + "golang.org/x/exp/constraints" +) + +type AtomicValue[T constraints.Integer] struct { + lock sync.RWMutex + value T +} + +func (a *AtomicValue[T]) Load() T { + a.lock.RLock() + defer a.lock.RUnlock() + return a.value +} + +func (a *AtomicValue[T]) Store(v T) { + a.lock.Lock() + defer a.lock.Unlock() + a.value = v +} + +func (a *AtomicValue[T]) Add(v T) T { + a.lock.Lock() + defer a.lock.Unlock() + a.value += v + return a.value +} + +type AtomicMap[K comparable, T constraints.Integer] interface { + Get(key K) (*AtomicValue[T], bool) + GetOrCreate(key K, createT T) *AtomicValue[T] + Delete(key K) + ForEach(fn func(key K, value *AtomicValue[T])) + Clear() +} + +type atomicMap[K comparable, T constraints.Integer] struct { + lock sync.RWMutex + items map[K]*AtomicValue[T] +} + +func NewAtomicMap[K comparable, T constraints.Integer]() AtomicMap[K, T] { + return &atomicMap[K, T]{ + items: make(map[K]*AtomicValue[T]), + } +} + +func (a *atomicMap[K, T]) Get(key K) (*AtomicValue[T], bool) { + a.lock.RLock() + defer a.lock.RUnlock() + + item, ok := a.items[key] + if !ok { + return nil, false + } + return item, true +} + +func (a *atomicMap[K, T]) GetOrCreate(key K, createT T) *AtomicValue[T] { + a.lock.RLock() + item, ok := a.items[key] + a.lock.RUnlock() + if !ok { + a.lock.Lock() + // Double-check the key exists to avoid race condition + item, ok = a.items[key] + if !ok { + item = &AtomicValue[T]{value: createT} + a.items[key] = item + } + a.lock.Unlock() + } + return item +} + +func (a *atomicMap[K, T]) Delete(key K) { + a.lock.Lock() + delete(a.items, key) + a.lock.Unlock() +} + +func (a *atomicMap[K, T]) ForEach(fn func(key K, value *AtomicValue[T])) { + a.lock.RLock() + defer a.lock.RUnlock() + for k, v := range a.items { + fn(k, v) + } +} + +func (a *atomicMap[K, T]) Clear() { + a.lock.Lock() + defer a.lock.Unlock() + clear(a.items) +} diff --git a/concurrency/atomicmap_test.go b/concurrency/atomicmap_test.go new file mode 100644 index 0000000..d39ac43 --- /dev/null +++ b/concurrency/atomicmap_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package concurrency + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAtomicMapInt32_New_Get_Delete(t *testing.T) { + m := NewAtomicMap[string, int32]().(*atomicMap[string, int32]) + + require.NotNil(t, m) + require.NotNil(t, m.items) + require.Empty(t, m.items) + + t.Run("basic operations", func(t *testing.T) { + key := "key1" + value := int32(10) + + // Initially, the key should not exist + _, ok := m.Get(key) + require.False(t, ok) + + // Add a value and check it + m.GetOrCreate(key, 0).Store(value) + result, ok := m.Get(key) + require.True(t, ok) + assert.Equal(t, value, result.Load()) + + // Delete the key and check it no longer exists + m.Delete(key) + _, ok = m.Get(key) + require.False(t, ok) + }) + + t.Run("concurrent access multiple keys", func(t *testing.T) { + var wg sync.WaitGroup + keys := []string{"key1", "key2", "key3"} + iterations := 100 + + wg.Add(len(keys) * 2) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < iterations; i++ { + m.GetOrCreate(k, 0).Add(1) + } + }(key) + go func(k string) { + defer wg.Done() + for i := 0; i < iterations; i++ { + m.GetOrCreate(k, 0).Add(-1) + } + }(key) + } + wg.Wait() + + for _, key := range keys { + val, ok := m.Get(key) + require.True(t, ok) + require.Equal(t, int32(0), val.Load()) + } + }) +} diff --git a/concurrency/mutexmap.go b/concurrency/mutexmap.go new file mode 100644 index 0000000..b00868b --- /dev/null +++ b/concurrency/mutexmap.go @@ -0,0 +1,119 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package concurrency + +import ( + "sync" +) + +// MutexMap is an interface that defines a thread-safe map with keys of type T associated to +// read-write mutexes (sync.RWMutex), allowing for granular locking on a per-key basis. +// This can be useful for scenarios where fine-grained concurrency control is needed. +// +// Methods: +// - Lock(key T): Acquires an exclusive lock on the mutex associated with the given key. +// - Unlock(key T): Releases the exclusive lock on the mutex associated with the given key. +// - RLock(key T): Acquires a read lock on the mutex associated with the given key. +// - RUnlock(key T): Releases the read lock on the mutex associated with the given key. +// - Delete(key T): Removes the mutex associated with the given key from the map. +// - Clear(): Removes all mutexes from the map. +// - ItemCount() int: Returns the number of items (mutexes) in the map. +type MutexMap[T comparable] interface { + Lock(key T) + Unlock(key T) + RLock(key T) + RUnlock(key T) + Delete(key T) + Clear() + ItemCount() int +} + +type mutexMap[T comparable] struct { + lock sync.RWMutex + items map[T]*sync.RWMutex +} + +func NewMutexMap[T comparable]() MutexMap[T] { + return &mutexMap[T]{ + items: make(map[T]*sync.RWMutex), + } +} + +func (a *mutexMap[T]) Lock(key T) { + a.lock.RLock() + mutex, ok := a.items[key] + a.lock.RUnlock() + if !ok { + a.lock.Lock() + mutex, ok = a.items[key] + if !ok { + mutex = &sync.RWMutex{} + a.items[key] = mutex + } + a.lock.Unlock() + } + mutex.Lock() +} + +func (a *mutexMap[T]) Unlock(key T) { + a.lock.RLock() + mutex, ok := a.items[key] + a.lock.RUnlock() + if ok { + mutex.Unlock() + } +} + +func (a *mutexMap[T]) RLock(key T) { + a.lock.RLock() + mutex, ok := a.items[key] + a.lock.RUnlock() + if !ok { + a.lock.Lock() + mutex, ok = a.items[key] + if !ok { + mutex = &sync.RWMutex{} + a.items[key] = mutex + } + a.lock.Unlock() + } + mutex.Lock() +} + +func (a *mutexMap[T]) RUnlock(key T) { + a.lock.RLock() + mutex, ok := a.items[key] + a.lock.RUnlock() + if ok { + mutex.Unlock() + } +} + +func (a *mutexMap[T]) Delete(key T) { + a.lock.Lock() + delete(a.items, key) + a.lock.Unlock() +} + +func (a *mutexMap[T]) Clear() { + a.lock.Lock() + clear(a.items) + a.lock.Unlock() +} + +func (a *mutexMap[T]) ItemCount() int { + a.lock.Lock() + defer a.lock.Unlock() + return len(a.items) +} diff --git a/concurrency/mutexmap_test.go b/concurrency/mutexmap_test.go new file mode 100644 index 0000000..2db2e13 --- /dev/null +++ b/concurrency/mutexmap_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package concurrency + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewMutexMap_Add_Delete(t *testing.T) { + mm := NewMutexMap[string]().(*mutexMap[string]) + + t.Run("New mutex map", func(t *testing.T) { + require.NotNil(t, mm) + require.NotNil(t, mm.items) + require.Empty(t, mm.items) + }) + + t.Run("Lock and unlock mutex", func(t *testing.T) { + mm.Lock("key1") + _, ok := mm.items["key1"] + require.True(t, ok) + mm.Unlock("key1") + }) + + t.Run("Concurrently lock and unlock mutexes", func(t *testing.T) { + var counter int + var wg sync.WaitGroup + + numGoroutines := 10 + wg.Add(numGoroutines) + + // Concurrently lock and unlock for each key + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + mm.Lock("key1") + counter++ + mm.Unlock("key1") + }() + } + wg.Wait() + + require.Equal(t, 10, counter) + }) + + t.Run("RLock and RUnlock mutex", func(t *testing.T) { + mm.RLock("key1") + _, ok := mm.items["key1"] + require.True(t, ok) + mm.RUnlock("key1") + }) + + t.Run("Concurrently RLock and RUnlock mutexes", func(t *testing.T) { + var counter int + var wg sync.WaitGroup + + numGoroutines := 10 + wg.Add(numGoroutines) + + // Concurrently RLock and RUnlock for each key + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + mm.RLock("key1") + counter++ + mm.RUnlock("key1") + }() + } + wg.Wait() + + require.Equal(t, 10, counter) + }) + + t.Run("Delete mutex", func(t *testing.T) { + mm.Lock("key1") + mm.Unlock("key1") + mm.Delete("key1") + _, ok := mm.items["key1"] + require.False(t, ok) + }) + + t.Run("Clear all mutexes, and check item count", func(t *testing.T) { + mm.Lock("key1") + mm.Unlock("key1") + mm.Lock("key2") + mm.Unlock("key2") + + require.Equal(t, 2, mm.ItemCount()) + + mm.Clear() + require.Empty(t, mm.items) + }) +}