Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RetainableByteBuffer as mutable #11801

Merged
merged 92 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
4abb57b
Make chunk a RBB
gregw Mar 31, 2024
974d4c4
Make chunk a RBB
gregw Mar 31, 2024
74ce7c2
revert EventsHandler change
gregw Mar 31, 2024
5fd2543
WIP
gregw Mar 31, 2024
6992609
updates from review
gregw Apr 2, 2024
4776471
updates from review
gregw Apr 3, 2024
5723732
more tests
gregw Apr 3, 2024
67c302b
inline asReadOnly
gregw Apr 3, 2024
010267e
Added Appendable as a replacement for both Accumulator and Aggregator
gregw Apr 10, 2024
d18c93a
Better slice implementation
gregw Apr 10, 2024
ecdc2de
protect from modification if retained
gregw Apr 10, 2024
d29fb05
avoid flip-flopping in append loops
gregw Apr 10, 2024
b0fb6cd
long methods
gregw Apr 10, 2024
ecaf1b6
use some appends
gregw Apr 10, 2024
f17395f
javadoc
gregw Apr 10, 2024
521dccd
Fixed test
gregw Apr 11, 2024
ec022ff
appendable wrap
gregw Apr 11, 2024
0d6a1c7
detailString
gregw Apr 11, 2024
df3bf77
Revert NON_POOLING change
gregw Apr 11, 2024
a5cd43f
Testing Fixed
gregw Apr 12, 2024
34620d7
Limit usage to buffer size requests
gregw Apr 12, 2024
f7cf481
Simplified hierarchy and naming
gregw Apr 12, 2024
4c1670d
Simplified hierarchy and naming
gregw Apr 12, 2024
4a726fb
more testing
gregw Apr 12, 2024
eb1e17a
improved test
gregw Apr 12, 2024
91e9561
disable leaky tests
gregw Apr 12, 2024
595323c
removed redundant NetworkBuffer class
gregw Apr 13, 2024
1cb541e
Better heuristic for retaining buffers
gregw Apr 22, 2024
4442c49
Updated BufferedResponseHandler
gregw Apr 25, 2024
4481998
Use RBB in BAEP
gregw Apr 25, 2024
2b55515
added takeRetainableByteBuffer
gregw Apr 25, 2024
dadf4c0
Reworked ChunkAccumulator
gregw Apr 25, 2024
f91532a
Removed ByteBufferAggregator usage
gregw Apr 26, 2024
3292fdc
Improved BufferedContentSink
gregw Apr 26, 2024
a9c7cd9
Improved BufferedContentSink
gregw Apr 26, 2024
ca7420b
Improved BufferedContentSink
gregw Apr 27, 2024
3318003
Deprecate various external extensions of RBB
gregw Apr 28, 2024
5f1f644
Fixes for retain during debugging.
gregw Apr 29, 2024
ca2f8fc
Revert tags
gregw May 1, 2024
1ca24bf
Avoid flaky tests
gregw May 1, 2024
55ff1ab
Made NonPooled wrapping explicit
gregw May 1, 2024
7270ff9
javadoc
gregw May 6, 2024
24c9cba
Avoid bizarre wait on buffer init
gregw May 6, 2024
3def54d
added add and put methods
gregw May 8, 2024
e0215a9
Renamed Appendable to Mutable
gregw May 10, 2024
03cbeb0
WIP on HTTP2
gregw May 10, 2024
1ff0707
WIP on HTTP2
gregw May 10, 2024
81e1f33
WIP on HTTP2
gregw May 12, 2024
2ff2103
WIP
gregw May 12, 2024
55c8b29
WIP
gregw May 13, 2024
6cd8f16
reverted HTTP2 WIP
gregw May 13, 2024
70e5ce9
Changed add to throw OverFlow
gregw May 13, 2024
4983982
WIP on HTTP2
gregw May 13, 2024
c486446
Fixed HTTP2ServerTest
gregw May 14, 2024
42f9164
WIP on HTTP2
gregw May 14, 2024
6177bee
WIP on HTTP2
gregw May 14, 2024
94cc62b
WIP on HTTP2
gregw May 15, 2024
82da0da
WIP on HTTP2
gregw May 15, 2024
3792241
Added toDetailString
gregw May 15, 2024
cb1af15
cleanup
gregw May 15, 2024
8d4a3e7
fixed clear usage
gregw May 15, 2024
2a6b01d
fixed clear usage
gregw May 15, 2024
94b4917
fixed ws message buffer
gregw May 15, 2024
36506e7
fixed ws message buffer
gregw May 15, 2024
9b2bef3
Marked test as flaky
gregw May 16, 2024
9445ef2
Fixes after rebase to 12.1.x
gregw May 16, 2024
ad75635
update ee11
gregw May 17, 2024
fda044d
take never retains
gregw May 17, 2024
fefc233
take never retains
gregw May 17, 2024
9012b73
work in progress
gregw May 17, 2024
c50055b
fluent style
gregw May 17, 2024
abeaf8e
javadoc
gregw May 18, 2024
87e8f01
javadoc
gregw May 19, 2024
1d8f0ac
javadoc
gregw May 19, 2024
0012282
Merge branch 'jetty-12.1.x' into fix/jetty-12.1/10541/chunkAsRBB
gregw May 21, 2024
293a225
introduced take and takeFrom
gregw May 21, 2024
8b112d7
take can return single buffer
gregw May 21, 2024
7fa0d3c
Merge branch 'jetty-12.1.x' into fix/jetty-12.1/10541/chunkAsRBB
gregw May 22, 2024
8630fb1
ByteBufferPool acquire returns Mutable
gregw May 22, 2024
077fd25
updates from review
gregw May 22, 2024
cc9fe32
updates from review
gregw May 22, 2024
145684f
Merge branch 'jetty-12.1.x' into fix/jetty-12.1/10541/chunkAsRBB
gregw May 23, 2024
0a4c799
more updates from review
gregw May 23, 2024
6b99ff7
fixes for review updates
gregw May 23, 2024
ab59f9b
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
gregw May 23, 2024
022b83f
fixes for review updates
gregw May 23, 2024
2bcbdc3
Merge branch 'jetty-12.1.x' into fix/jetty-12.1/10541/chunkAsRBB
gregw Jun 6, 2024
4627f75
replace usages of ByteBufferPool.Accumulator with RetainableByteBuffe…
lorban Jun 20, 2024
e1a7e79
replace usages of ByteBufferPool.Accumulator with RetainableByteBuffe…
lorban Jun 20, 2024
d6dc39a
replace usages of ByteBufferPool.Accumulator with RetainableByteBuffe…
lorban Jun 20, 2024
db00da7
replace usages of ByteBufferPool.Accumulator with RetainableByteBuffe…
lorban Jun 20, 2024
54738d9
updates from review
gregw Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ private void registerDemand(ContentSource contentSource)

