Skip to content

Commit

Permalink
Resolve memory leaks in ConcurrentMap, EpochManager, LockFreeStack, a…
Browse files Browse the repository at this point in the history
…nd LockFreeQueue (#25928)

Resolves all of the memory leaks that are exposed by our current tests
in the ConcurrentMap, EpochManager, LockFreeStack, and LockFreeQueue
modules

While working on this, I also found
#25926 and this PR adds a
future for that.

Testing
- [x] full paratest with/without comm
- [x] full paratest with -memleaks
- [x] full paratest with comm and -memleaks

[Reviewed by @stonea]
  • Loading branch information
jabraham17 authored Sep 24, 2024
2 parents 685efef + 5d4f573 commit afb02c2
Show file tree
Hide file tree
Showing 43 changed files with 150 additions and 159 deletions.
28 changes: 19 additions & 9 deletions modules/packages/ConcurrentMap.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ module ConcurrentMap {
this.size = round(parent.buckets.size * MULTIPLIER_NUM_BUCKETS):int;
this.bucketsDom = {0..#round(parent.buckets.size * MULTIPLIER_NUM_BUCKETS):int};
}
proc deinit() {
[i in bucketsDom] {
if var bb = buckets[i].read() then delete bb;
}
}

// _gen_key will generate the hash on the combined seed and hash of original key
// which ensures a better distribution of keys from varying seeds.
Expand Down Expand Up @@ -293,6 +298,9 @@ module ConcurrentMap {
root = new unmanaged Buckets(keyType, valType);
root.lock.write(P_INNER);
}
proc deinit() {
delete root;
}

/*
Register the task to epoch manager.
Expand All @@ -308,10 +316,10 @@ module ConcurrentMap {
var shouldYield = false;
const defaultHash = key.hash();
var idx = (curr._hash(defaultHash) % (curr.buckets.size):uint):int;
while (true) {
while true {
var next = curr.buckets[idx].read();
// writeln("stuck");
if (next == nil) {
if next == nil {
// If we're not inserting something, I.E we are removing
// or retrieving, we are done.
if !isInsertion then return nil;
Expand All @@ -321,20 +329,20 @@ module ConcurrentMap {
newList.lock.write(E_LOCK);

// We set our Bucket, we also own it so return it
if (curr.buckets[idx].compareAndSwap(nil, newList)) {
if curr.buckets[idx].compareAndSwap(nil, newList) {
return newList;
} else {
// Someone else set their bucket, reload.
delete newList;
}
}
else if (next!.lock.read() == P_INNER) {
else if next!.lock.read() == P_INNER {
curr = next : unmanaged Buckets(keyType, valType);
idx = (curr._hash(defaultHash) % (curr.buckets.size):uint):int;
}
else if (next!.lock.read() == E_AVAIL) {
else if next!.lock.read() == E_AVAIL {
// We now own the bucket...
if (next!.lock.compareAndSwap(E_AVAIL, E_LOCK)) {
if next!.lock.compareAndSwap(E_AVAIL, E_LOCK) {
// Non-insertions don't care.
if !isInsertion then return next : unmanaged Bucket(keyType, valType);
// Insertions cannot have a full bucket...
Expand Down Expand Up @@ -568,7 +576,8 @@ module ConcurrentMap {
:yields: A copy of one of the keys contained in this map.
*/
iter keys() : keyType {
for (key, _) in this {
// dummy variable to workaround: https://github.com/chapel-lang/chapel/issues/25926
for (key, val) in this {
yield key;
}
}
Expand All @@ -579,7 +588,8 @@ module ConcurrentMap {
:yields: A copy of one of the values contained in this map.
*/
iter values() : valType {
for (_, val) in this {
// dummy variable to workaround: https://github.com/chapel-lang/chapel/issues/25926
for (dummy, val) in this {
yield val;
}
}
Expand Down Expand Up @@ -854,7 +864,7 @@ module ConcurrentMap {
tok.pin();
var elist = getEList(key, true, tok);
for i in 0..#elist!.count {
if (elist!.keys[i] == key) {
if elist!.keys[i] == key {
elist!.values[i] = val;
elist!.lock.write(E_AVAIL);
tok.unpin();
Expand Down
53 changes: 31 additions & 22 deletions modules/packages/EpochManager.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,20 @@ module EpochManager {
do {
var oldHead = _head.readABA();
_node.next = oldHead.getObject();
} while(!_head.compareAndSwapABA(oldHead, _node));
} while !_head.compareAndSwapABA(oldHead, _node);
}

iter these() : objType {
var ptr = _head.read();
while (ptr != nil) {
while ptr != nil {
yield ptr!.val!;
ptr = ptr!.next;
}
}

proc deinit() {
var ptr = _head.read();
while (ptr != nil) {
while ptr != nil {
var next = ptr!.next;
delete ptr;
ptr = next;
Expand Down Expand Up @@ -228,11 +228,11 @@ module EpochManager {
do {
oldTop = _freeListHead.readABA();
n = oldTop.getObject();
if (n == nil) {
if n == nil {
return new unmanaged Node(objType);
}
var newTop = n!.freeListNext;
} while (!_freeListHead.compareAndSwapABA(oldTop, newTop));
} while !_freeListHead.compareAndSwapABA(oldTop, newTop);
n!.next.write(nil);
n!.freeListNext = nil;
return n!;
Expand All @@ -243,16 +243,16 @@ module EpochManager {
n.val = newObj;

// Now enqueue
while (true) {
while true {
var tail = _tail.readABA();
var next = tail.next.readABA();
var next_node = next.getObject();
var curr_tail = _tail.readABA();

// Check if tail and next are consistent
if (tail == curr_tail) {
if (next_node == nil) {
if (curr_tail.next.compareAndSwapABA(next, n)) {
if tail == curr_tail {
if next_node == nil {
if curr_tail.next.compareAndSwapABA(next, n) {

// Enqueue is done. Try to swing Tail to the inserted node
_tail.compareAndSwapABA(curr_tail, n);
Expand All @@ -267,7 +267,7 @@ module EpochManager {
}

proc dequeue() : objType? {
while (true) {
while true {
var head = _head.readABA();
var head_node = head.getObject();
var curr_tail = _tail.readABA();
Expand All @@ -276,15 +276,15 @@ module EpochManager {
var next_node = next.getObject();
var curr_head = _head.readABA();

if (head == curr_head) {
if (head_node == tail_node) {
if (next_node == nil) then
if head == curr_head {
if head_node == tail_node {
if next_node == nil then
return nil;
_tail.compareAndSwapABA(curr_tail, next_node);
}
else {
var ret_val = next_node!.val;
if (_head.compareAndSwapABA(curr_head, next_node)) {
if _head.compareAndSwapABA(curr_head, next_node) {
retireNode(head_node!);
return ret_val;
}
Expand Down Expand Up @@ -314,22 +314,22 @@ module EpochManager {

proc peek() : objType {
var actual_head = _head.read().next.read();
if (actual_head != nil) then
if actual_head != nil then
return actual_head.val;
return nil;
}

proc deinit() {
var ptr = _head.read();
while (ptr != nil) {
while ptr != nil {
var tmp = ptr!.next.read();
if delete_val then delete ptr!.val;
delete ptr;
ptr = tmp;
}

ptr = _freeListHead.read();
while (ptr != nil) {
while ptr != nil {
var head = ptr!.freeListNext;
delete ptr;
ptr = head;
Expand Down Expand Up @@ -374,11 +374,11 @@ module EpochManager {
do {
oldTop = _freeListHead.readABA();
n = oldTop.getObject();
if (n == nil) {
if n == nil {
return new unmanaged Node(obj);
}
var newTop = n!.next;
} while (!_freeListHead.compareAndSwapABA(oldTop, newTop));
} while !_freeListHead.compareAndSwapABA(oldTop, newTop);
n!.val = obj;
return n!;
}
Expand All @@ -388,7 +388,7 @@ module EpochManager {
do {
var oldTop = _freeListHead.readABA();
nextObj.next = oldTop.getObject();
} while (!_freeListHead.compareAndSwapABA(oldTop, nextObj));
} while !_freeListHead.compareAndSwapABA(oldTop, nextObj);
}

proc pop() {
Expand All @@ -397,7 +397,14 @@ module EpochManager {

proc deinit() {
var ptr = _head.read();
while (ptr != nil) {
while ptr != nil {
var next = ptr!.next;
delete ptr!.val;
delete ptr;
ptr = next;
}
ptr = _freeListHead.read();
while ptr != nil {
var next = ptr!.next;
delete ptr!.val;
delete ptr;
Expand Down Expand Up @@ -700,6 +707,7 @@ module EpochManager {
Reclaim all objects
*/
proc deinit() {
[tok in allocated_list] delete tok;
delete allocated_list;
delete free_list;
delete limbo_list;
Expand Down Expand Up @@ -904,6 +912,7 @@ module EpochManager {
// Delete locale-private data
delete limbo_list;
delete free_list;
[tok in allocated_list] delete tok;
delete allocated_list;
delete objsToDelete;

Expand Down Expand Up @@ -1049,7 +1058,7 @@ module EpochManager {
var head = limbo.pop();

// Prepare work to be scattered by locale it is intended for.
while (head != nil) {
while head != nil {
var obj = head!.val;
var next = head!.next;
_this.objsToDelete[obj.locale.id].append(obj!);
Expand Down
20 changes: 13 additions & 7 deletions modules/packages/LockFreeQueue.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ module LockFreeQueue {
_head.write(_node);
_tail.write(_node);
}
proc deinit() {
drain();
if var head = _head.read() {
delete head;
}
}

proc getToken() : owned TokenWrapper {
return _manager.register();
Expand All @@ -146,11 +152,11 @@ module LockFreeQueue {
proc enqueue(newObj : objType, tok : owned TokenWrapper = getToken()) {
var n = new unmanaged Node(newObj);
tok.pin();
while (true) {
while true {
var curr_tail = _tail.read()!;
var next = curr_tail.next.read();
if (next == nil) {
if (curr_tail.next.compareAndSwap(next, n)) {
if next == nil {
if curr_tail.next.compareAndSwap(next, n) {
_tail.compareAndSwap(curr_tail, n);
break;
}
Expand All @@ -165,13 +171,13 @@ module LockFreeQueue {

proc dequeue(tok : owned TokenWrapper = getToken()) : (bool, objTypeOpt) {
tok.pin();
while (true) {
while true {
var curr_head = _head.read()!;
var curr_tail = _tail.read();
var next_node = curr_head.next.read();

if (curr_head == curr_tail) {
if (next_node == nil) {
if curr_head == curr_tail {
if next_node == nil {
tok.unpin();
var retval : objTypeOpt;
return (false, retval);
Expand All @@ -180,7 +186,7 @@ module LockFreeQueue {
}
else {
var ret_val = next_node!.val;
if (_head.compareAndSwap(curr_head, next_node)) {
if _head.compareAndSwap(curr_head, next_node) {
tok.deferDelete(curr_head);
tok.unpin();
return (true, ret_val);
Expand Down
12 changes: 9 additions & 3 deletions modules/packages/LockFreeStack.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ module LockFreeStack {
proc init(type objType) {
this.objType = objType;
}
proc deinit() {
drain();
if var top = _top.read() {
delete top;
}
}

proc getToken() : owned TokenWrapper {
return _manager.register();
Expand All @@ -148,7 +154,7 @@ module LockFreeStack {
n.next = oldTop;
if shouldYield then currentTask.yieldExecution();
shouldYield = true;
} while (!_top.compareAndSwap(oldTop, n));
} while !_top.compareAndSwap(oldTop, n);
tok.unpin();
}

Expand All @@ -158,15 +164,15 @@ module LockFreeStack {
var shouldYield = false;
do {
oldTop = _top.read();
if (oldTop == nil) {
if oldTop == nil {
tok.unpin();
var retval : objType;
return (false, retval);
}
var newTop = oldTop!.next;
if shouldYield then currentTask.yieldExecution();
shouldYield = true;
} while (!_top.compareAndSwap(oldTop, newTop));
} while !_top.compareAndSwap(oldTop, newTop);
var retval = oldTop!.val;
tok.deferDelete(oldTop);
tok.unpin();
Expand Down
20 changes: 20 additions & 0 deletions test/functions/iterators/bugs/iter-memory-leak-unpack.bad
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{id = 2}
deinit C id=2
{id = 4}
deinit C id=4
(id = 2)
deinit R id=2
deinit R id=2
deinit R id=1
(id = 4)
deinit R id=4
deinit R id=4
deinit R id=3

=================================================== Memory Leaks ====================================================
Allocated Memory (Bytes) Number Size Total Description Address
=====================================================================================================================
iter-memory-leak-unpack.chpl:26 1 16 16 C prediffed
iter-memory-leak-unpack.chpl:26 1 16 16 C prediffed
=====================================================================================================================

Loading

0 comments on commit afb02c2

Please sign in to comment.