diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathRequestContent.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathRequestContent.java index f27039dfd25a..dd524335352e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathRequestContent.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathRequestContent.java @@ -47,13 +47,17 @@ public PathRequestContent(String contentType, Path filePath) throws IOException public PathRequestContent(String contentType, Path filePath, int bufferSize) throws IOException { - this(contentType, filePath, null); - setBufferSize(bufferSize); + this(contentType, filePath, new ByteBufferPool.Sized(null, false, bufferSize)); } public PathRequestContent(String contentType, Path filePath, ByteBufferPool bufferPool) throws IOException { - super(filePath, bufferPool); + this(contentType, filePath, bufferPool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(bufferPool)); + } + + public PathRequestContent(String contentType, Path filePath, ByteBufferPool.Sized sizedBufferPool) + { + super(filePath, sizedBufferPool); this.contentType = contentType; } diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/util/MultiPartRequestContentTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/util/MultiPartRequestContentTest.java index 3e427776d643..2ebbb45aec87 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/util/MultiPartRequestContentTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/util/MultiPartRequestContentTest.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.MultiPart; import org.eclipse.jetty.http.MultiPartFormData; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; @@ -283,8 +284,7 @@ protected void process(MultiPartFormData.Parts parts) throws Exception }); MultiPartRequestContent multiPart = new MultiPartRequestContent(); - PathRequestContent content = new PathRequestContent(contentType, tmpPath, client.getByteBufferPool()); - content.setUseDirectByteBuffers(client.isUseOutputDirectByteBuffers()); + PathRequestContent content = new PathRequestContent(contentType, tmpPath, new ByteBufferPool.Sized(client.getByteBufferPool(), client.isUseOutputDirectByteBuffers(), -1)); multiPart.addPart(new MultiPart.ContentSourcePart(name, tmpPath.getFileName().toString(), null, content)); multiPart.close(); ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java index da534a781aaa..da5f40c8d439 100644 --- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java +++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java @@ -36,8 +36,8 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.content.ByteBufferContentSource; +import org.eclipse.jetty.io.content.ByteChannelContentSource; import org.eclipse.jetty.io.content.ChunksContentSource; -import org.eclipse.jetty.io.content.PathContentSource; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.QuotedStringTokenizer; @@ -475,7 +475,8 @@ public Path getPath() @Override public Content.Source newContentSource() { - return new PathContentSource(getPath()); + // TODO: use a ByteBuffer pool and direct ByteBuffers? + return new ByteChannelContentSource.PathContentSource(getPath()); } @Override diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPartByteRanges.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPartByteRanges.java index e2de9ca80fb9..94aced30bf04 100644 --- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPartByteRanges.java +++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPartByteRanges.java @@ -15,8 +15,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.ArrayList; import java.util.Iterator; @@ -26,6 +24,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.IOResources; +import org.eclipse.jetty.io.content.ByteChannelContentSource; import org.eclipse.jetty.io.content.ContentSourceCompletableFuture; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.thread.AutoLock; @@ -165,46 +164,14 @@ protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer) } /** - *

A specialized {@link org.eclipse.jetty.io.content.PathContentSource} + *

A specialized {@link org.eclipse.jetty.io.content.ByteChannelContentSource.PathContentSource} * whose content is sliced by a byte range.

*/ - public static class PathContentSource extends org.eclipse.jetty.io.content.PathContentSource + public static class PathContentSource extends ByteChannelContentSource.PathContentSource { - private final ByteRange byteRange; - private long toRead; - public PathContentSource(Path path, ByteRange byteRange) { - super(path); - this.byteRange = byteRange; - } - - @Override - protected SeekableByteChannel open() throws IOException - { - SeekableByteChannel channel = super.open(); - channel.position(byteRange.first()); - toRead = byteRange.getLength(); - return channel; - } - - @Override - protected int read(SeekableByteChannel channel, ByteBuffer byteBuffer) throws IOException - { - int read = super.read(channel, byteBuffer); - if (read <= 0) - return read; - - read = (int)Math.min(read, toRead); - toRead -= read; - byteBuffer.position(read); - return read; - } - - @Override - protected boolean isReadComplete(long read) - { - return read == byteRange.getLength(); + super(new ByteBufferPool.Sized(null), path, byteRange.first(), byteRange.getLength()); } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 292fc7f7d0ce..671e174ba00a 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import org.eclipse.jetty.util.BufferUtil; @@ -92,6 +93,56 @@ public void clear() } } + /** + * A ByteBufferPool with an additional no-args {@link #acquire()} method to obtain a buffer of a + * preconfigured specific size and type. + */ + class Sized extends Wrapper + { + private final boolean _direct; + private final int _size; + + /** + * Create a sized pool for non direct buffers of a default size from a wrapped pool. + * @param wrapped The actual {@link ByteBufferPool} + */ + public Sized(ByteBufferPool wrapped) + { + this(wrapped, false, -1); + } + + /** + * Create a sized pool for a give directness and size from a wrapped pool. + * @param wrapped The actual {@link ByteBufferPool} + * @param direct {@code true} for direct buffers. + * @param size The specified size in bytes of the buffer, or -1 for a default + */ + public Sized(ByteBufferPool wrapped, boolean direct, int size) + { + super(Objects.requireNonNullElse(wrapped, NON_POOLING)); + _direct = direct; + _size = size > 0 ? size : 4096; + } + + public boolean isDirect() + { + return _direct; + } + + public int getSize() + { + return _size; + } + + /** + * @return A {@link RetainableByteBuffer} suitable for the specified preconfigured size and type. + */ + public RetainableByteBuffer acquire() + { + return getWrapped().acquire(_size, _direct); + } + } + /** *

A {@link ByteBufferPool} that does not pool its * {@link RetainableByteBuffer}s.

diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java index 42fdcda26eee..f0d52ffbf65a 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java @@ -21,8 +21,8 @@ import java.nio.file.Path; import org.eclipse.jetty.io.content.ByteBufferContentSource; +import org.eclipse.jetty.io.content.ByteChannelContentSource; import org.eclipse.jetty.io.content.InputStreamContentSource; -import org.eclipse.jetty.io.content.PathContentSource; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; @@ -137,13 +137,7 @@ public static Content.Source asContentSource(Resource resource, ByteBufferPool b Path path = resource.getPath(); if (path != null) { - PathContentSource pathContentSource = new PathContentSource(path, bufferPool); - if (bufferSize > 0) - { - pathContentSource.setBufferSize(bufferSize); - pathContentSource.setUseDirectByteBuffers(direct); - } - return pathContentSource; + return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path); } if (resource instanceof MemoryResource memoryResource) { @@ -187,13 +181,7 @@ public static Content.Source asContentSource(Resource resource, ByteBufferPool b Path path = resource.getPath(); if (path != null) { - RangedPathContentSource contentSource = new RangedPathContentSource(path, bufferPool, first, length); - if (bufferSize > 0) - { - contentSource.setBufferSize(bufferSize); - contentSource.setUseDirectByteBuffers(direct); - } - return contentSource; + return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length); } // Try an optimization for MemoryResource. @@ -206,13 +194,7 @@ public static Content.Source asContentSource(Resource resource, ByteBufferPool b InputStream inputStream = resource.newInputStream(); if (inputStream == null) throw new IllegalArgumentException("Resource does not support InputStream: " + resource); - RangedInputStreamContentSource contentSource = new RangedInputStreamContentSource(inputStream, bufferPool, first, length); - if (bufferSize > 0) - { - contentSource.setBufferSize(bufferSize); - contentSource.setUseDirectByteBuffers(direct); - } - return contentSource; + return new RangedInputStreamContentSource(inputStream, new ByteBufferPool.Sized(bufferPool, direct, bufferSize), first, length); } catch (IOException e) { @@ -430,57 +412,6 @@ protected void onCompleteFailure(Throwable x) } } - /** - *

A specialized {@link PathContentSource} - * whose content is sliced by a byte range.

- */ - private static class RangedPathContentSource extends PathContentSource - { - private final long first; - private final long length; - private long toRead; - - public RangedPathContentSource(Path path, ByteBufferPool bufferPool, long first, long length) - { - super(path, bufferPool); - // TODO perform sanity checks on first and length? - this.first = first; - this.length = length; - } - - @Override - protected SeekableByteChannel open() throws IOException - { - SeekableByteChannel channel = super.open(); - if (first > -1) - channel.position(first); - toRead = length; - return channel; - } - - @Override - protected int read(SeekableByteChannel channel, ByteBuffer byteBuffer) throws IOException - { - int read = super.read(channel, byteBuffer); - if (read <= 0) - return read; - - read = (int)Math.min(read, toRead); - if (read > -1) - { - toRead -= read; - byteBuffer.position(read); - } - return read; - } - - @Override - protected boolean isReadComplete(long read) - { - return read == length; - } - } - /** *

A specialized {@link InputStreamContentSource} * whose content is sliced by a byte range.

diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteChannelContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteChannelContentSource.java new file mode 100644 index 000000000000..db0bed464f67 --- /dev/null +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteChannelContentSource.java @@ -0,0 +1,297 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io.content; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.ExceptionUtil; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.SerializedInvoker; + +/** + *

A {@link Content.Source} backed by a {@link ByteChannel}. + * Any calls to {@link #demand(Runnable)} are immediately satisfied.

+ */ +public class ByteChannelContentSource implements Content.Source +{ + private final AutoLock lock = new AutoLock(); + private final SerializedInvoker _invoker = new SerializedInvoker(); + private final ByteBufferPool.Sized _byteBufferPool; + private ByteChannel _byteChannel; + private final long _offset; + private final long _length; + private RetainableByteBuffer _buffer; + private long _totalRead; + private Runnable demandCallback; + private Content.Chunk _terminal; + + public ByteChannelContentSource(SeekableByteChannel seekableByteChannel, long offset, long length) + { + this(new ByteBufferPool.Sized(null), seekableByteChannel, offset, length); + } + + public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length) + { + this(byteBufferPool, (ByteChannel)seekableByteChannel, offset, length); + if (offset >= 0 && seekableByteChannel != null) + { + try + { + seekableByteChannel.position(offset); + } + catch (IOException e) + { + // lock not needed in constructor + lockedSetTerminal(Content.Chunk.from(e, true)); + } + } + } + + public ByteChannelContentSource(ByteChannel byteChannel) + { + this(new ByteBufferPool.Sized(null), byteChannel, -1L, -1L); + } + + public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel) + { + this(byteBufferPool, byteChannel, -1L, -1L); + } + + private ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel, long offset, long length) + { + _byteBufferPool = Objects.requireNonNull(byteBufferPool); + _byteChannel = byteChannel; + _offset = offset < 0 ? 0 : offset; + _length = length; + } + + protected ByteChannel open() throws IOException + { + return _byteChannel; + } + + @Override + public void demand(Runnable demandCallback) + { + try (AutoLock ignored = lock.lock()) + { + if (this.demandCallback != null) + throw new IllegalStateException("demand pending"); + this.demandCallback = demandCallback; + } + _invoker.run(this::invokeDemandCallback); + } + + private void invokeDemandCallback() + { + Runnable demandCallback; + try (AutoLock ignored = lock.lock()) + { + demandCallback = this.demandCallback; + this.demandCallback = null; + } + if (demandCallback != null) + ExceptionUtil.run(demandCallback, this::fail); + } + + protected void lockedSetTerminal(Content.Chunk terminal) + { + if (_terminal == null) + _terminal = Objects.requireNonNull(terminal); + else + ExceptionUtil.addSuppressedIfNotAssociated(_terminal.getFailure(), terminal.getFailure()); + IO.close(_byteChannel); + if (_buffer != null) + _buffer.release(); + _buffer = null; + } + + private void lockedEnsureOpenOrTerminal() + { + if (_terminal == null && (_byteChannel == null || !_byteChannel.isOpen())) + { + try + { + _byteChannel = open(); + if (_byteChannel == null || !_byteChannel.isOpen()) + lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true)); + else if (_offset >= 0 && _byteChannel instanceof SeekableByteChannel seekableByteChannel) + seekableByteChannel.position(_offset); + } + catch (IOException e) + { + lockedSetTerminal(Content.Chunk.from(e, true)); + } + } + } + + @Override + public Content.Chunk read() + { + try (AutoLock ignored = lock.lock()) + { + lockedEnsureOpenOrTerminal(); + if (_terminal != null) + return _terminal; + + if (_buffer == null) + { + _buffer = _byteBufferPool.acquire(); + } + else if (_buffer.isRetained()) + { + _buffer.release(); + _buffer = _byteBufferPool.acquire(); + } + + try + { + ByteBuffer byteBuffer = _buffer.getByteBuffer(); + BufferUtil.clearToFill(byteBuffer); + if (_length >= 0) + byteBuffer.limit((int)Math.min(_buffer.capacity(), _length - _totalRead)); + int read = _byteChannel.read(byteBuffer); + BufferUtil.flipToFlush(byteBuffer, 0); + if (read == 0) + return null; + if (read > 0) + { + _totalRead += read; + _buffer.retain(); + if (_length < 0 || _totalRead < _length) + return Content.Chunk.asChunk(byteBuffer, false, _buffer); + + Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, _buffer); + lockedSetTerminal(Content.Chunk.EOF); + return last; + } + lockedSetTerminal(Content.Chunk.EOF); + } + catch (Throwable t) + { + lockedSetTerminal(Content.Chunk.from(t, true)); + } + } + return _terminal; + } + + @Override + public void fail(Throwable failure) + { + try (AutoLock ignored = lock.lock()) + { + lockedSetTerminal(Content.Chunk.from(failure, true)); + } + } + + @Override + public long getLength() + { + return _length; + } + + @Override + public boolean rewind() + { + try (AutoLock ignored = lock.lock()) + { + // We can remove terminal condition for a rewind that is likely to occur + if (_terminal != null && !Content.Chunk.isFailure(_terminal) && (_byteChannel == null || _byteChannel instanceof SeekableByteChannel)) + _terminal = null; + + lockedEnsureOpenOrTerminal(); + if (_terminal != null || _byteChannel == null || !_byteChannel.isOpen()) + return false; + + if (_offset >= 0 && _byteChannel instanceof SeekableByteChannel seekableByteChannel) + { + try + { + seekableByteChannel.position(_offset); + _totalRead = 0; + return true; + } + catch (Throwable t) + { + lockedSetTerminal(Content.Chunk.from(t, true)); + } + } + return false; + } + } + + /** + * A {@link ByteChannelContentSource} for a {@link Path} + * @deprecated To be replaced by an updated {@link org.eclipse.jetty.io.content.PathContentSource} in 12.1.0 + */ + @Deprecated(forRemoval = true, since = "12.0.11") + public static class PathContentSource extends ByteChannelContentSource + { + private final Path _path; + + public PathContentSource(Path path) + { + super(new ByteBufferPool.Sized(null), null, 0, size(path)); + _path = path; + } + + public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path) + { + super(byteBufferPool, null, 0, size(path)); + _path = path; + } + + public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path, long offset, long length) + { + super(byteBufferPool, null, offset, length); + _path = path; + } + + public Path getPath() + { + return _path; + } + + @Override + protected ByteChannel open() throws IOException + { + return Files.newByteChannel(_path, StandardOpenOption.READ); + } + + private static long size(Path path) + { + try + { + return Files.size(path); + } + catch (IOException e) + { + return -1L; + } + } + } +} diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java index 76ee447421fe..a00680251cf4 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java @@ -20,6 +20,7 @@ import java.util.Objects; import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.SerializedInvoker; @@ -111,19 +112,7 @@ private void invokeDemandCallback() this.demandCallback = null; } if (demandCallback != null) - runDemandCallback(demandCallback); - } - - private void runDemandCallback(Runnable demandCallback) - { - try - { - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); - } + ExceptionUtil.run(demandCallback, this::fail); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java index e34d5903c6c8..760e4f1f42d6 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java @@ -16,6 +16,7 @@ import java.util.Objects; import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.thread.SerializedInvoker; /** @@ -114,19 +115,7 @@ private void invokeDemandCallback() Runnable demandCallback = this.demandCallback; this.demandCallback = null; if (demandCallback != null) - runDemandCallback(demandCallback); - } - - private void runDemandCallback(Runnable demandCallback) - { - try - { - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); - } + ExceptionUtil.run(demandCallback, this::fail); } private Content.Chunk process(Content.Chunk rawChunk) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java index eebe34365862..61d0785deb72 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java @@ -21,6 +21,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.SerializedInvoker; @@ -39,44 +40,65 @@ public class InputStreamContentSource implements Content.Source private final AutoLock lock = new AutoLock(); private final SerializedInvoker invoker = new SerializedInvoker(); private final InputStream inputStream; - private final ByteBufferPool bufferPool; - private int bufferSize = 4096; - private boolean useDirectByteBuffers; + private ByteBufferPool.Sized bufferPool; private Runnable demandCallback; private Content.Chunk errorChunk; private boolean closed; public InputStreamContentSource(InputStream inputStream) { - this(inputStream, null); + this(inputStream, new ByteBufferPool.Sized(null)); } public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool) + { + this(inputStream, bufferPool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(bufferPool)); + } + + public InputStreamContentSource(InputStream inputStream, ByteBufferPool.Sized bufferPool) { this.inputStream = Objects.requireNonNull(inputStream); - this.bufferPool = bufferPool != null ? bufferPool : ByteBufferPool.NON_POOLING; + this.bufferPool = Objects.requireNonNull(bufferPool); } public int getBufferSize() { - return bufferSize; + return bufferPool.getSize(); } + /** + * @param bufferSize The size of the buffer + * @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)} + */ + @Deprecated(forRemoval = true) public void setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; + try (AutoLock ignored = lock.lock()) + { + if (bufferSize != bufferPool.getSize()) + bufferPool = new ByteBufferPool.Sized(bufferPool.getWrapped(), bufferPool.isDirect(), bufferSize); + } } public boolean isUseDirectByteBuffers() { - return useDirectByteBuffers; + return bufferPool.isDirect(); } + /** + * @param useDirectByteBuffers {@code true} if direct buffers will be used. + * @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)} + */ + @Deprecated(forRemoval = true, since = "12.0.11") public void setUseDirectByteBuffers(boolean useDirectByteBuffers) { - this.useDirectByteBuffers = useDirectByteBuffers; + try (AutoLock ignored = lock.lock()) + { + if (useDirectByteBuffers != bufferPool.isDirect()) + bufferPool = new ByteBufferPool.Sized(bufferPool.getWrapped(), useDirectByteBuffers, bufferPool.getSize()); + } } - + @Override public Content.Chunk read() { @@ -88,7 +110,7 @@ public Content.Chunk read() return Content.Chunk.EOF; } - RetainableByteBuffer streamBuffer = bufferPool.acquire(getBufferSize(), useDirectByteBuffers); + RetainableByteBuffer streamBuffer = bufferPool.acquire(); try { ByteBuffer buffer = streamBuffer.getByteBuffer(); @@ -147,19 +169,7 @@ private void invokeDemandCallback() this.demandCallback = null; } if (demandCallback != null) - runDemandCallback(demandCallback); - } - - private void runDemandCallback(Runnable demandCallback) - { - try - { - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); - } + ExceptionUtil.run(demandCallback, this::fail); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java index 46eeda7781b5..f731280f737e 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.io.content; import java.io.IOException; +import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; @@ -27,6 +28,7 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.SerializedInvoker; @@ -36,13 +38,15 @@ */ public class PathContentSource implements Content.Source { + // TODO in 12.1.x reimplement this class based on ByteChannelContentSource + private final AutoLock lock = new AutoLock(); private final SerializedInvoker invoker = new SerializedInvoker(); private final Path path; private final long length; private final ByteBufferPool byteBufferPool; - private int bufferSize = 4096; - private boolean useDirectByteBuffers = true; + private int bufferSize; + private boolean useDirectByteBuffers; private SeekableByteChannel channel; private long totalRead; private Runnable demandCallback; @@ -50,10 +54,26 @@ public class PathContentSource implements Content.Source public PathContentSource(Path path) { - this(path, null); + this(path, null, true, -1); } public PathContentSource(Path path, ByteBufferPool byteBufferPool) + { + this(path, + byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getWrapped() : byteBufferPool, + byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.isDirect() : true, + byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getSize() : -1); + } + + public PathContentSource(Path path, ByteBufferPool.Sized sizedBufferPool) + { + this(path, + sizedBufferPool == null ? null : sizedBufferPool.getWrapped(), + sizedBufferPool == null ? true : sizedBufferPool.isDirect(), + sizedBufferPool == null ? -1 : sizedBufferPool.getSize()); + } + + private PathContentSource(Path path, ByteBufferPool byteBufferPool, boolean direct, int bufferSize) { try { @@ -63,7 +83,10 @@ public PathContentSource(Path path, ByteBufferPool byteBufferPool) throw new AccessDeniedException(path.toString()); this.path = path; this.length = Files.size(path); + this.byteBufferPool = byteBufferPool != null ? byteBufferPool : ByteBufferPool.NON_POOLING; + this.useDirectByteBuffers = direct; + this.bufferSize = bufferSize > 0 ? bufferSize : 4096; } catch (IOException x) { @@ -87,6 +110,11 @@ public int getBufferSize() return bufferSize; } + /** + * @param bufferSize The size of the buffer + * @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)} + */ + @Deprecated(forRemoval = true) public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; @@ -97,6 +125,11 @@ public boolean isUseDirectByteBuffers() return useDirectByteBuffers; } + /** + * @param useDirectByteBuffers {@code true} if direct buffers should be used + * @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)} + */ + @Deprecated(forRemoval = true) public void setUseDirectByteBuffers(boolean useDirectByteBuffers) { this.useDirectByteBuffers = useDirectByteBuffers; @@ -190,19 +223,7 @@ private void invokeDemandCallback() this.demandCallback = null; } if (demandCallback != null) - runDemandCallback(demandCallback); - } - - private void runDemandCallback(Runnable demandCallback) - { - try - { - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); - } + ExceptionUtil.run(demandCallback, this::fail); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java index c57fae5cc023..b4b59f851d4b 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java @@ -15,6 +15,7 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IteratingNestedCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,9 @@ public InvocationType getInvocationType() @Override protected Action process() throws Throwable { + if (current != null) + current.release(); + if (terminated) return Action.SUCCEEDED; @@ -53,36 +57,33 @@ protected Action process() throws Throwable if (current == null) { - source.demand(this::iterate); - return Action.IDLE; + source.demand(this::succeeded); + return Action.SCHEDULED; } if (chunkProcessor != null && chunkProcessor.process(current, this)) return Action.SCHEDULED; + terminated = current.isLast(); + if (Content.Chunk.isFailure(current)) - throw current.getFailure(); + { + failed(current.getFailure()); + return Action.SCHEDULED; + } sink.write(current.isLast(), current.getByteBuffer(), this); return Action.SCHEDULED; } @Override - public void succeeded() - { - terminated = current.isLast(); - current.release(); - current = null; - super.succeeded(); - } - - @Override - public void failed(Throwable x) + protected void onCompleteFailure(Throwable x) { if (current != null) + { current.release(); - current = null; - source.fail(x); - super.failed(x); + current = Content.Chunk.next(current); + } + ExceptionUtil.callAndThen(x, source::fail, super::onCompleteFailure); } } diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java index 45799c4f1fcc..ae20cf57d004 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java @@ -22,6 +22,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.concurrent.CancellationException; @@ -35,6 +37,7 @@ import org.eclipse.jetty.io.content.AsyncContent; import org.eclipse.jetty.io.content.ByteBufferContentSource; +import org.eclipse.jetty.io.content.ByteChannelContentSource; import org.eclipse.jetty.io.content.ContentSourceInputStream; import org.eclipse.jetty.io.content.ContentSourceTransformer; import org.eclipse.jetty.io.content.InputStreamContentSource; @@ -46,6 +49,9 @@ import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.IO; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -53,6 +59,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -65,7 +72,40 @@ public class ContentSourceTest { + private static ArrayByteBufferPool.Tracking byteBufferPool; + + @BeforeEach + public void beforeEach() + { + byteBufferPool = new ArrayByteBufferPool.Tracking(); + } + + @AfterEach + public void afterEach() + { + if (!byteBufferPool.getLeaks().isEmpty()) + byteBufferPool.dumpLeaks(); + assertThat(byteBufferPool.getLeaks(), empty()); + byteBufferPool.clear(); + byteBufferPool = null; + } + public static List all() throws Exception + { + return sources("all"); + } + + public static List multi() throws Exception + { + return sources("multi"); + } + + public static List rewind() throws Exception + { + return sources("rewind"); + } + + private static List sources(String mode) throws Exception { AsyncContent asyncSource = new AsyncContent(); try (asyncSource) @@ -93,17 +133,61 @@ public String toString() Path tmpDir = MavenTestingUtils.getTargetTestingPath(); Files.createDirectories(tmpDir); - Path path = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt"); - Files.writeString(path, "onetwo", StandardOpenOption.CREATE, StandardOpenOption.WRITE); - PathContentSource pathSource = new PathContentSource(path); - pathSource.setBufferSize(3); + Path path12 = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt"); + Files.writeString(path12, "onetwo", StandardOpenOption.CREATE, StandardOpenOption.WRITE); + Path path0123 = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt"); + Files.writeString(path0123, "zeroonetwothree", StandardOpenOption.CREATE, StandardOpenOption.WRITE); - InputStreamContentSource inputSource = new InputStreamContentSource(new ByteArrayInputStream("onetwo".getBytes(UTF_8))); + PathContentSource path0 = new PathContentSource(path12, byteBufferPool); + PathContentSource path1 = new PathContentSource(path12, byteBufferPool); + path1.setBufferSize(3); + InputStreamContentSource inputSource = new InputStreamContentSource(new ByteArrayInputStream("onetwo".getBytes(UTF_8))); InputStreamContentSource inputSource2 = new InputStreamContentSource(new ContentSourceInputStream(new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two")))); - return List.of(asyncSource, byteBufferSource, transformerSource, pathSource, inputSource, inputSource2); + ByteChannelContentSource bccs0 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), Files.newByteChannel(path12, StandardOpenOption.READ)); + ByteChannelContentSource bccs1 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 4096), Files.newByteChannel(path12, StandardOpenOption.READ), 0, 6); + ByteChannelContentSource bccs2 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 8192), Files.newByteChannel(path0123, StandardOpenOption.READ), 4, 6); + ByteChannelContentSource bccs3 = new ByteChannelContentSource(new ByteBufferPool.Sized(null, false, 3), Files.newByteChannel(path0123, StandardOpenOption.READ), 4, 6); + + ByteChannelContentSource.PathContentSource pcs0 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), path12); + ByteChannelContentSource.PathContentSource pcs1 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), path0123, 4, 6); + ByteChannelContentSource.PathContentSource pcs2 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(null, false, 3), path12); + + return switch (mode) + { + case "rewind" -> List.of( + byteBufferSource, + path1, + bccs3, + pcs2); + case "multi" -> List.of( + asyncSource, + byteBufferSource, + transformerSource, + path1, + inputSource, + inputSource2, + bccs3, + pcs2); + case "all" -> List.of( + asyncSource, + byteBufferSource, + transformerSource, + path0, + path1, + inputSource, + inputSource2, + bccs0, + bccs1, + bccs2, + bccs3, + pcs0, + pcs1, + pcs2); + default -> Collections.emptyList(); + }; } /** @@ -169,6 +253,104 @@ public void run() assertThat(builder.toString(), is("onetwo")); } + @ParameterizedTest + @MethodSource("rewind") + public void testReadRewindReadAll(Content.Source source) throws Exception + { + StringBuilder builder = new StringBuilder(); + var task = new CompletableTask<>() + { + @Override + public void run() + { + while (true) + { + Content.Chunk chunk = source.read(); + if (chunk == null) + { + source.demand(this); + break; + } + + if (chunk.hasRemaining() && builder.isEmpty()) + assertTrue(source.rewind()); + + if (chunk.hasRemaining()) + builder.append(BufferUtil.toString(chunk.getByteBuffer())); + chunk.release(); + + if (chunk.isLast()) + { + complete(null); + break; + } + } + } + }; + source.demand(task); + task.get(10, TimeUnit.SECONDS); + assertThat(builder.toString(), is("oneonetwo")); + } + + @ParameterizedTest + @MethodSource("rewind") + public void testReadAllRewindReadAll(Content.Source source) throws Exception + { + // A raw BCCS cannot be rewound if fully consumed, as it is not able to re-open a passed in channel + Assumptions.assumeTrue(!(source instanceof ByteChannelContentSource) || source instanceof ByteChannelContentSource.PathContentSource); + + String first = Content.Source.asString(source); + assertThat(first, is("onetwo")); + source.rewind(); + String second = Content.Source.asString(source); + assertThat(second, is("onetwo")); + } + + @ParameterizedTest + @MethodSource("all") + public void testReadRetain(Content.Source source) throws Exception + { + List chunks = new ArrayList<>(); + + var task = new CompletableTask<>() + { + @Override + public void run() + { + while (true) + { + Content.Chunk chunk = source.read(); + if (chunk == null) + { + source.demand(this); + break; + } + + if (chunk.hasRemaining()) + chunks.add(chunk); + + if (chunk.isLast()) + { + complete(null); + break; + } + } + } + }; + source.demand(task); + + task.get(10, TimeUnit.SECONDS); + + StringBuilder builder = new StringBuilder(); + for (Content.Chunk chunk : chunks) + { + if (chunk.hasRemaining()) + builder.append(BufferUtil.toString(chunk.getByteBuffer())); + chunk.release(); + } + assertThat(builder.toString(), is("onetwo")); + } + @ParameterizedTest @MethodSource("all") public void testDemandReadDemandDoesNotRecurse(Content.Source source) throws Exception @@ -226,7 +408,7 @@ public void run() } @ParameterizedTest - @MethodSource("all") + @MethodSource("multi") public void testReadFailReadReturnsError(Content.Source source) throws Exception { Content.Chunk chunk = nextChunk(source); @@ -240,6 +422,17 @@ public void testReadFailReadReturnsError(Content.Source source) throws Exception assertTrue(Content.Chunk.isFailure(chunk, true)); } + @ParameterizedTest + @MethodSource("all") + public void testFailReadReturnsError(Content.Source source) throws Exception + { + source.fail(new CancellationException()); + + // We must read the error. + Content.Chunk chunk = source.read(); + assertTrue(Content.Chunk.isFailure(chunk, true)); + } + @ParameterizedTest @MethodSource("all") public void testReadLastDemandInvokesDemandCallback(Content.Source source) throws Exception @@ -268,13 +461,9 @@ public void testReadErrorDemandInvokesDemandCallback(Content.Source source) thro } @ParameterizedTest - @MethodSource("all") - public void testDemandCallbackThrows(Content.Source source) throws Exception + @MethodSource("multi") + public void testReadDemandCallbackThrows(Content.Source source) throws Exception { - // TODO fix for OSCS -// if (source instanceof OutputStreamContentSource) -// return; - Content.Chunk chunk = nextChunk(source); assertNotNull(chunk); chunk.release(); @@ -288,6 +477,19 @@ public void testDemandCallbackThrows(Content.Source source) throws Exception assertTrue(Content.Chunk.isFailure(chunk, true)); } + @ParameterizedTest + @MethodSource("all") + public void testDemandCallbackThrows(Content.Source source) throws Exception + { + source.demand(() -> + { + throw new CancellationException(); + }); + + Content.Chunk chunk = source.read(); + assertTrue(Content.Chunk.isFailure(chunk, true)); + } + @Test public void testSimple() { diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/internal/ContentCopierTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/internal/ContentCopierTest.java index 029b953a038a..63002c38c333 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/internal/ContentCopierTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/internal/ContentCopierTest.java @@ -16,21 +16,50 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.TestSink; import org.eclipse.jetty.io.TestSource; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.fail; public class ContentCopierTest { + @Test + public void testSimpleCopy() throws Exception + { + TimeoutException originalFailure = new TimeoutException("timeout"); + TestSource originalSource = new TestSource( + Content.Chunk.from(BufferUtil.toBuffer("How "), false), + null, + Content.Chunk.from(BufferUtil.toBuffer("now "), false), + null, + Content.Chunk.from(BufferUtil.toBuffer("brown "), false), + Content.Chunk.from(BufferUtil.toBuffer("cow."), true) + ); + + Callback.Completable callback = new Callback.Completable(); + TestSink resultSink = new TestSink(); + ContentCopier contentCopier = new ContentCopier(originalSource, resultSink, null, callback); + contentCopier.iterate(); + + callback.get(5, TimeUnit.SECONDS); + + StringBuilder result = new StringBuilder(); + for (Content.Chunk chunk : resultSink.takeAccumulatedChunks()) + result.append(BufferUtil.toString(chunk.getByteBuffer())); + assertThat(result.toString(), equalTo("How now brown cow.")); + } + @Test public void testTransientErrorsBecomeTerminalErrors() throws Exception { diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java index 216b24cc1190..43bb16e32b22 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java @@ -226,8 +226,6 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons @ParameterizedTest @MethodSource("transports") - @Tag("DisableLeakTracking:client:H2") - @Tag("DisableLeakTracking:client:H2C") public void testDownloadWithFailure(Transport transport) throws Exception { byte[] data = new byte[64 * 1024]; diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index 069e844da717..e677da1c6d26 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import org.eclipse.jetty.util.thread.Invocable; @@ -148,6 +149,16 @@ public static void ifExceptionThrowAllAs(Class type, Th throw as(type, throwable); } + /** Check if two {@link Throwable}s are associated. + * @param t1 A Throwable or null + * @param t2 Another Throwable or null + * @return true iff the exceptions are not associated by being the same instance, sharing a cause or one suppressing the other. + */ + public static boolean areAssociated(Throwable t1, Throwable t2) + { + return t1 != null && t2 != null && !areNotAssociated(t1, t2); + } + /** Check if two {@link Throwable}s are associated. * @param t1 A Throwable or null * @param t2 Another Throwable or null @@ -256,15 +267,7 @@ public void ifExceptionThrowAs(Class type) throws T public void callAndCatch(Invocable.Callable task) { - try - { - if (task != null) - task.call(); - } - catch (Throwable t) - { - add(t); - } + ExceptionUtil.call(task, this::add); } } @@ -300,8 +303,102 @@ public static Throwable combine(Throwable t1, Throwable t2) return t1; } - private ExceptionUtil() + public static void callAndThen(Throwable cause, Consumer first, Consumer second) + { + try + { + first.accept(cause); + } + catch (Throwable t) + { + addSuppressedIfNotAssociated(cause, t); + } + finally + { + second.accept(cause); + } + } + + public static void callAndThen(Throwable cause, Consumer first, Runnable second) + { + try + { + first.accept(cause); + } + catch (Throwable t) + { + addSuppressedIfNotAssociated(cause, t); + } + finally + { + second.run(); + } + } + + public static void callAndThen(Runnable first, Runnable second) + { + try + { + first.run(); + } + catch (Throwable t) + { + // ignored + } + finally + { + second.run(); + } + } + + /** + * Call a {@link Invocable.Callable} and handle failures + * @param callable The runnable to call + * @param failure The handling of failures + */ + public static void call(Invocable.Callable callable, Consumer failure) { + try + { + callable.call(); + } + catch (Throwable thrown) + { + try + { + failure.accept(thrown); + } + catch (Throwable alsoThrown) + { + ExceptionUtil.addSuppressedIfNotAssociated(alsoThrown, thrown); + ExceptionUtil.ifExceptionThrowUnchecked(alsoThrown); + } + } + } + + /** + * Call a {@link Runnable} and handle failures + * @param runnable The runnable to call + * @param failure The handling of failures + */ + public static void run(Runnable runnable, Consumer failure) + { + try + { + runnable.run(); + } + catch (Throwable thrown) + { + try + { + failure.accept(thrown); + } + catch (Throwable alsoThrown) + { + ExceptionUtil.addSuppressedIfNotAssociated(alsoThrown, thrown); + ExceptionUtil.ifExceptionThrowUnchecked(alsoThrown); + } + } } /** @@ -326,4 +423,8 @@ public static T get(CompletableFuture completableFuture) throw new RuntimeException(e.getCause()); } } + + private ExceptionUtil() + { + } }