private class ContentSource implements Content.Source
{
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk()
private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk.Empty()
{
@Override
public ByteBuffer getByteBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private boolean parseAndFill()
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
LOG.debug("Parsing {} in {}", networkBuffer, this);
// Always parse even empty buffers to advance the parser.
if (parse())
{
Expand Down Expand Up @@ -347,7 +347,7 @@ private boolean parse()
if (getHttpChannel().isTunnel(method, status))
return true;

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;

if (!HttpStatus.isInformational(status))
Expand All @@ -359,7 +359,7 @@ private boolean parse()
return false;
}

if (!networkBuffer.hasRemaining())
if (networkBuffer.isEmpty())
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public void dispose() throws Exception
{
LifeCycle.stop(client);
LifeCycle.stop(server);
Set<ArrayByteBufferPool.Tracking.Buffer> serverLeaks = serverBufferPool.getLeaks();
Set<ArrayByteBufferPool.Tracking.TrackedBuffer> serverLeaks = serverBufferPool.getLeaks();
assertEquals(0, serverLeaks.size(), serverBufferPool.dumpLeaks());
Set<ArrayByteBufferPool.Tracking.Buffer> clientLeaks = clientBufferPool.getLeaks();
Set<ArrayByteBufferPool.Tracking.TrackedBuffer> clientLeaks = clientBufferPool.getLeaks();
assertEquals(0, clientLeaks.size(), clientBufferPool.dumpLeaks());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ protected int networkFill(ByteBuffer input) throws IOException
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

ArrayByteBufferPool bufferPool = (ArrayByteBufferPool)server.getByteBufferPool();
Pool<RetainableByteBuffer> bucket = bufferPool.poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
Pool<RetainableByteBuffer.Pooled> bucket = bufferPool.poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());

Expand All @@ -773,7 +773,7 @@ public void testEncryptedOutputBufferRepooling() throws Exception
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down Expand Up @@ -843,7 +843,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(bool
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down Expand Up @@ -928,7 +928,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean cl
ByteBufferPool bufferPool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
RetainableByteBuffer.Wrapper buffer = new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.eclipse.jetty.io.Content.Source.asByteBuffer;
import static org.eclipse.jetty.toolchain.test.StackUtils.supply;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.eclipse.jetty.util.BufferUtil.toBuffer;
import static org.eclipse.jetty.util.BufferUtil.toHexString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -169,7 +171,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
MultiPart.Part part = parts.iterator().next();
assertEquals(name, part.getName());
assertEquals("text/plain", part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -222,7 +224,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals(contentType, part.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(fileName, part.getFileName());
assertEquals(data.length, part.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(part.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(part.getContentSource())));
}
});

Expand Down Expand Up @@ -336,7 +338,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals(tmpPath.getFileName().toString(), filePart.getFileName());
assertEquals(Files.size(tmpPath), filePart.getContentSource().getLength());
assertArrayEquals(data, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(data)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down Expand Up @@ -377,7 +379,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception
assertEquals("file", filePart.getName());
assertEquals("application/octet-stream", filePart.getHeaders().get(HttpHeader.CONTENT_TYPE));
assertEquals("fileName", filePart.getFileName());
assertArrayEquals(fileData, Content.Source.asByteBuffer(filePart.getContentSource()).array());
assertEquals(toHexString(toBuffer(fileData)), toHexString(asByteBuffer(filePart.getContentSource())));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public RetainableByteBuffer decode(ByteBuffer compressed)
RetainableByteBuffer result = acquire(length);
for (RetainableByteBuffer buffer : _inflateds)
{
BufferUtil.append(result.getByteBuffer(), buffer.getByteBuffer());
buffer.appendTo(result);
buffer.release();
}
_inflateds.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ else if (type != HttpTokens.Type.SPACE && type != HttpTokens.Type.HTAB)
if (state == State.EPILOGUE)
notifyComplete();
else
throw new EOFException("unexpected EOF");
throw new EOFException("unexpected EOF in " + state);
}
}
catch (Throwable x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

package org.eclipse.jetty.http;

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;

public class Trailers implements Content.Chunk
public class Trailers extends Content.Chunk.Empty
{
private final HttpFields trailers;

Expand All @@ -27,12 +24,6 @@ public Trailers(HttpFields trailers)
this.trailers = trailers;
}

@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}

@Override
public boolean isLast()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ interface Factory
HttpContent getContent(String path) throws IOException;
}

