From 814912e3300f363341fe3c115846aca1bc51c218 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Wed, 26 Feb 2025 12:44:51 +0200 Subject: [PATCH] Added AtomicBuffer compareAndExchange int/long --- .../org/agrona/concurrent/AtomicBuffer.java | 27 ++++++++++++++ .../org/agrona/concurrent/UnsafeBuffer.java | 27 ++++++++++++++ .../agrona/concurrent/AtomicBufferTest.java | 36 +++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java index 3db9f2e28..64a5d030d 100644 --- a/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/AtomicBuffer.java @@ -218,6 +218,19 @@ public interface AtomicBuffer extends MutableDirectBuffer */ boolean compareAndSetLong(int index, long expectedValue, long updateValue); + /** + * Atomic compare and exchange of a long given an expected value. + *

+ * This call has sequential-consistent semantics. + * + * @param index in bytes for where to put. + * @param expectedValue at to be compared. + * @param updateValue to be exchanged. + * @return the old value no matter if the expected value was found. + * @since 2.1.0 + */ + long compareAndExchangeLong(int index, long expectedValue, long updateValue); + /** * Atomically exchange a value at a location returning the previous contents. *

@@ -359,6 +372,20 @@ public interface AtomicBuffer extends MutableDirectBuffer */ boolean compareAndSetInt(int index, int expectedValue, int updateValue); + /** + * Atomic compare and exchange of a int given an expected value. + *

+ * This call has sequential-consistent semantics. + * + * @param index in bytes for where to put. + * @param expectedValue at to be compared. + * @param updateValue to be exchanged. + * @return the old value no matter if the expected value was found. + * @since 2.1.0 + */ + int compareAndExchangeInt(int index, int expectedValue, int updateValue); + + /** * Atomically exchange a value at a location returning the previous contents. *

diff --git a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java index 1f4ec33a6..3eda2009e 100644 --- a/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java +++ b/agrona/src/main/java/org/agrona/concurrent/UnsafeBuffer.java @@ -508,6 +508,20 @@ public boolean compareAndSetLong(final int index, final long expectedValue, fina return UnsafeApi.compareAndSetLong(byteArray, addressOffset + index, expectedValue, updateValue); } + /** + * {@inheritDoc} + */ + public long compareAndExchangeLong(final int index, final long expectedValue, final long updateValue) + { + if (SHOULD_BOUNDS_CHECK) + { + boundsCheck0(index, SIZE_OF_LONG); + } + + return UnsafeApi.compareAndExchangeLong(byteArray, addressOffset + index, expectedValue, updateValue); + } + + /** * {@inheritDoc} */ @@ -671,6 +685,19 @@ public boolean compareAndSetInt(final int index, final int expectedValue, final return UnsafeApi.compareAndSetInt(byteArray, addressOffset + index, expectedValue, updateValue); } + /** + * {@inheritDoc} + */ + public int compareAndExchangeInt(final int index, final int expectedValue, final int updateValue) + { + if (SHOULD_BOUNDS_CHECK) + { + boundsCheck0(index, SIZE_OF_INT); + } + + return UnsafeApi.compareAndExchangeInt(byteArray, addressOffset + index, expectedValue, updateValue); + } + /** * {@inheritDoc} */ diff --git a/agrona/src/test/java/org/agrona/concurrent/AtomicBufferTest.java b/agrona/src/test/java/org/agrona/concurrent/AtomicBufferTest.java index 04469bce0..23fca9e96 100644 --- a/agrona/src/test/java/org/agrona/concurrent/AtomicBufferTest.java +++ b/agrona/src/test/java/org/agrona/concurrent/AtomicBufferTest.java @@ -139,6 +139,24 @@ void shouldCompareAndSetLongToNativeBuffer(final ByteBuffer buffer) assertThat(buffer.getLong(INDEX), is(LONG_VALUE + 1)); } + @ParameterizedTest + @MethodSource("atomicBuffers") + void shouldCompareAndExchangeLongToNativeBuffer(final ByteBuffer buffer) + { + final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(buffer); + + buffer.putLong(INDEX, LONG_VALUE); + + // success + assertEquals(LONG_VALUE, unsafeBuffer.compareAndExchangeLong(INDEX, LONG_VALUE, LONG_VALUE + 1)); + assertEquals(LONG_VALUE + 1, unsafeBuffer.getLongVolatile(INDEX)); + + // failure + assertEquals(LONG_VALUE + 1, + unsafeBuffer.compareAndExchangeLong(INDEX, LONG_VALUE + 10, 21)); + assertEquals(LONG_VALUE + 1, buffer.getLong(INDEX)); + } + @ParameterizedTest @MethodSource("atomicBuffers") void shouldGetAndSetLongToNativeBuffer(final ByteBuffer buffer) @@ -251,6 +269,24 @@ void shouldCompareAndSetIntToNativeBuffer(final ByteBuffer buffer) assertThat(buffer.getInt(INDEX), is(INT_VALUE + 1)); } + @ParameterizedTest + @MethodSource("atomicBuffers") + void shouldCompareAndExchangeIntToNativeBuffer(final ByteBuffer buffer) + { + final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(buffer); + + buffer.putInt(INDEX, INT_VALUE); + + // success + assertEquals(INT_VALUE, unsafeBuffer.compareAndExchangeInt(INDEX, INT_VALUE, INT_VALUE + 1)); + assertEquals(INT_VALUE + 1, unsafeBuffer.getIntVolatile(INDEX)); + + // failure + assertEquals(INT_VALUE + 1, + unsafeBuffer.compareAndExchangeInt(INDEX, INT_VALUE + 10, 21)); + assertEquals(INT_VALUE + 1, buffer.getInt(INDEX)); + } + @ParameterizedTest @MethodSource("atomicBuffers") void shouldGetAndSetIntToNativeBuffer(final ByteBuffer buffer)