Abstract implementation of {@link RetainableByteBuffer} with
* reference counting.
+ * @deprecated
*/
-public abstract class AbstractRetainableByteBuffer implements RetainableByteBuffer
+@Deprecated(forRemoval = true)
+public abstract class AbstractRetainableByteBuffer extends RetainableByteBuffer.FixedCapacity
{
- private final ReferenceCounter refCount = new ReferenceCounter(0);
- private final ByteBuffer byteBuffer;
+ private final ReferenceCounter _refCount;
public AbstractRetainableByteBuffer(ByteBuffer byteBuffer)
{
- this.byteBuffer = Objects.requireNonNull(byteBuffer);
+ super(byteBuffer, new ReferenceCounter(0));
+ _refCount = (ReferenceCounter)getWrapped();
}
/**
@@ -37,42 +36,6 @@ public AbstractRetainableByteBuffer(ByteBuffer byteBuffer)
*/
protected void acquire()
{
- refCount.acquire();
- }
-
- @Override
- public boolean canRetain()
- {
- return refCount.canRetain();
- }
-
- @Override
- public void retain()
- {
- refCount.retain();
- }
-
- @Override
- public boolean release()
- {
- return refCount.release();
- }
-
- @Override
- public boolean isRetained()
- {
- return refCount.isRetained();
- }
-
- @Override
- public ByteBuffer getByteBuffer()
- {
- return byteBuffer;
- }
-
- @Override
- public String toString()
- {
- return "%s@%x[rc=%d,%s]".formatted(getClass().getSimpleName(), hashCode(), refCount.get(), BufferUtil.toDetailString(byteBuffer));
+ _refCount.acquire();
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
index 38d3227e0ff0..c1cf7c490666 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
@@ -199,7 +199,7 @@ public int getMaxCapacity()
}
@Override
- public RetainableByteBuffer acquire(int size, boolean direct)
+ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainedBucket bucket = bucketFor(size, direct);
@@ -210,24 +210,24 @@ public RetainableByteBuffer acquire(int size, boolean direct)
bucket.recordAcquire();
// Try to acquire a pooled entry.
- Pool.Entry entry = bucket.getPool().acquire();
+ Pool.Entry entry = bucket.getPool().acquire();
if (entry != null)
{
bucket.recordPooled();
- RetainableByteBuffer buffer = entry.getPooled();
- ((Buffer)buffer).acquire();
+ RetainableByteBuffer.Pooled buffer = entry.getPooled();
+ ((PooledBuffer)buffer).acquire();
return buffer;
}
return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer));
}
- private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
+ private void reserve(RetainedBucket bucket, RetainableByteBuffer.Pooled buffer)
{
bucket.recordRelease();
// Try to reserve an entry to put the buffer into the pool.
- Pool.Entry entry = bucket.getPool().reserve();
+ Pool.Entry entry = bucket.getPool().reserve();
if (entry == null)
{
bucket.recordNonPooled();
@@ -237,7 +237,7 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
// Add the buffer to the new entry.
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
- Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
+ PooledBuffer pooledBuffer = new PooledBuffer(this, byteBuffer, b -> release(bucket, entry));
if (entry.enable(pooledBuffer, false))
{
checkMaxMemory(bucket, buffer.isDirect());
@@ -249,7 +249,7 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
entry.remove();
}
- private void release(RetainedBucket bucket, Pool.Entry entry)
+ private void release(RetainedBucket bucket, Pool.Entry entry)
{
bucket.recordRelease();
@@ -257,7 +257,7 @@ private void release(RetainedBucket bucket, Pool.Entry ent
BufferUtil.reset(buffer.getByteBuffer());
// Release the buffer and check the memory 1% of the times.
- int used = ((Buffer)buffer).use();
+ int used = ((PooledBuffer)buffer).use();
if (entry.release())
{
if (used % 100 == 0)
@@ -309,15 +309,15 @@ private void evict(long excessMemory, boolean direct)
}
}
- private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser)
+ private RetainableByteBuffer.Pooled newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser)
{
ByteBuffer buffer = BufferUtil.allocate(capacity, direct);
- Buffer retainableByteBuffer = new Buffer(buffer, releaser);
+ PooledBuffer retainableByteBuffer = new PooledBuffer(this, buffer, releaser);
retainableByteBuffer.acquire();
return retainableByteBuffer;
}
- public Pool poolFor(int capacity, boolean direct)
+ public Pool poolFor(int capacity, boolean direct)
{
RetainedBucket bucket = bucketFor(capacity, direct);
return bucket == null ? null : bucket.getPool();
@@ -445,7 +445,7 @@ private class RetainedBucket
private final LongAdder _evicts = new LongAdder();
private final LongAdder _removes = new LongAdder();
private final LongAdder _releases = new LongAdder();
- private final Pool _pool;
+ private final Pool _pool;
private final int _capacity;
private RetainedBucket(int capacity, int poolSize)
@@ -501,14 +501,14 @@ private int getCapacity()
return _capacity;
}
- private Pool getPool()
+ private Pool getPool()
{
return _pool;
}
private int evict()
{
- Pool.Entry entry;
+ Pool.Entry entry;
if (_pool instanceof BucketCompoundPool compound)
entry = compound.evict();
else
@@ -539,7 +539,7 @@ public String toString()
{
int entries = 0;
int inUse = 0;
- for (Pool.Entry entry : getPool().stream().toList())
+ for (Pool.Entry entry : getPool().stream().toList())
{
entries++;
if (entry.isInUse())
@@ -564,16 +564,16 @@ public String toString()
);
}
- private static class BucketCompoundPool extends CompoundPool
+ private static class BucketCompoundPool extends CompoundPool
{
- private BucketCompoundPool(ConcurrentPool concurrentBucket, QueuedPool queuedBucket)
+ private BucketCompoundPool(ConcurrentPool concurrentBucket, QueuedPool queuedBucket)
{
super(concurrentBucket, queuedBucket);
}
- private Pool.Entry evict()
+ private Pool.Entry evict()
{
- Entry entry = getSecondaryPool().acquire();
+ Entry entry = getSecondaryPool().acquire();
if (entry == null)
entry = getPrimaryPool().acquire();
return entry;
@@ -581,14 +581,19 @@ private Pool.Entry evict()
}
}
- private static class Buffer extends AbstractRetainableByteBuffer
+ private static class PooledBuffer extends RetainableByteBuffer.Pooled
{
- private final Consumer _releaser;
+ private final Consumer _releaser;
+ private final ReferenceCounter _referenceCounter;
private int _usages;
- private Buffer(ByteBuffer buffer, Consumer releaser)
+ private PooledBuffer(ByteBufferPool pool, ByteBuffer buffer, Consumer releaser)
{
- super(buffer);
+ super(pool, buffer, new ReferenceCounter(0));
+ if (getWrapped() instanceof ReferenceCounter referenceCounter)
+ _referenceCounter = referenceCounter;
+ else
+ throw new IllegalArgumentException();
this._releaser = releaser;
}
@@ -610,13 +615,24 @@ private int use()
_usages = 0;
return _usages;
}
+
+ /**
+ * @see ReferenceCounter#acquire()
+ */
+ protected void acquire()
+ {
+ _referenceCounter.acquire();
+ }
}
/**
* A variant of the {@link ArrayByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (e.g. 1k, 2k, 4k, 8k, etc.).
+ * @deprecated Usage of {@code Quadratic} is often wasteful of additional space and can increase contention on
+ * the larger buffers.
*/
+ @Deprecated(forRemoval = true, since = "12.1.0")
public static class Quadratic extends ArrayByteBufferPool
{
public Quadratic()
@@ -647,14 +663,14 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHe
*
A variant of {@link ArrayByteBufferPool} that tracks buffer
* acquires/releases, useful to identify buffer leaks.
*
Use {@link #getLeaks()} when the system is idle to get
- * the {@link Buffer}s that have been leaked, which contain
+ * the {@link TrackedBuffer}s that have been leaked, which contain
* the stack trace information of where the buffer was acquired.
*/
public static class Tracking extends ArrayByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(Tracking.class);
- private final Set buffers = ConcurrentHashMap.newKeySet();
+ private final Set buffers = ConcurrentHashMap.newKeySet();
public Tracking()
{
@@ -666,23 +682,33 @@ public Tracking(int minCapacity, int maxCapacity, int maxBucketSize)
super(minCapacity, maxCapacity, maxBucketSize);
}
+ public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
+ {
+ super(minCapacity, factor, maxCapacity, maxBucketSize);
+ }
+
public Tracking(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}
+ public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
+ {
+ super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
+ }
+
@Override
- public RetainableByteBuffer acquire(int size, boolean direct)
+ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
- RetainableByteBuffer buffer = super.acquire(size, direct);
- Buffer wrapper = new Buffer(buffer, size);
+ RetainableByteBuffer.Mutable buffer = super.acquire(size, direct);
+ TrackedBuffer wrapper = new TrackedBuffer(buffer, size);
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", wrapper);
buffers.add(wrapper);
return wrapper;
}
- public Set getLeaks()
+ public Set getLeaks()
{
return buffers;
}
@@ -690,11 +716,11 @@ public Set getLeaks()
public String dumpLeaks()
{
return getLeaks().stream()
- .map(Buffer::dump)
+ .map(TrackedBuffer::dump)
.collect(Collectors.joining(System.lineSeparator()));
}
- public class Buffer extends RetainableByteBuffer.Wrapper
+ public class TrackedBuffer extends RetainableByteBuffer.FixedCapacity
{
private final int size;
private final Instant acquireInstant;
@@ -703,12 +729,12 @@ public class Buffer extends RetainableByteBuffer.Wrapper
private final List releaseStacks = new CopyOnWriteArrayList<>();
private final List overReleaseStacks = new CopyOnWriteArrayList<>();
- private Buffer(RetainableByteBuffer wrapped, int size)
+ private TrackedBuffer(RetainableByteBuffer.Mutable wrapped, int size)
{
- super(wrapped);
+ super(wrapped.getByteBuffer(), wrapped);
this.size = size;
this.acquireInstant = Instant.now();
- this.acquireStack = new Throwable();
+ this.acquireStack = new Throwable(Thread.currentThread().getName());
}
public int getSize()
@@ -726,11 +752,39 @@ public Throwable getAcquireStack()
return acquireStack;
}
+ @Override
+ public RetainableByteBuffer slice()
+ {
+ RetainableByteBuffer slice = super.slice();
+ return new Mutable.Wrapper(slice)
+ {
+ @Override
+ public boolean release()
+ {
+ return TrackedBuffer.this.release();
+ }
+ };
+ }
+
+ @Override
+ public RetainableByteBuffer slice(long length)
+ {
+ RetainableByteBuffer slice = super.slice(length);
+ return new Mutable.Wrapper(slice)
+ {
+ @Override
+ public boolean release()
+ {
+ return TrackedBuffer.this.release();
+ }
+ };
+ }
+
@Override
public void retain()
{
super.retain();
- retainStacks.add(new Throwable());
+ retainStacks.add(new Throwable(Thread.currentThread().getName()));
}
@Override
@@ -751,11 +805,18 @@ public boolean release()
catch (IllegalStateException e)
{
buffers.add(this);
- overReleaseStacks.add(new Throwable());
+ overReleaseStacks.add(new Throwable(Thread.currentThread().getName()));
throw e;
}
}
+ @Override
+ protected void addExtraStringInfo(StringBuilder builder)
+ {
+ builder.append(",@");
+ builder.append(Integer.toHexString(System.identityHashCode(getWrapped())));
+ }
+
public String dump()
{
StringWriter w = new StringWriter();
@@ -776,7 +837,7 @@ public String dump()
{
overReleaseStack.printStackTrace(pw);
}
- return "%s@%x of %d bytes on %s wrapping %s acquired at %s".formatted(getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), getWrapped(), w);
+ return "%s@%x of %d bytes on %s wrapping %s acquired at %s".formatted(getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), getRetained(), w);
}
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
index ccfa44212228..898bbf0fee72 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
@@ -52,62 +52,81 @@ private static SocketAddress noSocketAddress()
private static final Logger LOG = LoggerFactory.getLogger(ByteArrayEndPoint.class);
private static final SocketAddress NO_SOCKET_ADDRESS = noSocketAddress();
- private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024;
private static final ByteBuffer EOF = BufferUtil.allocate(0);
private final Runnable _runFillable = () -> getFillInterest().fillable();
private final AutoLock _lock = new AutoLock();
private final Condition _hasOutput = _lock.newCondition();
private final Queue _inQ = new ArrayDeque<>();
- private final int _outputSize;
- private ByteBuffer _out;
- private boolean _growOutput;
+ private final RetainableByteBuffer.DynamicCapacity _buffer;
public ByteArrayEndPoint()
{
- this(null, 0, null, null);
+ this(null, 0, null, -1, false);
}
/**
* @param input the input bytes
- * @param outputSize the output size
+ * @param outputSize the output size or -1 for default
*/
public ByteArrayEndPoint(byte[] input, int outputSize)
{
- this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, BufferUtil.allocate(outputSize));
+ this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, outputSize, false);
}
/**
* @param input the input string (converted to bytes using default encoding charset)
- * @param outputSize the output size
+ * @param outputSize the output size or -1 for default
*/
public ByteArrayEndPoint(String input, int outputSize)
{
- this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, BufferUtil.allocate(outputSize));
+ this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, outputSize, false);
+ }
+
+ /**
+ * @param input the input bytes
+ * @param outputSize the output size or -1 for default
+ * @param growable {@code true} if the output buffer may grow
+ */
+ public ByteArrayEndPoint(byte[] input, int outputSize, boolean growable)
+ {
+ this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, outputSize, growable);
+ }
+
+ /**
+ * @param input the input string (converted to bytes using default encoding charset)
+ * @param outputSize the output size or -1 for default
+ * @param growable {@code true} if the output buffer may grow
+ */
+ public ByteArrayEndPoint(String input, int outputSize, boolean growable)
+ {
+ this(null, 0, input != null ? BufferUtil.toBuffer(input) : null, outputSize, growable);
}
public ByteArrayEndPoint(Scheduler scheduler, long idleTimeoutMs)
{
- this(scheduler, idleTimeoutMs, null, null);
+ this(scheduler, idleTimeoutMs, null, -1, false);
}
public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, byte[] input, int outputSize)
{
- this(timer, idleTimeoutMs, input != null ? BufferUtil.toBuffer(input) : null, BufferUtil.allocate(outputSize));
+ this(timer, idleTimeoutMs, input != null ? BufferUtil.toBuffer(input) : null, outputSize, false);
}
public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, String input, int outputSize)
{
- this(timer, idleTimeoutMs, input != null ? BufferUtil.toBuffer(input) : null, BufferUtil.allocate(outputSize));
+ this(timer, idleTimeoutMs, input != null ? BufferUtil.toBuffer(input) : null, outputSize, false);
}
- public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
+ public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, int outputSize, boolean growable)
{
super(timer);
if (BufferUtil.hasContent(input))
addInput(input);
- _outputSize = (output == null) ? 1024 : output.capacity();
- _out = output == null ? BufferUtil.allocate(_outputSize) : output;
+
+ _buffer = growable
+ ? new RetainableByteBuffer.DynamicCapacity(null, false, -1, outputSize)
+ : new RetainableByteBuffer.DynamicCapacity(null, false, outputSize);
setIdleTimeout(idleTimeoutMs);
onOpen();
}
@@ -158,7 +177,7 @@ protected void execute(Runnable task)
@Override
protected void needsFillInterest() throws IOException
{
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
if (!isOpen())
throw new ClosedChannelException();
@@ -185,7 +204,7 @@ public void addInputEOF()
public void addInput(ByteBuffer in)
{
boolean fillable = false;
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
if (isEOF(_inQ.peek()))
throw new RuntimeIOException(new EOFException());
@@ -227,7 +246,7 @@ public void addInputAndExecute(String s)
public void addInputAndExecute(ByteBuffer in)
{
boolean fillable = false;
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
if (isEOF(_inQ.peek()))
throw new RuntimeIOException(new EOFException());
@@ -256,9 +275,9 @@ public void addInputAndExecute(ByteBuffer in)
*/
public ByteBuffer getOutput()
{
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
- return _out;
+ return _buffer.getByteBuffer();
}
}
@@ -276,7 +295,7 @@ public String getOutputString()
*/
public String getOutputString(Charset charset)
{
- return BufferUtil.toString(_out, charset);
+ return BufferUtil.toString(getOutput(), charset);
}
/**
@@ -284,15 +303,14 @@ public String getOutputString(Charset charset)
*/
public ByteBuffer takeOutput()
{
- ByteBuffer b;
+ ByteBuffer taken;
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
- b = _out;
- _out = BufferUtil.allocate(_outputSize);
+ taken = _buffer.take().getByteBuffer();
}
getWriteFlusher().completeWrite();
- return b;
+ return taken;
}
/**
@@ -305,20 +323,19 @@ public ByteBuffer takeOutput()
*/
public ByteBuffer waitForOutput(long time, TimeUnit unit) throws InterruptedException
{
- ByteBuffer b;
+ ByteBuffer taken;
- try (AutoLock l = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
- while (BufferUtil.isEmpty(_out) && !isOutputShutdown())
+ while (_buffer.isEmpty() && !isOutputShutdown())
{
if (!_hasOutput.await(time, unit))
return null;
}
- b = _out;
- _out = BufferUtil.allocate(_outputSize);
+ taken = _buffer.take().getByteBuffer();
}
getWriteFlusher().completeWrite();
- return b;
+ return taken;
}
/**
@@ -342,13 +359,10 @@ public String takeOutputString(Charset charset)
/**
* @param out The out to set.
*/
+ @Deprecated
public void setOutput(ByteBuffer out)
{
- try (AutoLock lock = _lock.lock())
- {
- _out = out;
- }
- getWriteFlusher().completeWrite();
+ throw new UnsupportedOperationException();
}
/**
@@ -363,7 +377,7 @@ public boolean hasMore()
public int fill(ByteBuffer buffer) throws IOException
{
int filled = 0;
- try (AutoLock lock = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
while (true)
{
@@ -405,62 +419,42 @@ else if (filled < 0)
public boolean flush(ByteBuffer... buffers) throws IOException
{
boolean flushed = true;
- try (AutoLock l = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
if (!isOpen())
throw new IOException("CLOSED");
if (isOutputShutdown())
throw new IOException("OSHUT");
- boolean idle = true;
+ boolean notIdle = false;
for (ByteBuffer b : buffers)
{
- if (BufferUtil.hasContent(b))
- {
- if (_growOutput && b.remaining() > BufferUtil.space(_out))
- {
- BufferUtil.compact(_out);
- if (b.remaining() > BufferUtil.space(_out))
- {
- // Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues.
- if (_out.capacity() < MAX_BUFFER_SIZE)
- {
- long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE);
- ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity));
- BufferUtil.append(n, _out);
- _out = n;
- }
- }
- }
-
- if (BufferUtil.append(_out, b) > 0)
- idle = false;
-
- if (BufferUtil.hasContent(b))
- {
- flushed = false;
- break;
- }
- }
+ int remaining = b.remaining();
+ flushed = _buffer.append(b);
+ notIdle |= b.remaining() < remaining;
+ if (!flushed)
+ break;
}
- if (!idle)
+
+ if (notIdle)
{
notIdle();
_hasOutput.signalAll();
}
+
+ return flushed;
}
- return flushed;
}
@Override
public void reset()
{
- try (AutoLock l = _lock.lock())
+ try (AutoLock ignored = _lock.lock())
{
_inQ.clear();
_hasOutput.signalAll();
- BufferUtil.clear(_out);
+ _buffer.clear();
}
super.reset();
}
@@ -476,16 +470,17 @@ public Object getTransport()
*/
public boolean isGrowOutput()
{
- return _growOutput;
+ return _buffer instanceof RetainableByteBuffer.DynamicCapacity;
}
/**
* Set the growOutput to set.
* @param growOutput the growOutput to set
*/
+ @Deprecated
public void setGrowOutput(boolean growOutput)
{
- _growOutput = growOutput;
+ throw new UnsupportedOperationException();
}
@Override
@@ -499,7 +494,7 @@ public String toString()
boolean held = lock.isHeldByCurrentThread();
q = held ? _inQ.size() : -1;
b = held ? _inQ.peek() : "?";
- o = held ? BufferUtil.toDetailString(_out) : "?";
+ o = held ? _buffer.toString() : "?";
}
return String.format("%s[q=%d,q[0]=%s,o=%s]", super.toString(), q, b, o);
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
index 6df27c6543f7..dffc12e28952 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
@@ -29,8 +29,9 @@
* The method {@link #ensureBuffer(int, int)} is used to write directly to the last buffer stored in the buffer list,
* if there is less than a certain amount of space available in that buffer then a new one will be allocated and returned instead.
* @see #ensureBuffer(int, int)
+ * @deprecated Use {@link RetainableByteBuffer.DynamicCapacity}
*/
-// TODO: rename to *Aggregator to avoid confusion with RBBP.Accumulator?
+@Deprecated(forRemoval = true)
public class ByteBufferAccumulator implements AutoCloseable
{
private final List _buffers = new ArrayList<>();
@@ -44,7 +45,7 @@ public ByteBufferAccumulator()
public ByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct)
{
- _bufferPool = (bufferPool == null) ? ByteBufferPool.NON_POOLING : bufferPool;
+ _bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool;
_direct = direct;
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
index 3f2939c31dc2..3469c367ee5c 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
@@ -26,7 +26,9 @@
* Once the buffer is full, the aggregator will not aggregate any more bytes until its buffer is taken out,
* after which a new aggregate/take buffer cycle can start.
*
The buffers are taken from the supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
+ * @deprecated Use {@link RetainableByteBuffer.DynamicCapacity}
*/
+@Deprecated(forRemoval = true)
public class ByteBufferAggregator
{
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferAggregator.class);
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
index 04df7e921ebe..eab85eb5fcc7 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
@@ -25,7 +25,9 @@
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*
*
This class is not thread safe and callers must do mutual exclusion.
+ * @deprecated Use {@link RetainableByteBuffer.DynamicCapacity}
*/
+@Deprecated
public class ByteBufferCallbackAccumulator
{
private final List _entries = new ArrayList<>();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java
index 6ac69d9dac32..b993f4058d48 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferOutputStream2.java
@@ -17,16 +17,19 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.eclipse.jetty.util.Blocker;
+import org.eclipse.jetty.util.BufferUtil;
+
/**
- * This class implements an output stream in which the data is written into a list of ByteBuffer,
- * the buffer list automatically grows as data is written to it, the buffers are taken from the
- * supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
- *
+ * This class implements an output stream in which the data is buffered.
+ *
* Designed to mimic {@link java.io.ByteArrayOutputStream} but with better memory usage, and less copying.
+ * @deprecated Use {@link Content.Sink#asBuffered(Content.Sink, ByteBufferPool, boolean, int, int)}
*/
+@Deprecated
public class ByteBufferOutputStream2 extends OutputStream
{
- private final ByteBufferAccumulator _accumulator;
+ private final RetainableByteBuffer.DynamicCapacity _accumulator;
private int _size = 0;
public ByteBufferOutputStream2()
@@ -36,7 +39,7 @@ public ByteBufferOutputStream2()
public ByteBufferOutputStream2(ByteBufferPool bufferPool, boolean direct)
{
- _accumulator = new ByteBufferAccumulator(bufferPool == null ? ByteBufferPool.NON_POOLING : bufferPool, direct);
+ _accumulator = new RetainableByteBuffer.DynamicCapacity(bufferPool, direct, -1);
}
/**
@@ -46,7 +49,7 @@ public ByteBufferOutputStream2(ByteBufferPool bufferPool, boolean direct)
*/
public RetainableByteBuffer takeByteBuffer()
{
- return _accumulator.takeRetainableByteBuffer();
+ return _accumulator.take();
}
/**
@@ -57,7 +60,7 @@ public RetainableByteBuffer takeByteBuffer()
*/
public RetainableByteBuffer toByteBuffer()
{
- return _accumulator.toRetainableByteBuffer();
+ return _accumulator;
}
/**
@@ -65,7 +68,7 @@ public RetainableByteBuffer toByteBuffer()
*/
public byte[] toByteArray()
{
- return _accumulator.toByteArray();
+ return BufferUtil.toArray(_accumulator.getByteBuffer());
}
public int size()
@@ -83,30 +86,33 @@ public void write(int b)
public void write(byte[] b, int off, int len)
{
_size += len;
- _accumulator.copyBytes(b, off, len);
+ _accumulator.append(ByteBuffer.wrap(b, off, len));
}
public void write(ByteBuffer buffer)
{
_size += buffer.remaining();
- _accumulator.copyBuffer(buffer);
+ _accumulator.append(buffer);
}
public void writeTo(ByteBuffer buffer)
{
- _accumulator.writeTo(buffer);
+ _accumulator.putTo(buffer);
}
public void writeTo(OutputStream out) throws IOException
{
- _accumulator.writeTo(out);
+ try (Blocker.Callback callback = Blocker.callback())
+ {
+ _accumulator.writeTo(Content.Sink.from(out), false, callback);
+ callback.block();
+ }
}
@Override
public void close()
{
- _accumulator.close();
- _size = 0;
+ _accumulator.clear();
}
@Override
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
index 292fc7f7d0ce..d67732dd4542 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
@@ -54,7 +54,7 @@ public interface ByteBufferPool
* @param direct true if a direct memory buffer is needed, false otherwise.
* @return a {@link RetainableByteBuffer} with position and limit set to 0.
*/
- RetainableByteBuffer acquire(int size, boolean direct);
+ RetainableByteBuffer.Mutable acquire(int size, boolean direct);
/**
*
Removes all {@link RetainableByteBuffer#isRetained() non-retained}
@@ -80,7 +80,7 @@ public ByteBufferPool getWrapped()
}
@Override
- public RetainableByteBuffer acquire(int size, boolean direct)
+ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
return getWrapped().acquire(size, direct);
}
@@ -107,24 +107,15 @@ public void clear()
class NonPooling implements ByteBufferPool
{
@Override
- public RetainableByteBuffer acquire(int size, boolean direct)
+ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
- return new Buffer(BufferUtil.allocate(size, direct));
+ return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct)).asMutable();
}
@Override
public void clear()
{
}
-
- private static class Buffer extends AbstractRetainableByteBuffer
- {
- private Buffer(ByteBuffer byteBuffer)
- {
- super(byteBuffer);
- acquire();
- }
- }
}
/**
@@ -135,7 +126,9 @@ private Buffer(ByteBuffer byteBuffer)
* or {@link #insert(int, RetainableByteBuffer) inserted} at a
* specific position in the sequence, and then
* {@link #release() released} when they are consumed.
+ * @deprecated use {@link RetainableByteBuffer.DynamicCapacity}
*/
+ @Deprecated (forRemoval = true)
class Accumulator
{
private final List buffers = new ArrayList<>();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
index c0c37eb34111..de5db2b532f2 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
@@ -27,7 +27,9 @@
/**
* An accumulator of {@link Content.Chunk}s used to facilitate minimal copy
* aggregation of multiple chunks.
+ * @deprecated use {@link RetainableByteBuffer.DynamicCapacity}
*/
+@Deprecated (forRemoval = true, since = "12.1.0")
public class ChunkAccumulator
{
private static final ByteBufferPool NON_POOLING = new ByteBufferPool.NonPooling();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
index abde44ec95a9..1335de528d5d 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
@@ -13,10 +13,14 @@
package org.eclipse.jetty.io;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -33,6 +37,7 @@
import org.eclipse.jetty.io.internal.ContentCopier;
import org.eclipse.jetty.io.internal.ContentSourceByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
+import org.eclipse.jetty.io.internal.ContentSourceRetainableByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceString;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
@@ -193,7 +198,13 @@ static ByteBuffer asByteBuffer(Source source) throws IOException
*/
static CompletableFuture asByteArrayAsync(Source source, int maxSize)
{
- return new ChunkAccumulator().readAll(source, maxSize);
+ return asRetainableByteBuffer(source, null, false, maxSize).thenApply(rbb ->
+ {
+ int remaining = rbb.remaining();
+ byte[] bytes = new byte[remaining];
+ rbb.get(bytes, 0, remaining);
+ return bytes;
+ });
}
/**
@@ -216,7 +227,12 @@ static CompletableFuture asByteBufferAsync(Source source)
*/
static CompletableFuture asByteBufferAsync(Source source, int maxSize)
{
- return asByteArrayAsync(source, maxSize).thenApply(ByteBuffer::wrap);
+ return asRetainableByteBuffer(source, null, false, maxSize).thenApply(rbb ->
+ {
+ ByteBuffer byteBuffer = rbb.getByteBuffer();
+ rbb.release(); // safe as the buffer is known not to be pooled
+ return byteBuffer;
+ });
}
/**
@@ -231,7 +247,31 @@ static CompletableFuture asByteBufferAsync(Source source, int maxSiz
*/
static CompletableFuture asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize)
{
- return new ChunkAccumulator().readAll(source, pool, direct, maxSize);
+ Promise.Completable promise = new Promise.Completable<>()
+ {
+ @Override
+ public void succeeded(RetainableByteBuffer result)
+ {
+ result.retain();
+ super.succeeded(result);
+ }
+ };
+ asRetainableByteBuffer(source, pool, direct, maxSize, promise);
+ return promise;
+ }
+
+ /**
+ *
Reads, non-blocking, the whole content source into a {@link RetainableByteBuffer}.
+ *
+ * @param source the source to read
+ * @param pool The {@link ByteBufferPool} to acquire the buffer from, or null for a non {@link Retainable} buffer
+ * @param direct True if the buffer should be direct.
+ * @param maxSize The maximum size to read, or -1 for no limit
+ * @param promise the promise to notify when the whole content has been read into a RetainableByteBuffer.
+ */
+ static void asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize, Promise promise)
+ {
+ new ContentSourceRetainableByteBuffer(source, pool, direct, maxSize, promise).run();
}
/**
@@ -471,6 +511,144 @@ default boolean rewind()
*/
public interface Sink
{
+ /**
+ *
Wraps the given {@link OutputStream} as a {@link Sink}.
+ * @param out The stream to wrap
+ * @return A sink wrapping the stream
+ */
+ static Sink from(OutputStream out)
+ {
+ return new Sink()
+ {
+ boolean closed;
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (closed)
+ {
+ callback.failed(new EOFException());
+ return;
+ }
+ try
+ {
+ BufferUtil.writeTo(byteBuffer, out);
+ if (last)
+ {
+ closed = true;
+ out.close();
+ }
+ callback.succeeded();
+ }
+ catch (Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+ };
+ }
+
+ /**
+ *
Wraps the given {@link ByteChannel} as a {@link Sink}.
+ * @param channel The {@link ByteChannel} to wrap
+ * @return A sink wrapping the stream
+ */
+ static Sink from(ByteChannel channel)
+ {
+ return new Sink()
+ {
+ boolean closed;
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (closed)
+ {
+ callback.failed(new EOFException());
+ return;
+ }
+ try
+ {
+ int remaining = byteBuffer.remaining();
+ int tries = 0;
+ while (remaining > 0)
+ {
+ int written = channel.write(byteBuffer);
+ if (written > 0)
+ remaining -= written;
+ else if (tries++ > 2)
+ throw new IllegalStateException("ByteChannel in async mode");
+ }
+
+ if (last)
+ {
+ closed = true;
+ channel.close();
+ }
+ callback.succeeded();
+ }
+ catch (Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+ };
+ }
+
+ /**
+ *
Wraps the given {@link AsynchronousByteChannel} as a {@link Sink}.
+ * @param channel The {@link AsynchronousByteChannel} to wrap
+ * @return A sink wrapping the stream
+ */
+ static Sink from(AsynchronousByteChannel channel)
+ {
+ return new Sink()
+ {
+ boolean closed;
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (closed)
+ {
+ callback.failed(new EOFException());
+ return;
+ }
+ try
+ {
+ channel.write(byteBuffer, byteBuffer, new CompletionHandler<>()
+ {
+ @Override
+ public void completed(Integer written, ByteBuffer buffer)
+ {
+ if (buffer.hasRemaining())
+ channel.write(buffer, buffer, this);
+ else
+ {
+ if (last)
+ {
+ closed = true;
+ IO.close(channel);
+ }
+ callback.succeeded();
+ }
+ }
+
+ @Override
+ public void failed(Throwable x, ByteBuffer buffer)
+ {
+ callback.failed(x);
+ }
+ });
+ }
+ catch (Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+ };
+ }
+
/**
*
Wraps the given content sink with a buffering sink.
*
@@ -562,19 +740,34 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
* to release the {@code ByteBuffer} back into a pool), or the
* {@link #release()} method overridden.
*/
- public interface Chunk extends Retainable
+ public interface Chunk extends RetainableByteBuffer
{
/**
- *
+ */
+ Chunk EMPTY = new Empty()
+ {
@Override
public boolean isLast()
{
@@ -591,14 +784,8 @@ public String toString()
/**
*
An empty, last, chunk.
*/
- Content.Chunk EOF = new Chunk()
+ Content.Chunk EOF = new Empty()
{
- @Override
- public ByteBuffer getByteBuffer()
- {
- return BufferUtil.EMPTY_BUFFER;
- }
-
@Override
public boolean isLast()
{
@@ -713,19 +900,13 @@ static Chunk from(Throwable failure)
*/
static Chunk from(Throwable failure, boolean last)
{
- return new Chunk()
+ return new Empty()
{
public Throwable getFailure()
{
return failure;
}
- @Override
- public ByteBuffer getByteBuffer()
- {
- return BufferUtil.EMPTY_BUFFER;
- }
-
@Override
public boolean isLast()
{
@@ -805,11 +986,6 @@ static boolean isFailure(Chunk chunk, boolean last)
return chunk != null && chunk.getFailure() != null && chunk.isLast() == last;
}
- /**
- * @return the ByteBuffer of this Chunk
- */
- ByteBuffer getByteBuffer();
-
/**
* Get a failure (which may be from a {@link Source#fail(Throwable) failure} or
* a {@link Source#fail(Throwable, boolean) warning}), if any, associated with the chunk.
@@ -832,59 +1008,10 @@ default Throwable getFailure()
*/
boolean isLast();
- /**
- * @return the number of bytes remaining in this Chunk
- */
- default int remaining()
- {
- return getByteBuffer().remaining();
- }
-
- /**
- * @return whether this Chunk has remaining bytes
- */
- default boolean hasRemaining()
- {
- return getByteBuffer().hasRemaining();
- }
-
- /**
- *
Copies the bytes from this Chunk to the given byte array.
- *
- * @param bytes the byte array to copy the bytes into
- * @param offset the offset within the byte array
- * @param length the maximum number of bytes to copy
- * @return the number of bytes actually copied
- */
- default int get(byte[] bytes, int offset, int length)
- {
- ByteBuffer b = getByteBuffer();
- if (b == null || !b.hasRemaining())
- return 0;
- length = Math.min(length, b.remaining());
- b.get(bytes, offset, length);
- return length;
- }
-
- /**
- *
Skips, advancing the ByteBuffer position, the given number of bytes.
- *
- * @param length the maximum number of bytes to skip
- * @return the number of bytes actually skipped
- */
- default int skip(int length)
- {
- if (length == 0)
- return 0;
- ByteBuffer byteBuffer = getByteBuffer();
- length = Math.min(byteBuffer.remaining(), length);
- byteBuffer.position(byteBuffer.position() + length);
- return length;
- }
-
/**
* @return an immutable version of this Chunk
*/
+ @Deprecated(forRemoval = true, since = "12.1.0")
default Chunk asReadOnly()
{
if (getByteBuffer().isReadOnly())
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
index 06b056a9ad63..f7b94b31d3a8 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
@@ -24,6 +24,7 @@
import javax.net.ssl.SSLSession;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Invocable;
@@ -65,7 +66,7 @@
* completable.get();
* }
*/
-public interface EndPoint extends Closeable
+public interface EndPoint extends Closeable, Content.Sink
{
/**
*
Constant returned by {@link #receive(ByteBuffer)} to indicate the end-of-file.
@@ -318,6 +319,36 @@ default void write(Callback callback, SocketAddress address, ByteBuffer... buffe
write(callback, buffers);
}
+ @Override
+ default void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (last)
+ {
+ write(Callback.from(() ->
+ {
+ try
+ {
+ close();
+ callback.succeeded();
+ }
+ catch (Throwable t)
+ {
+ callback.failed(t);
+ }
+ },
+ x ->
+ {
+ IO.close(this);
+ callback.failed(x);
+ }),
+ byteBuffer);
+ }
+ else
+ {
+ write(callback, byteBuffer);
+ }
+ }
+
/**
* @return the {@link Connection} associated with this EndPoint
* @see #setConnection(Connection)
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
index 42fdcda26eee..778773c5abd4 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java
@@ -58,23 +58,21 @@ public static RetainableByteBuffer toRetainableByteBuffer(Resource resource, Byt
return RetainableByteBuffer.wrap(ByteBuffer.wrap(memoryResource.getBytes()));
long longLength = resource.length();
- if (longLength > Integer.MAX_VALUE)
- throw new IllegalArgumentException("Resource length exceeds 2 GiB: " + resource);
- int length = (int)longLength;
bufferPool = bufferPool == null ? ByteBufferPool.NON_POOLING : bufferPool;
// Optimize for PathResource.
Path path = resource.getPath();
- if (path != null)
+ if (path != null && longLength < Integer.MAX_VALUE)
{
- RetainableByteBuffer retainableByteBuffer = bufferPool.acquire(length, direct);
+ // TODO convert to a Dynamic once HttpContent uses writeTo semantics
+ RetainableByteBuffer retainableByteBuffer = bufferPool.acquire((int)longLength, direct);
try (SeekableByteChannel seekableByteChannel = Files.newByteChannel(path))
{
long totalRead = 0L;
ByteBuffer byteBuffer = retainableByteBuffer.getByteBuffer();
int pos = BufferUtil.flipToFill(byteBuffer);
- while (totalRead < length)
+ while (totalRead < longLength)
{
int read = seekableByteChannel.read(byteBuffer);
if (read == -1)
@@ -92,26 +90,39 @@ public static RetainableByteBuffer toRetainableByteBuffer(Resource resource, Byt
}
// Fallback to InputStream.
+ RetainableByteBuffer buffer = null;
try (InputStream inputStream = resource.newInputStream())
{
if (inputStream == null)
throw new IllegalArgumentException("Resource does not support InputStream: " + resource);
- ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, direct, length > -1 ? length : 4096, length > -1 ? length : Integer.MAX_VALUE);
- byte[] byteArray = new byte[4096];
+ RetainableByteBuffer.DynamicCapacity retainableByteBuffer = new RetainableByteBuffer.DynamicCapacity(bufferPool, direct, longLength);
while (true)
{
- int read = inputStream.read(byteArray);
+ if (buffer == null)
+ buffer = bufferPool.acquire(8192, false);
+ int read = inputStream.read(buffer.getByteBuffer().array());
if (read == -1)
break;
- aggregator.aggregate(ByteBuffer.wrap(byteArray, 0, read));
+ buffer.getByteBuffer().limit(read);
+ retainableByteBuffer.append(buffer);
+ if (buffer.isRetained())
+ {
+ buffer.release();
+ buffer = null;
+ }
}
- return aggregator.takeRetainableByteBuffer();
+ return retainableByteBuffer;
}
catch (IOException e)
{
throw new RuntimeIOException(e);
}
+ finally
+ {
+ if (buffer != null)
+ buffer.release();
+ }
}
/**
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
index 0e32781f6279..d92bb6edd866 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
@@ -48,6 +48,10 @@
*/
public interface Retainable
{
+ Retainable NON_RETAINABLE = new Retainable()
+ {
+ };
+
/**
*
Returns whether this resource is referenced counted by calls to {@link #retain()}
* and {@link #release()}.
A pooled {@link ByteBuffer} which maintains a reference count that is
- * incremented with {@link #retain()} and decremented with {@link #release()}.
- *
The {@code ByteBuffer} is released to a {@link ByteBufferPool}
- * when {@link #release()} is called one more time than {@link #retain()};
- * in such case, the call to {@link #release()} returns {@code true}.
- *
A {@code RetainableByteBuffer} can either be:
+ *
An abstraction over {@link ByteBuffer}s which provides:
*
- *
in pool; in this case {@link #isRetained()} returns {@code false}
- * and calling {@link #release()} throws {@link IllegalStateException}
- *
out of pool but not retained; in this case {@link #isRetained()}
- * returns {@code false} and calling {@link #release()} returns {@code true}
- *
out of pool and retained; in this case {@link #isRetained()}
- * returns {@code true} and calling {@link #release()} returns {@code false}
+ *
{@link Retainable Retainability} so that reference counts can be maintained for shared buffers.
+ *
{@link Pooled Pooled} buffers that use the {@link ByteBufferPool} for any operations and which are returned to the
+ * {@link ByteBufferPool} when fully {@link #release() released}.
+ *
Either {@link FixedCapacity fixed capacity} buffers over a single {@link ByteBuffer} or
+ * {@link DynamicCapacity dynamic capacity} over possible multiple {@link ByteBuffer}s.
+ *
Access APIs to {@link #get() get}, {@link #slice() slice} or {@link #take() take} from
+ * the buffer
+ *
A {@link Mutable Mutable} API variant to {@link Mutable#put(byte) put},
+ * {@link Mutable#append(RetainableByteBuffer) append} or {@link Mutable#add(RetainableByteBuffer) add}
+ * to the buffer
*
+ *
When possible and optimal, implementations will avoid data copies. However, copies may be favoured over retaining
+ * large buffers with small content.
+ *
+ *
Accessing data in the buffer can be achieved via:
+ *
+ *
The {@link #get()}/{@link #get(long)}/{@link #get(byte[], int, int)} methods provide direct access to bytes
+ * within the buffer.
+ *
The {@link #slice()}/{@link #slice(long)} methods for shared access to common backing buffers, but with
+ * independent indexes.
+ *
The {@link #take()}/{@link #take(long)} methods for minimal copy extraction of bulk data.
+ *
Accessing the underlying {@link ByteBuffer} via {@link #getByteBuffer()}, which may coalesce multiple buffers
+ * into a single.
+ *
+ *
The {@code RetainableByteBuffer} APIs are non-modal, meaning that there is no need for any {@link ByteBuffer#flip() flip}
+ * operation between a mutable method and an accessor method.
+ * {@link ByteBuffer} returned or passed to this API should be in "flush" mode, with valid data between the
+ * {@link ByteBuffer#position() position} and {@link ByteBuffer#limit() limit}. The {@link ByteBuffer} returned from
+ * {@link #getByteBuffer()} may used directly and switched to "fill" mode, but it is the callers responsibility to
+ * {@link ByteBuffer#flip() flip} back to "flush" mode, before any {@code RetainableByteBuffer} APIs are used.
+ *
The {@code RetainableByteBuffer} APIs hide any notion of unused space before or after valid data. All indexing is relative
+ * to the first byte of data in the buffer and no manipulation of data pointers is directly supported.
+ *
The buffer may be large and the {@link #size()} is represented as a {@code long} in new APIs. However, APIs that
+ * are tied to a single backing {@link ByteBuffer} may use integer representations of size and indexes.
*/
public interface RetainableByteBuffer extends Retainable
{
/**
* A Zero-capacity, non-retainable {@code RetainableByteBuffer}.
*/
- RetainableByteBuffer EMPTY = wrap(BufferUtil.EMPTY_BUFFER);
+ RetainableByteBuffer EMPTY = new NonRetainableByteBuffer(BufferUtil.EMPTY_BUFFER);
/**
*
Returns a non-retainable {@code RetainableByteBuffer} that wraps
@@ -54,12 +89,12 @@ public interface RetainableByteBuffer extends Retainable
* that may delegate calls to {@link #retain()}.
*
* @param byteBuffer the {@code ByteBuffer} to wrap
- * @return a non-retainable {@code RetainableByteBuffer}
+ * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
* @see ByteBufferPool.NonPooling
*/
static RetainableByteBuffer wrap(ByteBuffer byteBuffer)
{
- return new NonRetainableByteBuffer(byteBuffer);
+ return new FixedCapacity(byteBuffer);
}
/**
@@ -68,56 +103,152 @@ static RetainableByteBuffer wrap(ByteBuffer byteBuffer)
*
* @param byteBuffer the {@code ByteBuffer} to wrap
* @param retainable the associated {@link Retainable}.
- * @return a {@code RetainableByteBuffer}
+ * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
* @see ByteBufferPool.NonPooling
*/
static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Retainable retainable)
{
- return new RetainableByteBuffer()
+ return new FixedCapacity(byteBuffer, retainable);
+ }
+
+ /**
+ *
Returns a {@code RetainableByteBuffer} that wraps
+ * the given {@code ByteBuffer} and {@link Runnable} releaser.
+ *
+ * @param byteBuffer the {@code ByteBuffer} to wrap
+ * @param releaser a {@link Runnable} to call when the buffer is released.
+ * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
+ */
+ static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Runnable releaser)
+ {
+ return new FixedCapacity(byteBuffer)
{
@Override
- public ByteBuffer getByteBuffer()
+ public boolean release()
{
- return byteBuffer;
+ boolean released = super.release();
+ if (released)
+ releaser.run();
+ return released;
}
+ };
+ }
- @Override
- public boolean isRetained()
- {
- throw new UnsupportedOperationException();
- }
+ /**
+ * Check if the underlying implementation is mutable.
+ * Note that the immutable {@link RetainableByteBuffer} API may be backed by a mutable {@link ByteBuffer} or
+ * the {@link Mutable} API may be backed by an immutable {@link ByteBuffer}.
+ * @return whether this buffers implementation is mutable
+ * @see #asMutable()
+ */
+ default boolean isMutable()
+ {
+ return !getByteBuffer().isReadOnly();
+ }
- @Override
- public boolean canRetain()
- {
- return retainable.canRetain();
- }
+ /**
+ * Access this buffer via the {@link Mutable} API.
+ * Note that the {@link Mutable} API may be backed by an immutable {@link ByteBuffer}.
+ * @return An {@link Mutable} representation of this buffer with same data and pointers.
+ * @throws ReadOnlyBufferException If the buffer is not {@link Mutable} or the backing {@link ByteBuffer} is
+ * {@link ByteBuffer#isReadOnly() read-only}.
+ * @see #isMutable()
+ */
+ default Mutable asMutable() throws ReadOnlyBufferException
+ {
+ if (!isMutable() || isRetained())
+ throw new ReadOnlyBufferException();
+ if (this instanceof Mutable mutable)
+ return mutable;
+ throw new ReadOnlyBufferException();
+ }
- @Override
- public void retain()
- {
- retainable.retain();
- }
+ /**
+ * Appends and consumes the contents of this buffer to the passed buffer, limited by the capacity of the target buffer.
+ * @param buffer The buffer to append bytes to, whose limit will be updated.
+ * @return {@code true} if all bytes in this buffer are able to be appended.
+ * @see #putTo(ByteBuffer)
+ */
+ default boolean appendTo(ByteBuffer buffer)
+ {
+ return remaining() == BufferUtil.append(buffer, getByteBuffer());
+ }
- @Override
- public boolean release()
- {
- return retainable.release();
- }
- };
+ /**
+ * Appends and consumes the contents of this buffer to the passed buffer, limited by the capacity of the target buffer.
+ * @param buffer The buffer to append bytes to, whose limit will be updated.
+ * @return {@code true} if all bytes in this buffer are able to be appended.
+ * @see #putTo(ByteBuffer)
+ */
+ default boolean appendTo(RetainableByteBuffer buffer)
+ {
+ return appendTo(buffer.getByteBuffer());
+ }
+
+ /**
+ * Creates a deep copy of this RetainableByteBuffer that is entirely independent
+ * @return A copy of this RetainableByteBuffer
+ */
+ default RetainableByteBuffer copy()
+ {
+ ByteBuffer byteBuffer = getByteBuffer();
+ ByteBuffer copy = BufferUtil.copy(byteBuffer);
+ return new FixedCapacity(copy);
+ }
+
+ /**
+ * Consumes and returns a byte from this RetainableByteBuffer
+ *
+ * @return the byte
+ * @throws BufferUnderflowException if the buffer is empty.
+ * @see #get(byte[], int, int)
+ * @see #get(long)
+ */
+ default byte get() throws BufferUnderflowException
+ {
+ return getByteBuffer().get();
+ }
+
+ /**
+ * Returns a byte from this RetainableByteBuffer at a specific index
+ *
+ * @param index The index relative to the current start of unconsumed data in the buffer.
+ * @return the byte
+ * @throws IndexOutOfBoundsException if the index is too large.
+ */
+ default byte get(long index) throws IndexOutOfBoundsException
+ {
+ ByteBuffer buffer = getByteBuffer();
+ return buffer.get(buffer.position() + Math.toIntExact(index));
}
/**
- * @return whether this instance is retained
- * @see ReferenceCounter#isRetained()
+ * Consumes and copies the bytes from this RetainableByteBuffer to the given byte array.
+ *
+ * @param bytes the byte array to copy the bytes into
+ * @param offset the offset within the byte array
+ * @param length the maximum number of bytes to copy
+ * @return the number of bytes actually copied
*/
- boolean isRetained();
+ default int get(byte[] bytes, int offset, int length)
+ {
+ ByteBuffer b = getByteBuffer();
+ if (b == null || !b.hasRemaining())
+ return 0;
+ length = Math.min(length, b.remaining());
+ b.get(bytes, offset, length);
+ return length;
+ }
/**
* Get the wrapped, not {@code null}, {@code ByteBuffer}.
+ *
If the implementation contains multiple buffers, they are coalesced to a single buffer before being returned.
+ * If the content is too large for a single {@link ByteBuffer}, then the content should be access with
+ * {@link #writeTo(Content.Sink, boolean)}.
* @return the wrapped, not {@code null}, {@code ByteBuffer}
+ * @throws BufferOverflowException if the contents is too large for a single {@link ByteBuffer}
*/
- ByteBuffer getByteBuffer();
+ ByteBuffer getByteBuffer() throws BufferOverflowException;
/**
* @return whether the {@code ByteBuffer} is direct
@@ -129,6 +260,7 @@ default boolean isDirect()
/**
* @return the number of remaining bytes in the {@code ByteBuffer}
+ * @see #size()
*/
default int remaining()
{
@@ -144,7 +276,34 @@ default boolean hasRemaining()
}
/**
- * @return the {@code ByteBuffer} capacity
+ * @return whether the {@code ByteBuffer} has remaining bytes left for reading
+ */
+ default boolean isEmpty()
+ {
+ return !hasRemaining();
+ }
+
+ /**
+ * @return the number of remaining bytes in the {@code ByteBuffer}
+ * @see #remaining()
+ */
+ default long size()
+ {
+ return remaining();
+ }
+
+ /**
+ * @return the maximum size in bytes.
+ * @see #size()
+ */
+ default long maxSize()
+ {
+ return capacity();
+ }
+
+ /**
+ * @return the capacity
+ * @see #maxSize()
*/
default int capacity()
{
@@ -159,10 +318,333 @@ default void clear()
BufferUtil.clear(getByteBuffer());
}
+ /**
+ *
Skips, advancing the ByteBuffer position, the given number of bytes.
+ *
+ * @param length the maximum number of bytes to skip
+ * @return the number of bytes actually skipped
+ */
+ default long skip(long length)
+ {
+ if (length == 0)
+ return 0;
+ ByteBuffer byteBuffer = getByteBuffer();
+ length = Math.min(byteBuffer.remaining(), length);
+ byteBuffer.position(byteBuffer.position() + Math.toIntExact(length));
+ return length;
+ }
+
+ /**
+ *
Limit this buffer's contents to the size.
+ *
+ * @param size the new size of the buffer
+ */
+ default void limit(long size)
+ {
+ ByteBuffer byteBuffer = getByteBuffer();
+ size = Math.min(size, byteBuffer.remaining());
+ byteBuffer.limit(byteBuffer.position() + Math.toIntExact(size));
+ }
+
+ /**
+ * Get a slice of the buffer.
+ * @return A sliced {@link RetainableByteBuffer} sharing this buffers data and reference count, but
+ * with independent position. The buffer is {@link #retain() retained} by this call.
+ * @see #slice(long)
+ */
+ default RetainableByteBuffer slice()
+ {
+ return slice(Long.MAX_VALUE);
+ }
+
+ /**
+ * Get a partial slice of the buffer.
+ * This is equivalent to {@link #slice()}.{@link #limit(long)}, but may be implemented more efficiently.
+ * @param length The number of bytes to slice, which may beyond the limit and less than the capacity, in which case
+ * it will ensure some spare capacity in the slice.
+ * @return A sliced {@link RetainableByteBuffer} sharing the first {@code length} bytes of this buffers data and
+ * reference count, but with independent position. The buffer is {@link #retain() retained} by this call.
+ */
+ default RetainableByteBuffer slice(long length)
+ {
+ int size = remaining();
+ ByteBuffer byteBuffer = getByteBuffer();
+ int limit = byteBuffer.limit();
+
+ byteBuffer.limit(byteBuffer.position() + Math.toIntExact(Math.min(length, size)));
+ ByteBuffer slice = byteBuffer.slice();
+ byteBuffer.limit(limit);
+ if (length > size)
+ slice.limit(size);
+
+ if (!canRetain())
+ return new NonRetainableByteBuffer(slice);
+
+ retain();
+ return RetainableByteBuffer.wrap(slice, this);
+ }
+
+ /**
+ * Take the contents of this buffer, from the head, leaving remaining bytes in this buffer.
+ * This is similar to {@link #slice(long)} followed by a {@link #skip(long)}, but avoids shared data.
+ * @param length The number of bytes to take
+ * @return A buffer with the contents of this buffer after limiting bytes, avoiding copies if possible,
+ * but with no shared internal buffers.
+ */
+ default RetainableByteBuffer take(long length)
+ {
+ if (isEmpty() || length == 0)
+ return EMPTY;
+
+ RetainableByteBuffer slice = slice(length);
+ skip(length);
+ if (slice.isRetained())
+ {
+ RetainableByteBuffer copy = slice.copy();
+ slice.release();
+ return copy;
+ }
+ return slice;
+ }
+
+ /**
+ * Take the contents of this buffer, from the tail, leaving remaining bytes in this buffer.
+ * @param skip The number of bytes to skip before taking the tail.
+ * @return A buffer with the contents of this buffer after skipping bytes, avoiding copies if possible,
+ * but with no shared internal buffers.
+ */
+ default RetainableByteBuffer takeFrom(long skip)
+ {
+ if (isEmpty() || skip > size())
+ return EMPTY;
+
+ RetainableByteBuffer slice = slice();
+ slice.skip(skip);
+ limit(skip);
+ if (slice.isRetained())
+ {
+ RetainableByteBuffer copy = slice.copy();
+ slice.release();
+ return copy;
+ }
+ return slice;
+ }
+
+ /**
+ * Take the contents of this buffer, leaving it clear.
+ * @return A buffer with the contents of this buffer, avoiding copies if possible.
+ * @see #take(long)
+ * @see #takeFrom(long)
+ */
+ default RetainableByteBuffer take()
+ {
+ return take(Long.MAX_VALUE);
+ }
+
+ /**
+ * Consumes and puts the contents of this retainable byte buffer at the end of the given byte buffer.
+ * @param toInfillMode the destination buffer, whose position is updated.
+ * @throws BufferOverflowException – If there is insufficient space in this buffer for the remaining bytes in the source buffer
+ * @see ByteBuffer#put(ByteBuffer)
+ */
+ default void putTo(ByteBuffer toInfillMode) throws BufferOverflowException
+ {
+ toInfillMode.put(getByteBuffer());
+ }
+
+ /**
+ * Asynchronously writes and consumes the contents of this retainable byte buffer into the given sink.
+ * @param sink the destination sink.
+ * @param last true if this is the last write.
+ * @param callback the callback to call upon the write completion.
+ * @see org.eclipse.jetty.io.Content.Sink#write(boolean, ByteBuffer, Callback)
+ */
+ default void writeTo(Content.Sink sink, boolean last, Callback callback)
+ {
+ sink.write(last, getByteBuffer(), callback);
+ }
+
+ /**
+ * Writes and consumes the contents of this retainable byte buffer into the given sink.
+ * @param sink the destination sink.
+ * @param last true if this is the last write.
+ * @see org.eclipse.jetty.io.Content.Sink#write(boolean, ByteBuffer, Callback)
+ */
+ default void writeTo(Content.Sink sink, boolean last) throws IOException
+ {
+ try (Blocker.Callback callback = Blocker.callback())
+ {
+ sink.write(last, getByteBuffer(), callback);
+ callback.block();
+ }
+ }
+
+ /**
+ * @return A string showing the info and detail about this buffer, as well as a summary of the contents
+ */
+ default String toDetailString()
+ {
+ return toString();
+ }
+
+ /**
+ * Extended {@link RetainableByteBuffer} API with mutator methods.
+ * The mutator methods come in the following styles:
+ *
+ *
{@code put} methods are used for putting raw bytes into the buffer and are
+ * similar to {@link ByteBuffer#put(byte)} etc. {@code Put} methods may be used in fluent style.
+ *
{@code add} methods are used for handing over an external buffer to be managed by
+ * this buffer. External buffers are passed by reference and the caller will not longer manage the added buffer.
+ * {@code Add} methods may be used in fluent style.
+ *
{@code append} methods are used for handing over the content of a buffer to be included in this buffer.
+ * The caller may still use the passed buffer and is responsible for eventually releasing it.
+ *
+ *
+ */
+ interface Mutable extends RetainableByteBuffer
+ {
+ /**
+ * @return the number of bytes that can be added, appended or put into this buffer.
+ */
+ default long space()
+ {
+ return capacity() - remaining();
+ }
+
+ /**
+ * @return true if the {@link #size()} is equals to the {@link #maxSize()} and no more bytes can be added, appended
+ * or put to this buffer.
+ */
+ default boolean isFull()
+ {
+ return space() == 0;
+ }
+
+ /**
+ * Add the passed {@link ByteBuffer} to this buffer, growing this buffer if necessary and possible.
+ * The source {@link ByteBuffer} is passed by reference and the caller gives up "ownership", so implementations of
+ * this method may choose to avoid copies by keeping a reference to the buffer.
+ * @param bytes the byte buffer to add, which is passed by reference and is not necessarily consumed by the add.
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ * @see #append(ByteBuffer)
+ */
+ Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException;
+
+ /**
+ * Add the passed {@link RetainableByteBuffer} to this buffer, growing this buffer if necessary and possible.
+ * The source {@link RetainableByteBuffer} is passed by reference and the caller gives up ownership, so
+ * implementations of this method may avoid copies by keeping a reference to the buffer.
+ * Unlike the similar {@link #append(RetainableByteBuffer)} and contrary to the general rules of {@link Retainable},
+ * implementations of this method need not call {@link #retain()} if keeping a reference, but they must ultimately
+ * call {@link #release()} the passed buffer.
+ * Callers should use {@code add} rather than {@link #append(RetainableByteBuffer)} if they already have an obligation
+ * to release the buffer and wish to delegate that obligation to this buffer.
+ * @param bytes the byte buffer to add, which is passed by reference and is not necessarily consumed by the add.
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException;
+
+ /**
+ * Copies the contents of the given byte buffer to the end of this buffer, growing this buffer if
+ * necessary and possible.
+ * @param bytes the byte buffer to copy from, which is consumed.
+ * @return true if all bytes of the given buffer were copied, false otherwise.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @see #add(ByteBuffer)
+ */
+ boolean append(ByteBuffer bytes) throws ReadOnlyBufferException;
+
+ /**
+ * Retain or copy the contents of the given retainable byte buffer to the end of this buffer,
+ * growing this buffer if necessary and possible.
+ * The implementation will heuristically decide to retain or copy the contents
+ * Unlike the similar {@link #add(RetainableByteBuffer)}, implementations of this method must
+ * {@link RetainableByteBuffer#retain()} the passed buffer if they keep a reference to it.
+ * @param bytes the retainable byte buffer to copy from, which is consumed.
+ * @return true if all bytes of the given buffer were copied, false otherwise.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @see #add(RetainableByteBuffer)
+ */
+ boolean append(RetainableByteBuffer bytes) throws ReadOnlyBufferException;
+
+ /**
+ * Put a {@code byte} to the buffer, growing this buffer if necessary and possible.
+ * @param b the {@code byte} to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable put(byte b);
+
+ /**
+ * Put a {@code short} to the buffer, growing this buffer if necessary and possible.
+ * @param s the {@code short} to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable putShort(short s);
+
+ /**
+ * Put an {@code int} to the buffer, growing this buffer if necessary and possible.
+ * @param i the {@code int} to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable putInt(int i);
+
+ /**
+ * Put a {@code long} to the buffer, growing this buffer if necessary and possible.
+ * @param l the {@code long} to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable putLong(long l);
+
+ /**
+ * Put a {@code byte} array to the buffer, growing this buffer if necessary and possible.
+ * @param bytes the {@code byte} array to put
+ * @param offset the offset into the array
+ * @param length the length in bytes to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable put(byte[] bytes, int offset, int length);
+
+ /**
+ * Put a {@code byte} array to the buffer, growing this buffer if necessary and possible.
+ * @param bytes the {@code byte} array to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ default Mutable put(byte[] bytes)
+ {
+ return put(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Put a {@code byte} to the buffer at a given index.
+ * @param index The index relative to the current start of unconsumed data in the buffer.
+ * @param b the {@code byte} to put
+ * @return {@code this} buffer.
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ Mutable put(long index, byte b);
+ }
+
/**
* A wrapper for {@link RetainableByteBuffer} instances
*/
- class Wrapper extends Retainable.Wrapper implements RetainableByteBuffer
+ class Wrapper extends Retainable.Wrapper implements Mutable
{
public Wrapper(RetainableByteBuffer wrapped)
{
@@ -215,5 +697,1691 @@ public void clear()
{
getWrapped().clear();
}
+
+ @Override
+ public String toString()
+ {
+ return "%s@%x{%s}".formatted(getClass().getSimpleName(), hashCode(), getWrapped().toString());
+ }
+
+ @Override
+ public boolean appendTo(ByteBuffer buffer)
+ {
+ return getWrapped().appendTo(buffer);
+ }
+
+ @Override
+ public boolean appendTo(RetainableByteBuffer buffer)
+ {
+ return getWrapped().appendTo(buffer);
+ }
+
+ @Override
+ public RetainableByteBuffer copy()
+ {
+ return getWrapped().copy();
+ }
+
+ @Override
+ public RetainableByteBuffer slice(long length)
+ {
+ return getWrapped().slice(length);
+ }
+
+ @Override
+ public byte get(long index)
+ {
+ return getWrapped().get(index);
+ }
+
+ @Override
+ public int get(byte[] bytes, int offset, int length)
+ {
+ return getWrapped().get(bytes, offset, length);
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return getWrapped().isEmpty();
+ }
+
+ @Override
+ public void putTo(ByteBuffer toInfillMode) throws BufferOverflowException
+ {
+ getWrapped().putTo(toInfillMode);
+ }
+
+ @Override
+ public long skip(long length)
+ {
+ return getWrapped().skip(length);
+ }
+
+ @Override
+ public RetainableByteBuffer slice()
+ {
+ return getWrapped().slice();
+ }
+
+ @Override
+ public void writeTo(Content.Sink sink, boolean last, Callback callback)
+ {
+ getWrapped().writeTo(sink, last, callback);
+ }
+
+ @Override
+ public Mutable asMutable()
+ {
+ return this;
+ }
+
+ @Override
+ public boolean isFull()
+ {
+ return getWrapped().asMutable().isFull();
+ }
+
+ @Override
+ public long space()
+ {
+ return getWrapped().asMutable().space();
+ }
+
+ @Override
+ public boolean append(ByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ return getWrapped().asMutable().append(bytes);
+ }
+
+ @Override
+ public boolean append(RetainableByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ return getWrapped().asMutable().append(bytes);
+ }
+
+ @Override
+ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException
+ {
+ getWrapped().asMutable().add(bytes);
+ return this;
+ }
+
+ @Override
+ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException
+ {
+ getWrapped().asMutable().add(bytes);
+ return this;
+ }
+
+ @Override
+ public Mutable put(byte b)
+ {
+ getWrapped().asMutable().put(b);
+ return this;
+ }
+
+ @Override
+ public Mutable put(long index, byte b)
+ {
+ getWrapped().asMutable().put(index, b);
+ return this;
+ }
+
+ @Override
+ public Mutable putShort(short s)
+ {
+ getWrapped().asMutable().putShort(s);
+ return this;
+ }
+
+ @Override
+ public Mutable putInt(int i)
+ {
+ getWrapped().asMutable().putInt(i);
+ return this;
+ }
+
+ @Override
+ public Mutable putLong(long l)
+ {
+ getWrapped().asMutable().putLong(l);
+ return this;
+ }
+
+ @Override
+ public Mutable put(byte[] bytes, int offset, int length)
+ {
+ getWrapped().asMutable().put(bytes, offset, length);
+ return this;
+ }
+
+ @Override
+ public String toDetailString()
+ {
+ return getWrapped().toDetailString();
+ }
+ }
+
+ /**
+ * An abstract implementation of {@link RetainableByteBuffer} that provides the basic {@link Retainable} functionality
+ */
+ abstract class Abstract extends Retainable.Wrapper implements Mutable
+ {
+ public Abstract()
+ {
+ this(new ReferenceCounter());
+ }
+
+ public Abstract(Retainable retainable)
+ {
+ super(retainable);
+ }
+
+ /**
+ * @return A string showing the info about this buffer
+ */
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ addStringInfo(builder);
+ return builder.toString();
+ }
+
+ /**
+ * @return A string showing the info and detail about this buffer
+ */
+ @Override
+ public String toDetailString()
+ {
+ StringBuilder builder = new StringBuilder();
+ addStringInfo(builder);
+ builder.append("={");
+ addValueString(builder);
+ builder.append("}");
+ return builder.toString();
+ }
+
+ protected void addStringInfo(StringBuilder builder)
+ {
+ builder.append(getClass().getSimpleName());
+ builder.append("@");
+ builder.append(Integer.toHexString(System.identityHashCode(this)));
+ builder.append("[");
+ builder.append(size());
+ builder.append("/");
+ builder.append(maxSize());
+ builder.append(",d=");
+ builder.append(isDirect());
+ addExtraStringInfo(builder);
+ builder.append(",r=");
+ builder.append(getRetained());
+ builder.append("]");
+ }
+
+ protected void addExtraStringInfo(StringBuilder builder)
+ {
+ }
+
+ protected void addValueString(StringBuilder builder)
+ {
+ addValueMarker(builder, true);
+ long size = size();
+ if (size <= 48)
+ {
+ for (int i = 0; i < size; i++)
+ BufferUtil.appendDebugByte(builder, get(i));
+ }
+ else
+ {
+ for (int i = 0; i < 24; i++)
+ BufferUtil.appendDebugByte(builder, get(i));
+ builder.append("...");
+ for (int i = 0; i < 24; i++)
+ BufferUtil.appendDebugByte(builder, get(size - 24 + i));
+ }
+ addValueMarker(builder, false);
+ }
+
+ protected void addValueMarker(StringBuilder builder, boolean beginning)
+ {
+ builder.append(beginning ? "<<<" : ">>>");
+ }
+ }
+
+ /**
+ * A fixed capacity {@link Mutable} {@link RetainableByteBuffer} backed by a single
+ * {@link ByteBuffer}.
+ */
+ class FixedCapacity extends Abstract implements Mutable
+ {
+ private final ByteBuffer _byteBuffer;
+ /*
+ * Remember the flip mode of the internal bytebuffer. This is useful when a FixedCapacity buffer is used
+ * to aggregate multiple other buffers (e.g. by DynamicCapacity buffer), as it avoids a flip/flop on every append.
+ */
+ private int _flipPosition = -1;
+
+ public FixedCapacity(ByteBuffer byteBuffer)
+ {
+ this(byteBuffer, new ReferenceCounter());
+ }
+
+ public FixedCapacity(ByteBuffer byteBuffer, Retainable retainable)
+ {
+ super(retainable);
+ _byteBuffer = Objects.requireNonNull(byteBuffer);
+ }
+
+ @Override
+ public void clear()
+ {
+ super.clear();
+ _byteBuffer.clear();
+ _flipPosition = 0;
+ }
+
+ @Override
+ public Mutable asMutable()
+ {
+ if (!isMutable() || isRetained())
+ throw new ReadOnlyBufferException();
+ return this;
+ }
+
+ @Override
+ public int remaining()
+ {
+ if (_flipPosition < 0)
+ return super.remaining();
+ return _byteBuffer.position() - _flipPosition;
+ }
+
+ @Override
+ public boolean hasRemaining()
+ {
+ if (_flipPosition < 0)
+ return super.hasRemaining();
+
+ return _flipPosition > 0 || _byteBuffer.position() > 0;
+ }
+
+ @Override
+ public boolean isDirect()
+ {
+ return _byteBuffer.isDirect();
+ }
+
+ @Override
+ public int capacity()
+ {
+ return _byteBuffer.capacity();
+ }
+
+ @Override
+ public byte get(long index) throws IndexOutOfBoundsException
+ {
+ int offset = _flipPosition < 0 ? _byteBuffer.position() : _flipPosition;
+ return _byteBuffer.get(offset + Math.toIntExact(index));
+ }
+
+ @Override
+ public void limit(long size)
+ {
+ if (_flipPosition < 0)
+ super.limit(size);
+ else
+ _byteBuffer.position(_flipPosition + Math.toIntExact(Math.min(size, size())));
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer()
+ {
+ // Ensure buffer is in flush mode if accessed externally
+ if (_flipPosition >= 0)
+ {
+ BufferUtil.flipToFlush(_byteBuffer, _flipPosition);
+ _flipPosition = -1;
+ }
+ return _byteBuffer;
+ }
+
+ @Override
+ public boolean append(ByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ // Try to add the whole buffer
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ int length = bytes.remaining();
+ int space = _byteBuffer.remaining();
+
+ if (space == 0)
+ return length == 0;
+
+ if (length > space)
+ {
+ // No space for the whole buffer, so put as much as we can
+ int position = _byteBuffer.position();
+ _byteBuffer.put(position, bytes, bytes.position(), space);
+ _byteBuffer.position(position + space);
+ bytes.position(bytes.position() + space);
+ return false;
+ }
+
+ if (length > 0)
+ _byteBuffer.put(bytes);
+ return true;
+ }
+
+ @Override
+ public boolean append(RetainableByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ assert !isRetained();
+ return bytes.remaining() == 0 || append(bytes.getByteBuffer());
+ }
+
+ @Override
+ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ int length = bytes.remaining();
+ int space = _byteBuffer.remaining();
+
+ if (length > space)
+ throw new BufferOverflowException();
+
+ if (length > 0)
+ _byteBuffer.put(bytes);
+
+ return this;
+ }
+
+ @Override
+ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ assert !isRetained();
+
+ if (bytes instanceof DynamicCapacity dynamic)
+ {
+ int length = bytes.remaining();
+ int space = _byteBuffer.remaining();
+
+ if (length > space)
+ throw new BufferOverflowException();
+ if (length > 0)
+ {
+ for (RetainableByteBuffer buffer : dynamic._buffers)
+ {
+ buffer.retain();
+ add(buffer);
+ }
+ }
+ bytes.release();
+ return this;
+ }
+
+ add(bytes.getByteBuffer());
+ bytes.release();
+ return this;
+ }
+
+ /**
+ * Put a {@code byte} to the buffer, growing this buffer if necessary and possible.
+ * @param b the {@code byte} to put
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ @Override
+ public Mutable put(byte b)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ _byteBuffer.put(b);
+ return this;
+ }
+
+ @Override
+ public Mutable put(long index, byte b)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+ int remaining = _byteBuffer.position() - _flipPosition;
+ if (index > remaining)
+ throw new IndexOutOfBoundsException();
+ _byteBuffer.put(_flipPosition + Math.toIntExact(index), b);
+ return this;
+ }
+
+ /**
+ * Put a {@code short} to the buffer, growing this buffer if necessary and possible.
+ * @param s the {@code short} to put
+ * @throws ReadOnlyBufferException if this buffer is read only.
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ @Override
+ public Mutable putShort(short s)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ _byteBuffer.putShort(s);
+ return this;
+ }
+
+ /**
+ * Put an {@code int} to the buffer, growing this buffer if necessary and possible.
+ * @param i the {@code int} to put
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ @Override
+ public Mutable putInt(int i)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ _byteBuffer.putInt(i);
+ return this;
+ }
+
+ /**
+ * Put a {@code long} to the buffer, growing this buffer if necessary and possible.
+ * @param l the {@code long} to put
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ @Override
+ public Mutable putLong(long l)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ _byteBuffer.putLong(l);
+ return this;
+ }
+
+ /**
+ * Put a {@code byte} array to the buffer, growing this buffer if necessary and possible.
+ * @param bytes the {@code byte} array to put
+ * @param offset the offset into the array
+ * @param length the length in bytes to put
+ * @throws ReadOnlyBufferException if this buffer is read only
+ * @throws BufferOverflowException if this buffer cannot fit the byte
+ */
+ @Override
+ public Mutable put(byte[] bytes, int offset, int length)
+ {
+ assert !isRetained();
+
+ // Ensure buffer is flipped to fill mode (and left that way)
+ if (_flipPosition < 0)
+ _flipPosition = BufferUtil.flipToFill(_byteBuffer);
+
+ _byteBuffer.put(bytes, offset, length);
+ return this;
+ }
+
+ @Override
+ protected void addValueMarker(StringBuilder builder, boolean beginning)
+ {
+ if (beginning)
+ {
+ if (_flipPosition >= 0)
+ {
+ builder.append("<<~")
+ .append(_flipPosition)
+ .append('-')
+ .append(_byteBuffer.position())
+ .append('/')
+ .append(_byteBuffer.capacity())
+ .append('<');
+ }
+ else
+ {
+ builder.append("<<")
+ .append(_byteBuffer.position())
+ .append('-')
+ .append(_byteBuffer.limit())
+ .append('/')
+ .append(_byteBuffer.capacity())
+ .append('<');
+ }
+ }
+ else
+ {
+ builder.append(">>>");
+ }
+ }
+ }
+
+ /**
+ * A {@link ByteBufferPool pooled} buffer that knows the pool from which it was allocated.
+ * Any methods that may need to allocated additional buffers (e.g. {@link #copy()}) will use the pool.
+ */
+ class Pooled extends FixedCapacity
+ {
+ private final ByteBufferPool _pool;
+
+ public Pooled(ByteBufferPool pool, ByteBuffer byteBuffer)
+ {
+ super(byteBuffer);
+ _pool = pool;
+ }
+
+ protected Pooled(ByteBufferPool pool, ByteBuffer byteBuffer, Retainable retainable)
+ {
+ super(byteBuffer, retainable);
+ _pool = pool;
+ }
+
+ @Override
+ public RetainableByteBuffer slice(long length)
+ {
+ int size = remaining();
+ ByteBuffer byteBuffer = getByteBuffer();
+ int limit = byteBuffer.limit();
+
+ byteBuffer.limit(byteBuffer.position() + Math.toIntExact(Math.min(length, size)));
+ ByteBuffer slice = byteBuffer.slice();
+ byteBuffer.limit(limit);
+ if (length > size)
+ slice.limit(size);
+
+ if (!canRetain())
+ return new NonRetainableByteBuffer(slice);
+
+ retain();
+ return new Pooled(_pool, slice, this);
+ }
+
+ @Override
+ public RetainableByteBuffer copy()
+ {
+ RetainableByteBuffer copy = _pool.acquire(remaining(), isDirect());
+ copy.asMutable().append(getByteBuffer().slice());
+ return copy;
+ }
+ }
+
+ /**
+ * a {@link FixedCapacity} buffer that is neither not pooled nor {@link Retainable#canRetain() retainable}.
+ */
+ class NonRetainableByteBuffer extends FixedCapacity
+ {
+ public NonRetainableByteBuffer(ByteBuffer byteBuffer)
+ {
+ super(byteBuffer, NON_RETAINABLE);
+ }
+ }
+
+ /**
+ * An {@link Mutable} {@link RetainableByteBuffer} that can grow its capacity, backed by a chain of {@link ByteBuffer},
+ * which may grow either by aggregation and/or retention.
+ * When retaining, a chain of zero copy buffers are kept.
+ * When aggregating, this class avoid repetitive copies of the same data during growth by aggregating
+ * to a chain of buffers, which are only copied to a single buffer if required.
+ * If the {@code minRetainSize} is {code 0}, then appending to this buffer will always retain and accumulate.
+ * If the {@code minRetainSize} is {@link Integer#MAX_VALUE}, then appending to this buffer will always aggregate.
+ */
+ class DynamicCapacity extends Abstract implements Mutable
+ {
+ private static final Logger LOG = LoggerFactory.getLogger(RetainableByteBuffer.DynamicCapacity.class);
+
+ private final ByteBufferPool _pool;
+ private final boolean _direct;
+ private final long _maxSize;
+ private final List _buffers;
+ private final int _aggregationSize;
+ private final int _minRetainSize;
+ private Mutable _aggregate;
+
+ /**
+ * A buffer with no size limit and default aggregation and retention settings.
+ */
+ public DynamicCapacity()
+ {
+ this(null, false, -1, -1, -1);
+ }
+
+ /**
+ * @param pool The pool from which to allocate buffers
+ */
+ public DynamicCapacity(ByteBufferPool pool)
+ {
+ this(pool, false, -1, -1, -1);
+ }
+
+ /**
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit
+ */
+ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize)
+ {
+ this(pool, direct, maxSize, -1, -1);
+ }
+
+ /**
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit
+ * @param aggregationSize The default size of aggregation buffers; or 0 for no aggregation growth; or -1 for a default size.
+ * If the {@code aggregationSize} is 0 and the {@code maxSize} is less that {@link Integer#MAX_VALUE},
+ * then a single aggregation buffer may be allocated and the class will behave similarly to {@link FixedCapacity}.
+ */
+ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize)
+ {
+ this(pool, direct, maxSize, aggregationSize, -1);
+ }
+
+ /**
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit
+ * @param aggregationSize The default size of aggregation buffers; or 0 for no aggregation growth; or -1 for a default size.
+ * If the {@code aggregationSize} is 0 and the {@code maxSize} is less that {@link Integer#MAX_VALUE},
+ * then a single aggregation buffer may be allocated and the class will behave similarly to {@link FixedCapacity}.
+ * @param minRetainSize The minimal size of a {@link RetainableByteBuffer} before it will be retained; or 0 to always retain; or -1 for a heuristic;
+ */
+ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize)
+ {
+ this(new ArrayList<>(), pool, direct, maxSize, aggregationSize, minRetainSize);
+ }
+
+ private DynamicCapacity(List buffers, ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize)
+ {
+ super();
+ _pool = pool == null ? new ByteBufferPool.NonPooling() : pool;
+ _direct = direct;
+ _maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize;
+ _buffers = buffers;
+
+ if (aggregationSize < 0)
+ {
+ _aggregationSize = (int)Math.min(_maxSize, 8192L);
+ }
+ else
+ {
+ if (aggregationSize > _maxSize)
+ throw new IllegalArgumentException("aggregationSize(%d) must be <= maxCapacity(%d)".formatted(aggregationSize, _maxSize));
+ _aggregationSize = aggregationSize;
+ }
+ _minRetainSize = minRetainSize;
+
+ if (_aggregationSize == 0 && _maxSize >= Integer.MAX_VALUE && _minRetainSize != 0)
+ throw new IllegalArgumentException("must always retain if cannot aggregate");
+ }
+
+ public long getMaxSize()
+ {
+ return _maxSize;
+ }
+
+ public int getAggregationSize()
+ {
+ return _aggregationSize;
+ }
+
+ public int getMinRetainSize()
+ {
+ return _minRetainSize;
+ }
+
+ @Override
+ public boolean isMutable()
+ {
+ return true;
+ }
+
+ @Override
+ public Mutable asMutable()
+ {
+ if (isRetained())
+ throw new ReadOnlyBufferException();
+ return this;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() throws BufferOverflowException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("getByteBuffer {}", this);
+ return switch (_buffers.size())
+ {
+ case 0 -> BufferUtil.EMPTY_BUFFER;
+ case 1 -> _buffers.get(0).getByteBuffer();
+ default ->
+ {
+ long size = size();
+ if (size > Integer.MAX_VALUE)
+ throw new BufferOverflowException();
+
+ int length = (int)size;
+ RetainableByteBuffer combined = _pool.acquire(length, _direct);
+ ByteBuffer byteBuffer = combined.getByteBuffer();
+ BufferUtil.flipToFill(byteBuffer);
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ byteBuffer.put(buffer.getByteBuffer());
+ buffer.release();
+ }
+ BufferUtil.flipToFlush(byteBuffer, 0);
+ _buffers.clear();
+ _buffers.add(combined);
+ _aggregate = null;
+ yield combined.getByteBuffer();
+ }
+ };
+ }
+
+ @Override
+ public RetainableByteBuffer take(long length)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("take {} {}", this, length);
+
+ if (_buffers.isEmpty() || length == 0)
+ return RetainableByteBuffer.EMPTY;
+
+ _aggregate = null;
+
+ if (_buffers.size() == 1)
+ {
+ RetainableByteBuffer buffer = _buffers.get(0);
+
+ // if the length to take is more than half the buffer and it is not retained
+ if (length > (buffer.size() / 2) && !buffer.isRetained())
+ {
+ // slice off the tail and take the buffer itself
+ RetainableByteBuffer tail = buffer.takeFrom(length);
+ _buffers.set(0, tail);
+ return buffer;
+ }
+
+ // take the head of the buffer, but leave the buffer itself
+ return buffer.take(length);
+
+ }
+
+ List buffers = new ArrayList<>(_buffers.size());
+ for (ListIterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+
+ long size = buffer.size();
+ if (length >= size)
+ {
+ // take the buffer
+ length -= size;
+ buffers.add(buffer);
+ i.remove();
+ if (length == 0)
+ break;
+ }
+ else
+ {
+ // if the length to take is more than half the buffer and it is not retained
+ if (length > (buffer.size() / 2) && !buffer.isRetained())
+ {
+ // slice off the tail and take the buffer itself
+ RetainableByteBuffer tail = buffer.takeFrom(length);
+ buffers.add(buffer);
+ i.set(tail);
+ }
+ else
+ {
+ // take the head of the buffer, but leave the buffer itself
+ buffers.add(buffer.take(length));
+ }
+ break;
+ }
+ }
+ return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
+ }
+
+ @Override
+ public RetainableByteBuffer takeFrom(long skip)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("take {} {}", this, skip);
+
+ if (_buffers.isEmpty() || skip > size())
+ return RetainableByteBuffer.EMPTY;
+
+ _aggregate = null;
+
+ if (_buffers.size() == 1)
+ {
+ RetainableByteBuffer buffer = _buffers.get(0);
+ // if the length to leave is more than half the buffer
+ if (skip > (buffer.size() / 2) || buffer.isRetained())
+ {
+ // take from the tail of the buffer and leave the buffer itself
+ return buffer.takeFrom(skip);
+ }
+ // leave the head taken from the buffer and take the buffer itself
+ _buffers.set(0, buffer.take(skip));
+ return buffer;
+ }
+
+ List buffers = new ArrayList<>(_buffers.size());
+ for (ListIterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+
+ long size = buffer.size();
+ if (skip >= size)
+ {
+ // leave this buffer
+ skip -= size;
+ }
+ else if (skip == 0)
+ {
+ buffers.add(buffer);
+ i.remove();
+ }
+ else
+ {
+ // if the length to leave is more than half the buffer
+ if (skip > (buffer.size() / 2) || buffer.isRetained())
+ {
+ // take from the tail of the buffer and leave the buffer itself
+ buffers.add(buffer.takeFrom(skip));
+ }
+ else
+ {
+ // leave the head taken from the buffer and take the buffer itself
+ i.set(buffer.take(skip));
+ buffers.add(buffer);
+ }
+ skip = 0;
+ }
+ }
+ return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
+ }
+
+ /**
+ * Take the contents of this buffer, leaving it clear and independent
+ * @return A possibly newly allocated array with the contents of this buffer, avoiding copies if possible.
+ * The length of the array may be larger than the contents, but the offset will always be 0.
+ */
+ public byte[] takeByteArray()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("takeByteArray {}", this);
+ return switch (_buffers.size())
+ {
+ case 0 -> BufferUtil.EMPTY_BUFFER.array();
+ case 1 ->
+ {
+ RetainableByteBuffer buffer = _buffers.get(0);
+ _aggregate = null;
+ _buffers.clear();
+
+ // The array within the buffer can be used if it is not pooled, is not shared and it exits
+ byte[] array = (!(buffer instanceof Pooled) && !buffer.isRetained() && !buffer.isDirect())
+ ? buffer.getByteBuffer().array() : BufferUtil.toArray(buffer.getByteBuffer());
+
+ buffer.release();
+ yield array;
+ }
+ default ->
+ {
+ long size = size();
+ if (size > Integer.MAX_VALUE)
+ throw new BufferOverflowException();
+
+ int length = (int)size;
+ byte[] array = new byte[length];
+
+ int offset = 0;
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ int remaining = buffer.remaining();
+ buffer.get(array, offset, remaining);
+ offset += remaining;
+ buffer.release();
+ }
+ _buffers.clear();
+ _aggregate = null;
+ yield array;
+ }
+ };
+ }
+
+ @Override
+ public byte get() throws BufferUnderflowException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("get {}", this);
+ for (Iterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ if (buffer.isEmpty())
+ {
+ buffer.release();
+ i.remove();
+ continue;
+ }
+
+ byte b = buffer.get();
+ if (buffer.isEmpty())
+ {
+ buffer.release();
+ i.remove();
+ }
+ return b;
+ }
+ throw new BufferUnderflowException();
+ }
+
+ @Override
+ public byte get(long index) throws IndexOutOfBoundsException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("get {} {}", this, index);
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ long size = buffer.size();
+ if (index < size)
+ return buffer.get(Math.toIntExact(index));
+ index -= size;
+ }
+ throw new IndexOutOfBoundsException();
+ }
+
+ @Override
+ public int get(byte[] bytes, int offset, int length)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("get array {} {}", this, length);
+ int got = 0;
+ for (Iterator i = _buffers.listIterator(); length > 0 && i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ int l = buffer.get(bytes, offset, length);
+ got += l;
+ offset += l;
+ length -= l;
+
+ if (buffer.isEmpty())
+ {
+ buffer.release();
+ i.remove();
+ }
+ }
+ return got;
+ }
+
+ @Override
+ public boolean isDirect()
+ {
+ return _direct;
+ }
+
+ @Override
+ public boolean hasRemaining()
+ {
+ for (RetainableByteBuffer rbb : _buffers)
+ if (!rbb.isEmpty())
+ return true;
+ return false;
+ }
+
+ @Override
+ public long skip(long length)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("skip {} {}", this, length);
+ long skipped = 0;
+ for (Iterator i = _buffers.listIterator(); length > 0 && i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ long skip = buffer.skip(length);
+ skipped += skip;
+ length -= skip;
+
+ if (buffer.isEmpty())
+ {
+ buffer.release();
+ i.remove();
+ }
+ }
+ return skipped;
+ }
+
+ @Override
+ public void limit(long limit)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("limit {} {}", this, limit);
+ for (Iterator i = _buffers.iterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+
+ long size = buffer.size();
+ if (limit == 0)
+ {
+ buffer.release();
+ i.remove();
+ }
+ else if (limit < size)
+ {
+ buffer.limit(limit);
+ limit = 0;
+ }
+ else
+ {
+ limit -= size;
+ }
+ }
+ }
+
+ @Override
+ public Mutable slice()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("slice {}", this);
+ List buffers = new ArrayList<>(_buffers.size());
+ for (RetainableByteBuffer rbb : _buffers)
+ buffers.add(rbb.slice());
+ return newSlice(buffers);
+ }
+
+ @Override
+ public Mutable slice(long length)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("slice {} {}", this, length);
+ List buffers = new ArrayList<>(_buffers.size());
+ for (Iterator i = _buffers.iterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ long size = buffer.size();
+
+ // If length is exceeded or this is the last buffer
+ if (size > length || !i.hasNext())
+ {
+ // slice with length
+ buffers.add(buffer.slice(length));
+ break;
+ }
+
+ buffers.add(buffer.slice());
+ length -= size;
+ }
+ return newSlice(buffers);
+ }
+
+ private Mutable newSlice(List buffers)
+ {
+ return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
+ }
+
+ @Override
+ public long space()
+ {
+ return maxSize() - size();
+ }
+
+ @Override
+ public boolean isFull()
+ {
+ return size() >= maxSize();
+ }
+
+ @Override
+ public RetainableByteBuffer copy()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("copy {}", this);
+ List buffers = new ArrayList<>(_buffers.size());
+ for (RetainableByteBuffer rbb : _buffers)
+ buffers.add(rbb.copy());
+
+ return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @return {@link Integer#MAX_VALUE} if the length of this {@code Accumulator} is greater than {@link Integer#MAX_VALUE}
+ */
+ @Override
+ public int remaining()
+ {
+ long size = size();
+ return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(size);
+ }
+
+ @Override
+ public long size()
+ {
+ long length = 0;
+ for (RetainableByteBuffer buffer : _buffers)
+ length += buffer.remaining();
+ return length;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @return {@link Integer#MAX_VALUE} if the maxLength of this {@code Accumulator} is greater than {@link Integer#MAX_VALUE}.
+ */
+ @Override
+ public int capacity()
+ {
+ long maxSize = maxSize();
+ return maxSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(maxSize);
+ }
+
+ @Override
+ public long maxSize()
+ {
+ return _maxSize;
+ }
+
+ @Override
+ public boolean release()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("release {}", this);
+ if (super.release())
+ {
+ for (RetainableByteBuffer buffer : _buffers)
+ buffer.release();
+ _buffers.clear();
+ _aggregate = null;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void clear()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("clear {}", this);
+ if (_buffers.isEmpty())
+ return;
+ _aggregate = null;
+ for (RetainableByteBuffer rbb : _buffers)
+ rbb.release();
+ _buffers.clear();
+ }
+
+ @Override
+ public boolean append(ByteBuffer bytes)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes));
+ // Cannot mutate contents if retained
+ assert !isRetained();
+
+ // handle empty appends
+ if (bytes == null)
+ return true;
+ int length = bytes.remaining();
+ if (length == 0)
+ return true;
+
+ // Try appending to any existing aggregation buffer
+ boolean existing = _aggregate != null;
+ if (existing)
+ {
+ if (BufferUtil.append(_aggregate.getByteBuffer(), bytes) == length)
+ return true;
+
+ // we were limited by the capacity of the buffer, fall through to trying to allocate another
+ _aggregate = null;
+ }
+
+ // are we full?
+ long size = size();
+ long space = _maxSize - size;
+ if (space <= 0)
+ return false;
+
+ // We will aggregate, either into the last buffer or a newly allocated one.
+ if (!existing &&
+ !_buffers.isEmpty() &&
+ _buffers.get(_buffers.size() - 1) instanceof Mutable mutable &&
+ mutable.isMutable() &&
+ mutable.space() >= length &&
+ !mutable.isRetained())
+ {
+ // We can use the last buffer as the aggregate
+ _aggregate = mutable;
+ checkAggregateLimit(space);
+ }
+ else
+ {
+ // acquire a new aggregate buffer
+ int aggregateSize = _aggregationSize;
+
+ // If we cannot grow, allow a single allocation only if we have not already retained.
+ if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE)
+ aggregateSize = (int)_maxSize;
+
+ aggregateSize = Math.max(length, aggregateSize);
+ if (aggregateSize > space)
+ aggregateSize = (int)space;
+
+ _aggregate = _pool.acquire(aggregateSize, _direct).asMutable(); // TODO don't allocate more than space
+ checkAggregateLimit(space);
+ _buffers.add(_aggregate);
+ }
+
+ return _aggregate.append(bytes);
+ }
+
+ private void checkAggregateLimit(long space)
+ {
+ // If the new aggregate buffer is larger than the space available, then adjust the capacity
+ if (_aggregate.capacity() > space)
+ {
+ ByteBuffer byteBuffer = _aggregate.getByteBuffer();
+ int limit = byteBuffer.limit();
+ byteBuffer.limit(limit + Math.toIntExact(space));
+ byteBuffer = byteBuffer.slice();
+ byteBuffer.limit(limit);
+ _aggregate = RetainableByteBuffer.wrap(byteBuffer, _aggregate).asMutable();
+ }
+ }
+
+ private boolean shouldAggregate(RetainableByteBuffer buffer, long size)
+ {
+ if (_minRetainSize > 0)
+ return size < _minRetainSize;
+
+ if (_minRetainSize == -1)
+ {
+ // If we are already aggregating and the size is small
+ if (_aggregate != null && size < 128)
+ return true;
+
+ // else if there is a lot of wasted space in the buffer
+ if (buffer instanceof FixedCapacity)
+ return size < buffer.capacity() / 64;
+
+ // else if it is small
+ return size < 128;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean append(RetainableByteBuffer retainableBytes)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("append RBB {} {}", this, retainableBytes);
+
+ // Cannot mutate contents if retained
+ assert !isRetained();
+
+ // Optimize appending dynamics
+ if (retainableBytes instanceof DynamicCapacity dynamicCapacity)
+ {
+ for (Iterator i = dynamicCapacity._buffers.iterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ if (!append(buffer))
+ return false;
+ buffer.release();
+ i.remove();
+ }
+ return true;
+ }
+
+ // handle empty appends
+ if (retainableBytes == null)
+ return true;
+ long length = retainableBytes.remaining();
+ if (length == 0)
+ return true;
+
+ // If we are already aggregating, and the content will fit, and the pass buffer is mostly empty then just aggregate
+ if (_aggregate != null && _aggregate.space() >= length && (length * 100) < retainableBytes.maxSize())
+ return _aggregate.append(retainableBytes.getByteBuffer());
+
+ // If the content is a tiny part of the retainable, then better to aggregate rather than accumulate
+ if (shouldAggregate(retainableBytes, length))
+ return append(retainableBytes.getByteBuffer());
+
+ // We will accumulate, so stop any further aggregation without allocating a new aggregate buffer;
+ _aggregate = null;
+
+ // Do we have space?
+ long space = _maxSize - size();
+ if (length <= space)
+ {
+ // We have space, so add a retained slice;
+ _buffers.add(retainableBytes.slice());
+ retainableBytes.skip(length);
+ return true;
+ }
+
+ // Are we full?
+ if (space == 0)
+ return false;
+
+ // Add a space limited retained slice of the buffer
+ length = space;
+ _buffers.add(retainableBytes.slice(length));
+ retainableBytes.skip(length);
+ return false;
+ }
+
+ @Override
+ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes));
+ add(RetainableByteBuffer.wrap(bytes));
+ return this;
+ }
+
+ @Override
+ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, BufferOverflowException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("add RBB {} <- {}", this, bytes);
+ long size = size();
+ long space = _maxSize - size;
+ long length = bytes.size();
+ if (space < length)
+ throw new BufferOverflowException();
+
+ if (shouldAggregate(bytes, length) && append(bytes))
+ {
+ bytes.release();
+ return this;
+ }
+
+ _buffers.add(bytes);
+ _aggregate = null;
+ return this;
+ }
+
+ @Override
+ public Mutable put(byte b)
+ {
+ ensure(1).put(b);
+ return this;
+ }
+
+ @Override
+ public Mutable put(long index, byte b)
+ {
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ long size = buffer.size();
+ if (index < size)
+ {
+ buffer.asMutable().put(index, b);
+ return this;
+ }
+ index -= size;
+ }
+ throw new IndexOutOfBoundsException();
+ }
+
+ @Override
+ public Mutable putShort(short s)
+ {
+ ensure(2).putShort(s);
+ return this;
+ }
+
+ @Override
+ public Mutable putInt(int i)
+ {
+ ensure(4).putInt(i);
+ return this;
+ }
+
+ @Override
+ public Mutable putLong(long l)
+ {
+ ensure(8).putLong(l);
+ return this;
+ }
+
+ @Override
+ public Mutable put(byte[] bytes, int offset, int length)
+ {
+ // Use existing aggregate if the length is large and there is space for at least half
+ if (length >= 16 && _aggregate != null)
+ {
+ long space = _aggregate.space();
+ if (length > space && length / 2 <= space)
+ {
+ int s = (int)space;
+ _aggregate.put(bytes, offset, s);
+ offset += s;
+ length -= s;
+ }
+ }
+
+ ensure(length).put(bytes, offset, length);
+ return this;
+ }
+
+ private Mutable ensure(int needed) throws BufferOverflowException
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("ensure {} {}", this, needed);
+ long size = size();
+ long space = _maxSize - size;
+ if (space < needed)
+ throw new BufferOverflowException();
+ if (_aggregate != null)
+ {
+ if (_aggregate.space() >= needed)
+ return _aggregate;
+ }
+ else if (!_buffers.isEmpty() &&
+ _buffers.get(_buffers.size() - 1) instanceof Mutable mutable &&
+ mutable.isMutable() &&
+ mutable.space() >= needed &&
+ !mutable.isRetained())
+ {
+ _aggregate = mutable;
+ return _aggregate;
+ }
+
+ // We need a new aggregate, acquire a new aggregate buffer
+ int aggregateSize = _aggregationSize;
+
+ // If we cannot grow, allow a single allocation only if we have not already retained.
+ if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE)
+ aggregateSize = (int)_maxSize;
+ _aggregate = _pool.acquire(Math.max(needed, aggregateSize), _direct).asMutable();
+
+ // If the new aggregate buffer is larger than the space available, then adjust the capacity
+ checkAggregateLimit(space);
+ _buffers.add(_aggregate);
+ return _aggregate;
+ }
+
+ @Override
+ public boolean appendTo(ByteBuffer to)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to));
+ _aggregate = null;
+ for (Iterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ if (!buffer.appendTo(to))
+ return false;
+ buffer.release();
+ i.remove();
+ }
+ return true;
+ }
+
+ @Override
+ public boolean appendTo(RetainableByteBuffer to)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("appendTo RBB {} -> {}", this, to);
+ _aggregate = null;
+ for (Iterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ if (!buffer.appendTo(to))
+ return false;
+ buffer.release();
+ i.remove();
+ }
+ return true;
+ }
+
+ @Override
+ public void putTo(ByteBuffer toInfillMode)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("putTo BB {} -> {}", this, toInfillMode);
+ _aggregate = null;
+ for (Iterator i = _buffers.listIterator(); i.hasNext();)
+ {
+ RetainableByteBuffer buffer = i.next();
+ buffer.putTo(toInfillMode);
+ buffer.release();
+ i.remove();
+ }
+ }
+
+ @Override
+ public void writeTo(Content.Sink sink, boolean last, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback);
+ _aggregate = null;
+ switch (_buffers.size())
+ {
+ case 0 -> callback.succeeded();
+ case 1 ->
+ {
+ RetainableByteBuffer buffer = _buffers.get(0);
+ buffer.writeTo(sink, last, Callback.from(this::clear, callback));
+ }
+ default ->
+ {
+ // Can we do a gather write?
+ if (!last && sink instanceof EndPoint endPoint)
+ {
+ ByteBuffer[] buffers = new ByteBuffer[_buffers.size()];
+ int i = 0;
+ for (RetainableByteBuffer rbb : _buffers)
+ buffers[i++] = rbb.getByteBuffer();
+ endPoint.write(Callback.from(this::clear, callback), buffers);
+ return;
+ }
+
+ // write buffer by buffer
+ new IteratingNestedCallback(callback)
+ {
+ int _index;
+ RetainableByteBuffer _buffer;
+ boolean _lastWritten;
+
+ @Override
+ protected Action process()
+ {
+ // release the last buffer written
+ if (_buffer != null)
+ _buffer.release();
+
+ // write next buffer
+ if (_index < _buffers.size())
+ {
+ _buffer = _buffers.get(_index++);
+ _lastWritten = last && (_index == _buffers.size());
+ _buffer.writeTo(sink, _lastWritten, this);
+ return Action.SCHEDULED;
+ }
+
+ // All buffers written
+ if (last && !_lastWritten)
+ {
+ _buffer = null;
+ _lastWritten = true;
+ sink.write(true, BufferUtil.EMPTY_BUFFER, this);
+ return Action.SCHEDULED;
+ }
+ _buffers.clear();
+ return Action.SUCCEEDED;
+ }
+ }.iterate();
+ }
+ }
+ }
+
+ @Override
+ protected void addExtraStringInfo(StringBuilder builder)
+ {
+ super.addExtraStringInfo(builder);
+ builder.append(",aggSize=");
+ builder.append(_aggregationSize);
+ builder.append(",minRetain=");
+ builder.append(_minRetainSize);
+ builder.append(",buffers=");
+ builder.append(_buffers.size());
+ }
+
+ @Override
+ protected void addValueString(StringBuilder builder)
+ {
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ builder.append('@');
+ builder.append(Integer.toHexString(System.identityHashCode(buffer)));
+ if (buffer instanceof Abstract abstractBuffer)
+ {
+ builder.append("/r=");
+ builder.append(abstractBuffer.getRetained());
+ abstractBuffer.addValueString(builder);
+ }
+ else
+ {
+ builder.append("???");
+ }
+ }
+ }
+
+ @Override
+ protected void addValueMarker(StringBuilder builder, boolean beginning)
+ {
+ if (beginning)
+ builder.append("<<").append(_buffers.size()).append('<');
+ else
+ builder.append(">>>");
+ }
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
index 6bd5eeebc32a..5eff27b14e81 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
@@ -71,7 +71,9 @@ public String toString()
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
- offer(new AsyncChunk(last, byteBuffer, callback));
+ ByteBuffer slice = byteBuffer.slice();
+ BufferUtil.clear(byteBuffer);
+ offer(new AsyncChunk(last, slice, callback));
}
/**
@@ -301,6 +303,12 @@ public boolean canRetain()
return referenceCounter != null;
}
+ @Override
+ public boolean isRetained()
+ {
+ return canRetain() && referenceCounter.isRetained();
+ }
+
@Override
public void retain()
{
@@ -330,5 +338,17 @@ public void failed(Throwable x)
{
callback.failed(x);
}
+
+ @Override
+ public String toString()
+ {
+ return "%s@%x[rc=%s,l=%b,b=%s]".formatted(
+ getClass().getSimpleName(),
+ hashCode(),
+ referenceCounter == null ? "-" : referenceCounter.get(),
+ isLast(),
+ BufferUtil.toDetailString(getByteBuffer())
+ );
+ }
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
index 26a0d972395b..11fe54811be5 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
@@ -14,16 +14,15 @@
package org.eclipse.jetty.io.content;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
-import java.nio.channels.WritePendingException;
-import org.eclipse.jetty.io.ByteBufferAggregator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,15 +42,10 @@ public class BufferedContentSink implements Content.Sink
private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class);
- private static final int START_BUFFER_SIZE = 1024;
-
private final Content.Sink _delegate;
- private final ByteBufferPool _bufferPool;
- private final boolean _direct;
- private final int _maxBufferSize;
private final int _maxAggregationSize;
- private final Flusher _flusher;
- private ByteBufferAggregator _aggregator;
+ private final RetainableByteBuffer.DynamicCapacity _aggregator;
+ private final SerializedInvoker _serializer = new SerializedInvoker();
private boolean _firstWrite = true;
private boolean _lastWritten;
@@ -64,11 +58,8 @@ public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boo
if (maxBufferSize < maxAggregationSize)
throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")");
_delegate = delegate;
- _bufferPool = (bufferPool == null) ? ByteBufferPool.NON_POOLING : bufferPool;
- _direct = direct;
- _maxBufferSize = maxBufferSize;
_maxAggregationSize = maxAggregationSize;
- _flusher = new Flusher(delegate);
+ _aggregator = new RetainableByteBuffer.DynamicCapacity(bufferPool, direct, maxBufferSize);
}
@Override
@@ -95,12 +86,10 @@ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
}
ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
- if (current.remaining() <= _maxAggregationSize)
+ if (current.remaining() <= _maxAggregationSize && !last && byteBuffer != FLUSH_BUFFER)
{
// current buffer can be aggregated
- if (_aggregator == null)
- _aggregator = new ByteBufferAggregator(_bufferPool, _direct, Math.min(START_BUFFER_SIZE, _maxBufferSize), _maxBufferSize);
- aggregateAndFlush(last, current, callback);
+ aggregateAndFlush(current, callback);
}
else
{
@@ -127,180 +116,85 @@ private void flush(boolean last, ByteBuffer currentBuffer, Callback callback)
if (LOG.isDebugEnabled())
LOG.debug("given buffer is greater than _maxBufferSize");
- RetainableByteBuffer aggregatedBuffer = _aggregator == null ? null : _aggregator.takeRetainableByteBuffer();
- if (aggregatedBuffer == null)
+ if (_aggregator.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("nothing aggregated, flushing current buffer {}", currentBuffer);
- _flusher.offer(last, currentBuffer, callback);
+ _delegate.write(last, currentBuffer, callback);
+ }
+ else if (!currentBuffer.hasRemaining())
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("flushing aggregate {}", _aggregator);
+ _aggregator.writeTo(_delegate, last, callback);
+ }
+ else if (last && currentBuffer.remaining() <= Math.min(_maxAggregationSize, _aggregator.space()) && _aggregator.append(currentBuffer))
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("flushing aggregated {}", _aggregator);
+ _aggregator.writeTo(_delegate, true, callback);
}
- else if (BufferUtil.hasContent(currentBuffer))
+ else
{
if (LOG.isDebugEnabled())
- LOG.debug("flushing aggregated buffer {}", aggregatedBuffer);
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
+ LOG.debug("flushing aggregate {} and buffer {}", _aggregator, currentBuffer);
+
+ _aggregator.writeTo(_delegate, false, new Callback()
{
@Override
public void succeeded()
{
- super.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("succeeded writing aggregated buffer, flushing current buffer {}", currentBuffer);
- _flusher.offer(last, currentBuffer, callback);
+ _delegate.write(last, currentBuffer, callback);
}
@Override
public void failed(Throwable x)
{
- if (LOG.isDebugEnabled())
- LOG.debug("failure writing aggregated buffer", x);
- super.failed(x);
callback.failed(x);
}
+
+ @Override
+ public InvocationType getInvocationType()
+ {
+ return callback.getInvocationType();
+ }
});
}
- else
- {
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), Callback.from(aggregatedBuffer::release, callback));
- }
}
/**
* Aggregates the given buffer, flushing the aggregated buffer if necessary.
*/
- private void aggregateAndFlush(boolean last, ByteBuffer currentBuffer, Callback callback)
+ private void aggregateAndFlush(ByteBuffer currentBuffer, Callback callback)
{
- boolean full = _aggregator.aggregate(currentBuffer);
- boolean empty = !currentBuffer.hasRemaining();
- boolean flush = full || currentBuffer == FLUSH_BUFFER;
- boolean complete = last && empty;
- if (LOG.isDebugEnabled())
- LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, currentBuffer.remaining(), _aggregator);
- if (complete)
+ if (_aggregator.append(currentBuffer))
{
- RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
- if (aggregatedBuffer != null)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("complete; writing aggregated buffer as the last one: {} bytes", aggregatedBuffer.remaining());
- _flusher.offer(true, aggregatedBuffer.getByteBuffer(), Callback.from(callback, aggregatedBuffer::release));
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("complete; no aggregated buffer, writing last empty buffer");
- _flusher.offer(true, BufferUtil.EMPTY_BUFFER, callback);
- }
+ _serializer.run(callback::succeeded);
+ return;
}
- else if (flush)
- {
- RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
- if (LOG.isDebugEnabled())
- LOG.debug("writing aggregated buffer: {} bytes, then {}", aggregatedBuffer.remaining(), currentBuffer.remaining());
- if (BufferUtil.hasContent(currentBuffer))
- {
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
- {
- @Override
- public void succeeded()
- {
- super.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("written aggregated buffer, writing remaining of current: {} bytes{}", currentBuffer.remaining(), (last ? " (last write)" : ""));
- if (last)
- _flusher.offer(true, currentBuffer, callback);
- else
- aggregateAndFlush(false, currentBuffer, callback);
- }
-
- @Override
- public void failed(Throwable x)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("failure writing aggregated buffer", x);
- super.failed(x);
- callback.failed(x);
- }
- });
- }
- else
+ _aggregator.writeTo(_delegate, false, new Callback()
+ {
+ @Override
+ public void succeeded()
{
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), Callback.from(aggregatedBuffer::release, callback));
+ if (_aggregator.append(currentBuffer))
+ callback.succeeded();
+ else
+ callback.failed(new BufferOverflowException());
}
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("buffer fully aggregated, delaying writing - aggregator: {}", _aggregator);
- _flusher.offer(callback);
- }
- }
-
- private static class Flusher extends IteratingCallback
- {
- private static final ByteBuffer COMPLETE_CALLBACK = BufferUtil.allocate(0);
-
- private final Content.Sink _sink;
- private boolean _last;
- private ByteBuffer _buffer;
- private Callback _callback;
- private boolean _lastWritten;
-
- Flusher(Content.Sink sink)
- {
- _sink = sink;
- }
-
- void offer(Callback callback)
- {
- offer(false, COMPLETE_CALLBACK, callback);
- }
- void offer(boolean last, ByteBuffer byteBuffer, Callback callback)
- {
- if (_callback != null)
- throw new WritePendingException();
- _last = last;
- _buffer = byteBuffer;
- _callback = callback;
- iterate();
- }
-
- @Override
- protected Action process()
- {
- if (_lastWritten)
- return Action.SUCCEEDED;
- if (_callback == null)
- return Action.IDLE;
- if (_buffer != COMPLETE_CALLBACK)
+ @Override
+ public void failed(Throwable x)
{
- _lastWritten = _last;
- _sink.write(_last, _buffer, this);
+ callback.failed(x);
}
- else
+
+ @Override
+ public InvocationType getInvocationType()
{
- succeeded();
+ return callback.getInvocationType();
}
- return Action.SCHEDULED;
- }
-
- @Override
- public void succeeded()
- {
- _buffer = null;
- Callback callback = _callback;
- _callback = null;
- callback.succeeded();
- super.succeeded();
- }
-
- @Override
- protected void onCompleteFailure(Throwable cause)
- {
- _buffer = null;
- _callback.failed(cause);
- }
+ });
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
index 821782fd32fe..a4b824fd7758 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
@@ -20,25 +20,19 @@
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
+import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
-public abstract class ByteBufferChunk implements Content.Chunk
+public abstract class ByteBufferChunk extends RetainableByteBuffer.FixedCapacity implements Content.Chunk
{
- private final ByteBuffer byteBuffer;
private final boolean last;
public ByteBufferChunk(ByteBuffer byteBuffer, boolean last)
{
- this.byteBuffer = Objects.requireNonNull(byteBuffer);
+ super(Objects.requireNonNull(byteBuffer));
this.last = last;
}
- @Override
- public ByteBuffer getByteBuffer()
- {
- return byteBuffer;
- }
-
@Override
public boolean isLast()
{
@@ -65,6 +59,12 @@ public WithReferenceCount(ByteBuffer byteBuffer, boolean last)
super(byteBuffer, last);
}
+ @Override
+ public boolean isRetained()
+ {
+ return references.isRetained();
+ }
+
@Override
public boolean canRetain()
{
@@ -148,6 +148,12 @@ public WithRetainable(ByteBuffer byteBuffer, boolean last, Retainable retainable
this.retainable = retainable;
}
+ @Override
+ public boolean isRetained()
+ {
+ return retainable.isRetained();
+ }
+
@Override
public boolean canRetain()
{
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
index 9db5923b287f..2fa8f10aa82a 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
@@ -15,13 +15,13 @@
import java.nio.ByteBuffer;
-import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Promise;
public class ContentSourceByteBuffer implements Runnable
{
- private final ByteBufferAccumulator accumulator = new ByteBufferAccumulator();
+ private final RetainableByteBuffer.Mutable.DynamicCapacity dynamic = new RetainableByteBuffer.Mutable.DynamicCapacity();
private final Content.Source source;
private final Promise promise;
@@ -52,12 +52,14 @@ public void run()
return;
}
- accumulator.copyBuffer(chunk.getByteBuffer());
+ dynamic.append(chunk.getByteBuffer().slice());
chunk.release();
if (chunk.isLast())
{
- promise.succeeded(accumulator.takeByteBuffer());
+ ByteBuffer dynamicResult = dynamic.getByteBuffer();
+ dynamic.release();
+ promise.succeeded(dynamicResult);
return;
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java
new file mode 100644
index 000000000000..2f1410cf1db9
--- /dev/null
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java
@@ -0,0 +1,75 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io.internal;
+
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.util.Promise;
+
+public class ContentSourceRetainableByteBuffer implements Runnable
+{
+ private final RetainableByteBuffer.Mutable _mutable;
+ private final Content.Source _source;
+ private final Promise _promise;
+
+ public ContentSourceRetainableByteBuffer(Content.Source source, ByteBufferPool pool, boolean direct, int maxSize, Promise promise)
+ {
+ _source = source;
+ _mutable = new RetainableByteBuffer.Mutable.DynamicCapacity(pool, direct, maxSize);
+ _promise = promise;
+ }
+
+ @Override
+ public void run()
+ {
+ while (true)
+ {
+ Content.Chunk chunk = _source.read();
+
+ if (chunk == null)
+ {
+ _source.demand(this);
+ return;
+ }
+
+ if (Content.Chunk.isFailure(chunk))
+ {
+ _promise.failed(chunk.getFailure());
+ if (!chunk.isLast())
+ _source.fail(chunk.getFailure());
+ return;
+ }
+
+ boolean appended = _mutable.append(chunk);
+ chunk.release();
+
+ if (!appended)
+ {
+ IllegalStateException ise = new IllegalStateException("Max size (" + _mutable.capacity() + ") exceeded");
+ _promise.failed(ise);
+ _mutable.release();
+ _source.fail(ise);
+ return;
+ }
+
+ if (chunk.isLast())
+ {
+ _promise.succeeded(_mutable);
+ _mutable.release();
+ return;
+ }
+ }
+ }
+}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/NonRetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/NonRetainableByteBuffer.java
deleted file mode 100644
index f58e92315d29..000000000000
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/NonRetainableByteBuffer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.io.internal;
-
-import java.nio.ByteBuffer;
-
-import org.eclipse.jetty.io.RetainableByteBuffer;
-
-public class NonRetainableByteBuffer implements RetainableByteBuffer
-{
- private final ByteBuffer byteBuffer;
-
- public NonRetainableByteBuffer(ByteBuffer byteBuffer)
- {
- this.byteBuffer = byteBuffer;
- }
-
- @Override
- public boolean isRetained()
- {
- return false;
- }
-
- @Override
- public ByteBuffer getByteBuffer()
- {
- return byteBuffer;
- }
-}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
index 39f74d7dde11..965752c8c45f 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
@@ -342,7 +342,8 @@ private void acquireEncryptedOutput()
public void onUpgradeTo(ByteBuffer buffer)
{
acquireEncryptedInput();
- BufferUtil.append(_encryptedInput.getByteBuffer(), buffer);
+ if (!_encryptedInput.asMutable().append(buffer))
+ throw new IllegalStateException("too much to upgrade");
}
@Override
@@ -434,7 +435,7 @@ private void releaseEmptyEncryptedInputBuffer()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
- if (_encryptedInput != null && !_encryptedInput.hasRemaining())
+ if (_encryptedInput != null && _encryptedInput.isEmpty())
{
_encryptedInput.release();
_encryptedInput = null;
@@ -445,7 +446,7 @@ private void releaseEmptyDecryptedInputBuffer()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
- if (_decryptedInput != null && !_decryptedInput.hasRemaining())
+ if (_decryptedInput != null && _decryptedInput.isEmpty())
{
_decryptedInput.release();
_decryptedInput = null;
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
index 59040fb24566..44e0a3110d83 100644
--- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
@@ -365,6 +365,7 @@ public void testAcquireRelease()
}
@Test
+ @Deprecated(forRemoval = true)
public void testQuadraticPool()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool.Quadratic();
@@ -438,9 +439,9 @@ public void testReleaseExcessMemory()
Collections.reverse(buffers);
buffers.forEach(RetainableByteBuffer::release);
- Pool bucketPool = pool.poolFor(maxCapacity, true);
+ Pool bucketPool = pool.poolFor(maxCapacity, true);
assertThat(bucketPool, instanceOf(CompoundPool.class));
- CompoundPool compoundPool = (CompoundPool)bucketPool;
+ CompoundPool compoundPool = (CompoundPool)bucketPool;
assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE));
assertThat(compoundPool.getSecondaryPool().size(), is(0));
}
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
index 0731eff5d32b..6f86ab91b163 100644
--- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
@@ -13,9 +13,13 @@
package org.eclipse.jetty.io;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,9 +38,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -270,6 +276,12 @@ public void testFlush(BiConsumer flusher) throws
assertThat(BufferUtil.toString(chunk.getByteBuffer()), is("Hello World!"));
chunk.release();
callback.get(5, TimeUnit.SECONDS);
+
+ buffered.write(true, BufferUtil.EMPTY_BUFFER, Callback.NOOP);
+ chunk = async.read();
+ assertThat(chunk.isLast(), is(true));
+ assertThat(chunk.remaining(), is(0));
+ chunk.release();
}
}
@@ -428,7 +440,7 @@ public void testBufferGrowth()
buffered.write(false, ByteBuffer.wrap(input2), Callback.from(() ->
buffered.write(true, ByteBuffer.wrap(input3), Callback.NOOP)))));
- // We expect 3 buffer flushes: 4096b + 4096b + 1808b == 10_000b.
+ // We expect 3 buffer flushes: 4096b + 3004b + 2000 == 10_000b.
Content.Chunk chunk = async.read();
assertThat(chunk, notNullValue());
assertThat(chunk.remaining(), is(4096));
@@ -438,14 +450,14 @@ public void testBufferGrowth()
chunk = async.read();
assertThat(chunk, notNullValue());
- assertThat(chunk.remaining(), is(4096));
+ assertThat(chunk.remaining(), is(input2.length - (4096 - input1.length)));
accumulatingBuffer.put(chunk.getByteBuffer());
assertThat(chunk.release(), is(true));
assertThat(chunk.isLast(), is(false));
chunk = async.read();
assertThat(chunk, notNullValue());
- assertThat(chunk.remaining(), is(1808));
+ assertThat(chunk.remaining(), is(input3.length));
accumulatingBuffer.put(chunk.getByteBuffer());
assertThat(chunk.release(), is(true));
assertThat(chunk.isLast(), is(true));
@@ -539,13 +551,13 @@ public void succeeded()
callback.succeeded();
Content.Chunk read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
- assertThat(read.isLast(), is(false));
assertThat(read.remaining(), is(1024));
+ assertThat(read.isLast(), is(false));
assertThat(read.release(), is(true));
read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
- assertThat(read.isLast(), is(true));
assertThat(read.remaining(), is(1024));
+ assertThat(read.isLast(), is(true));
assertThat(read.release(), is(true));
assertTrue(complete.await(5, TimeUnit.SECONDS));
@@ -594,4 +606,45 @@ public void succeeded()
assertThat(count.get(), is(-1));
}
}
+
+ @Test
+ public void testFromOutputStream()
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Content.Sink sink = Content.Sink.from(baos);
+
+ AccountingCallback accountingCallback = new AccountingCallback();
+
+ sink.write(false, ByteBuffer.wrap("hello ".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports, equalTo(List.of("succeeded")));
+ accountingCallback.reports.clear();
+
+ sink.write(true, ByteBuffer.wrap("world".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports, equalTo(List.of("succeeded")));
+ accountingCallback.reports.clear();
+
+ sink.write(true, ByteBuffer.wrap(" again".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports.size(), is(1));
+ assertThat(accountingCallback.reports.get(0), instanceOf(EOFException.class));
+ accountingCallback.reports.clear();
+
+ assertThat(baos.toString(US_ASCII), is("hello world"));
+ }
+
+ private static class AccountingCallback implements Callback
+ {
+ private final List