// TODO add a writeTo semantic, then update IOResources to use a RBB.Dynamic

/**
* HttpContent Wrapper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public void before()
pool = new ByteBufferPool.Wrapper(new ArrayByteBufferPool())
{
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
{
counter.incrementAndGet();
return new RetainableByteBuffer.Wrapper(super.acquire(size, direct))
return new RetainableByteBuffer.Mutable.Wrapper(super.acquire(size, direct))
{
@Override
public boolean release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.toolchain.test.FS;
Expand Down Expand Up @@ -1601,26 +1602,19 @@ public Content.Chunk read()
}
}

private static class NonRetainableChunk implements Content.Chunk
private static class NonRetainableChunk extends RetainableByteBuffer.NonRetainableByteBuffer implements Content.Chunk
{
private final ByteBuffer _content;
private final boolean _isLast;
private final Throwable _failure;

public NonRetainableChunk(Content.Chunk chunk)
{
_content = BufferUtil.copy(chunk.getByteBuffer());
super(BufferUtil.copy(chunk.getByteBuffer()));
_isLast = chunk.isLast();
_failure = chunk.getFailure();
chunk.release();
}

@Override
public ByteBuffer getByteBuffer()
{
return _content;
}

@Override
public boolean isLast()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void onHeaders(HeadersFrame frame)
@Override
public void onData(DataFrame frame)
{
NetworkBuffer networkBuffer = producer.networkBuffer;
RetainableByteBuffer.Mutable networkBuffer = producer.networkBuffer;
session.onData(new StreamData(frame, networkBuffer));
}

Expand Down Expand Up @@ -304,15 +304,15 @@ public void onConnectionFailure(int error, String reason)
protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
private NetworkBuffer networkBuffer;
private RetainableByteBuffer.Mutable networkBuffer;
private boolean shutdown;
private boolean failed;

private void setInputBuffer(ByteBuffer byteBuffer)
{
acquireNetworkBuffer();
// TODO handle buffer overflow?
networkBuffer.put(byteBuffer);
if (!networkBuffer.append(byteBuffer))
LOG.warn("overflow");
}

@Override
Expand All @@ -339,7 +339,7 @@ public Runnable produce()
{
while (networkBuffer.hasRemaining())
{
session.getParser().parse(networkBuffer.getBuffer());
session.getParser().parse(networkBuffer.getByteBuffer());
if (failed)
return null;
}
Expand All @@ -357,7 +357,7 @@ public Runnable produce()

// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getBuffer());
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);

Expand Down Expand Up @@ -391,30 +391,30 @@ private void acquireNetworkBuffer()
{
if (networkBuffer == null)
{
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
}

private void reacquireNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = new NetworkBuffer();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
gregw marked this conversation as resolved.
Show resolved Hide resolved
if (LOG.isDebugEnabled())
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
}

private void releaseNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

Expand Down Expand Up @@ -472,69 +472,21 @@ public boolean canRetain()
}

@Override
public void retain()
{
retainable.retain();
}

@Override
public boolean release()
{
return retainable.release();
}
}

private class NetworkBuffer implements Retainable
{
private final RetainableByteBuffer delegate;

private NetworkBuffer()
{
delegate = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}

public ByteBuffer getBuffer()
{
return delegate.getByteBuffer();
}

public boolean isRetained()
{
return delegate.isRetained();
}

public boolean hasRemaining()
{
return delegate.hasRemaining();
}

@Override
public boolean canRetain()
{
return delegate.canRetain();
return retainable.isRetained();
}

@Override
public void retain()
{
delegate.retain();
retainable.retain();
}

@Override
public boolean release()
{
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this);
return true;
}
return false;
}

private void put(ByteBuffer source)
{
BufferUtil.append(delegate.getByteBuffer(), source);
return retainable.release();
}
}
}
Loading
Loading