Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pveentjer committed Feb 24, 2025
1 parent f2d436d commit 11eec2c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
3 changes: 0 additions & 3 deletions agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ public boolean weakCompareAndSetLongPlain(final int index, final long expectedVa
return UnsafeApi.weakCompareAndSetLongPlain(byteArray, addressOffset + index, expectedValue, updateValue);
}


/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -668,7 +667,6 @@ public int addIntRelease(final int index, final int increment)
return UnsafeApi.getAndAddIntRelease(byteArray, addressOffset + index, increment);
}


/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -880,7 +878,6 @@ public void putCharVolatile(final int index, final char value)
UnsafeApi.putCharVolatile(byteArray, addressOffset + index, value);
}


/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
newTail += padding;
}
}
while (!buffer.compareAndSetLong(tailPositionIndex, tail, newTail));
while (!buffer.weakCompareAndSetLong(tailPositionIndex, tail, newTail));

if (0 != padding)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void shouldWriteToEmptyBuffer()

when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(head);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tail);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength))
.thenReturn(TRUE);

final UnsafeBuffer srcBuffer = new UnsafeBuffer(allocateDirect(1024));
Expand Down Expand Up @@ -133,7 +133,7 @@ void shouldRejectWriteWhenInsufficientSpace()
assertFalse(ringBuffer.write(MSG_TYPE_ID, srcBuffer, srcIndex, length));

verify(buffer, never()).putInt(anyInt(), anyInt());
verify(buffer, never()).compareAndSetLong(anyInt(), anyLong(), anyLong());
verify(buffer, never()).weakCompareAndSetLong(anyInt(), anyLong(), anyLong());
verify(buffer, never()).putBytes(anyInt(), eq(srcBuffer), anyInt(), anyInt());
verify(buffer, never()).putIntRelease(anyInt(), anyInt());
}
Expand All @@ -154,7 +154,7 @@ void shouldRejectWriteWhenBufferFull()
assertFalse(ringBuffer.write(MSG_TYPE_ID, srcBuffer, srcIndex, length));

verify(buffer, never()).putInt(anyInt(), anyInt());
verify(buffer, never()).compareAndSetLong(anyInt(), anyLong(), anyLong());
verify(buffer, never()).weakCompareAndSetLong(anyInt(), anyLong(), anyLong());
verify(buffer, never()).putIntRelease(anyInt(), anyInt());
}

Expand All @@ -169,7 +169,7 @@ void shouldInsertPaddingRecordPlusMessageOnBufferWrap()

when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(head);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tail);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + ALIGNMENT))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + ALIGNMENT))
.thenReturn(TRUE);

final UnsafeBuffer srcBuffer = new UnsafeBuffer(allocateDirect(1024));
Expand Down Expand Up @@ -199,7 +199,7 @@ void shouldInsertPaddingRecordPlusMessageOnBufferWrapWithHeadEqualToTail()

when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(head);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tail);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + ALIGNMENT))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + ALIGNMENT))
.thenReturn(TRUE);

final UnsafeBuffer srcBuffer = new UnsafeBuffer(allocateDirect(1024));
Expand Down Expand Up @@ -488,7 +488,7 @@ void shouldInsertPaddingAndWriteToBuffer()
when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(head);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tail);
when(buffer.getLongVolatile(HEAD_COUNTER_CACHE_INDEX)).thenReturn(headCache);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + padding))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tail, tail + alignedRecordLength + padding))
.thenReturn(true);

final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[messageLength]);
Expand Down Expand Up @@ -525,7 +525,7 @@ void tryClaimReturnsIndexAtWhichEncodedMessageStarts()
final int recordIndex = (int)tailPosition;
when(buffer.getLongVolatile(HEAD_COUNTER_CACHE_INDEX)).thenReturn(headPosition);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tailPosition);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength))
.thenReturn(TRUE);

final int index = ringBuffer.tryClaim(MSG_TYPE_ID, length);
Expand All @@ -535,7 +535,7 @@ void tryClaimReturnsIndexAtWhichEncodedMessageStarts()
final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).getLongVolatile(HEAD_COUNTER_CACHE_INDEX);
inOrder.verify(buffer).getLongVolatile(TAIL_COUNTER_INDEX);
inOrder.verify(buffer).compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength);
inOrder.verify(buffer).weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength);
inOrder.verify(buffer).putIntRelease(lengthOffset(recordIndex), -recordLength);
inOrder.verify(buffer).putInt(typeOffset(recordIndex), MSG_TYPE_ID);
inOrder.verifyNoMoreInteractions();
Expand All @@ -554,7 +554,7 @@ void tryClaimReturnsIndexAtWhichEncodedMessageStartsAfterPadding()
final int recordIndex = 0;
when(buffer.getLongVolatile(HEAD_COUNTER_CACHE_INDEX)).thenReturn(headPosition);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tailPosition);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength + padding))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength + padding))
.thenReturn(TRUE);

final int index = ringBuffer.tryClaim(MSG_TYPE_ID, length);
Expand All @@ -565,7 +565,7 @@ void tryClaimReturnsIndexAtWhichEncodedMessageStartsAfterPadding()
inOrder.verify(buffer).getLongVolatile(HEAD_COUNTER_CACHE_INDEX);
inOrder.verify(buffer).getLongVolatile(TAIL_COUNTER_INDEX);
inOrder.verify(buffer)
.compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength + padding);
.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + alignedRecordLength + padding);
inOrder.verify(buffer).putIntRelease(lengthOffset(paddingIndex), -padding);
inOrder.verify(buffer).putInt(typeOffset(paddingIndex), PADDING_MSG_TYPE_ID);
inOrder.verify(buffer).putIntRelease(lengthOffset(paddingIndex), padding);
Expand Down Expand Up @@ -607,7 +607,7 @@ void tryClaimReturnsInsufficientCapacityTail()
when(buffer.getLongVolatile(HEAD_COUNTER_CACHE_INDEX)).thenReturn(cachedHeadPosition);
when(buffer.getLongVolatile(HEAD_COUNTER_INDEX)).thenReturn(headPosition);
when(buffer.getLongVolatile(TAIL_COUNTER_INDEX)).thenReturn(tailPosition);
when(buffer.compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + padding))
when(buffer.weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + padding))
.thenReturn(TRUE);

final int index = ringBuffer.tryClaim(MSG_TYPE_ID, length);
Expand All @@ -621,7 +621,7 @@ void tryClaimReturnsInsufficientCapacityTail()
inOrder.verify(buffer).putLongRelease(HEAD_COUNTER_CACHE_INDEX, headPosition);
inOrder.verify(buffer).getLongVolatile(HEAD_COUNTER_INDEX);
inOrder.verify(buffer).putLongRelease(HEAD_COUNTER_CACHE_INDEX, headPosition);
inOrder.verify(buffer).compareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + padding);
inOrder.verify(buffer).weakCompareAndSetLong(TAIL_COUNTER_INDEX, tailPosition, tailPosition + padding);
inOrder.verify(buffer).putIntRelease(lengthOffset(paddingIndex), -padding);
inOrder.verify(buffer).putInt(typeOffset(paddingIndex), PADDING_MSG_TYPE_ID);
inOrder.verify(buffer).putIntRelease(lengthOffset(paddingIndex), padding);
Expand Down

0 comments on commit 11eec2c

Please sign in to comment.