diff --git a/src/intTest/java/com/box/sdk/BoxFolderIT.java b/src/intTest/java/com/box/sdk/BoxFolderIT.java index 2426ee147..f3327eac8 100644 --- a/src/intTest/java/com/box/sdk/BoxFolderIT.java +++ b/src/intTest/java/com/box/sdk/BoxFolderIT.java @@ -13,8 +13,10 @@ import static com.box.sdk.UniqueTestFolder.randomizeName; import static com.box.sdk.UniqueTestFolder.removeUniqueFolder; import static com.box.sdk.UniqueTestFolder.setupUniqeFolder; +import static com.box.sdk.UniqueTestFolder.uploadFileToUniqueFolder; import static com.box.sdk.UniqueTestFolder.uploadFileToUniqueFolderWithSomeContent; import static com.box.sdk.UniqueTestFolder.uploadFileWithSomeContent; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.emptyOrNullString; @@ -32,9 +34,13 @@ import com.box.sdk.BoxCollaboration.Role; import com.box.sdk.sharedlink.BoxSharedLinkRequest; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.net.MalformedURLException; import java.net.URL; import java.text.SimpleDateFormat; @@ -48,7 +54,10 @@ import java.util.Map; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.Before; @@ -173,7 +182,7 @@ public void uploadFileSucceeds() { BoxFile uploadedFile = null; try { - uploadedFile = uploadFileToUniqueFolderWithSomeContent(api, "Test File.txt"); + uploadedFile = uploadFileToUniqueFolderWithSomeContent(api, randomizeName("Test File")); assertThat(rootFolder, hasItem(Matchers.hasProperty("ID", equalTo(uploadedFile.getID())))); } finally { @@ -192,7 +201,7 @@ public void uploadFileUploadFileCallbackSucceeds() { try { - final String fileContent = "Test file"; + final String fileContent = randomizeName("Test file"); uploadedFile = rootFolder.uploadFile(outputStream -> { outputStream.write(fileContent.getBytes()); callbackWasCalled.set(true); @@ -757,6 +766,142 @@ public void iterateWithMarker() { } } + @Test + public void uploadFileVersionInSeparateThreadsSucceeds() throws IOException, InterruptedException { + BoxAPIConnection api = jwtApiForServiceAccount(); + Semaphore semaphore = new Semaphore(0); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(); + outputStream.connect(inputStream); + + String fileContent = "This is only a test"; + final BoxFile uploadedFile = uploadFileToUniqueFolder(api, randomizeName("Test File"), fileContent); + + new Thread( + () -> { + try { + new BoxFile(api, uploadedFile.getID()).download(outputStream); + } finally { + try { + outputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }).start(); + + new Thread( + () -> { + new BoxFile(api, uploadedFile.getID()).uploadNewVersion(inputStream); + try { + inputStream.close(); + semaphore.release(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).start(); + + + semaphore.acquire(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + new BoxFile(api, uploadedFile.getID()).download(output); + assertThat(output.toString(), is(fileContent)); + } + + @Test + public void uploadFileVersionWithProgressInSeparateThreadsSucceeds() throws IOException, InterruptedException { + BoxAPIConnection api = jwtApiForServiceAccount(); + Semaphore semaphore = new Semaphore(0); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(); + outputStream.connect(inputStream); + AtomicLong bytesUploaded = new AtomicLong(0); + ProgressListener progressListener = (numBytes, totalBytes) -> bytesUploaded.set(numBytes); + + String fileContent = "This is only a test"; + long fileSize = fileContent.getBytes(UTF_8).length; + + final BoxFile uploadedFile = uploadFileToUniqueFolder(api, randomizeName("Test File"), fileContent); + + new Thread( + () -> { + try { + new BoxFile(api, uploadedFile.getID()).download(outputStream); + } finally { + semaphore.release(); + } + }).start(); + + new Thread( + () -> { + new BoxFile(api, uploadedFile.getID()) + .uploadNewVersion(inputStream, new Date(), fileSize, progressListener); + semaphore.release(); + }).start(); + + + semaphore.acquire(2); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + new BoxFile(api, uploadedFile.getID()).download(output); + assertThat(output.toString(), is(fileContent)); + assertThat(bytesUploaded.get(), is(fileSize)); + } + + @Test + public void uploadFileInSeparateThreadSucceeds() throws IOException, InterruptedException { + BoxAPIConnection api = jwtApiForServiceAccount(); + Semaphore semaphore = new Semaphore(0); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(); + outputStream.connect(inputStream); + + String fileContent = "Test"; + byte[] bytes = fileContent.getBytes(UTF_8); + + AtomicReference uploadedFileId = new AtomicReference<>(); + + new Thread( + () -> { + IntStream.range(0, bytes.length) + .forEach(i -> { + try { + outputStream.write(bytes[i]); + Thread.sleep(100); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }); + try { + outputStream.close(); + semaphore.release(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).start(); + + new Thread( + () -> { + BoxFile.Info uploadedFile = getUniqueFolder(api) + .uploadFile(inputStream, randomizeName("dynamic_upload")); + uploadedFileId.set(uploadedFile.getID()); + try { + inputStream.close(); + semaphore.release(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).start(); + + + semaphore.acquire(2); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + new BoxFile(api, uploadedFileId.get()).download(output); + assertThat(output.toString(), is(fileContent)); + } + private Collection getNames(Iterable page) { Collection result = new ArrayList<>(); for (BoxItem.Info info : page) { diff --git a/src/main/java/com/box/sdk/AbstractBoxMultipartRequest.java b/src/main/java/com/box/sdk/AbstractBoxMultipartRequest.java index d814cee02..a3759af47 100644 --- a/src/main/java/com/box/sdk/AbstractBoxMultipartRequest.java +++ b/src/main/java/com/box/sdk/AbstractBoxMultipartRequest.java @@ -24,7 +24,7 @@ abstract class AbstractBoxMultipartRequest extends BoxAPIRequest { private final Map fields = new HashMap<>(); private InputStream inputStream; private String filename; - private long fileSize; + private long fileSize = -1; private UploadFileCallback callback; AbstractBoxMultipartRequest(BoxAPIConnection api, URL url) { @@ -48,7 +48,9 @@ public void setFile(InputStream inputStream, String filename) { * * @param inputStream a stream containing the file contents. * @param filename the name of the file. - * @param fileSize the size of the file. + * @param fileSize the size of the file. If the file content is coming from the dynamic stream, + * and it's full size cannot be determined on starting upload use fizeSize=-1 + * or use {@link AbstractBoxMultipartRequest#setFile(InputStream, String)} */ public void setFile(InputStream inputStream, String filename, long fileSize) { this.setFile(inputStream, filename); @@ -151,7 +153,9 @@ protected void writeMethodWithBody(Request.Builder requestBuilder, ProgressListe private RequestBody getBody(ProgressListener progressListener) { if (this.callback == null) { - return new RequestBodyFromStream(this.inputStream, getPartContentType(filename), progressListener); + return new RequestBodyFromStream( + this.inputStream, getPartContentType(filename), progressListener, fileSize + ); } else { return new RequestBodyFromCallback(this.callback, getPartContentType(filename)); } diff --git a/src/main/java/com/box/sdk/BinaryBodyUtils.java b/src/main/java/com/box/sdk/BinaryBodyUtils.java index 34108aec1..58c4bcb24 100644 --- a/src/main/java/com/box/sdk/BinaryBodyUtils.java +++ b/src/main/java/com/box/sdk/BinaryBodyUtils.java @@ -16,8 +16,9 @@ private BinaryBodyUtils() { /** * Writes response body bytes to output stream. After all closes the input stream. + * * @param response Response that is going to be written. - * @param output Output stream. + * @param output Output stream. */ static void writeStream(BoxAPIResponse response, OutputStream output) { writeStream(response, output, null); @@ -25,8 +26,9 @@ static void writeStream(BoxAPIResponse response, OutputStream output) { /** * Writes response body bytes to output stream. After all closes the input stream. + * * @param response Response that is going to be written. - * @param output Output stream. + * @param output Output stream. * @param listener Listener that will be notified on writing response. Can be null. */ @@ -46,7 +48,8 @@ static void writeStream(BoxAPIResponse response, OutputStream output, ProgressLi /** * Writes content of input stream to provided output. Method is NOT closing input stream. - * @param input Input that will be read. + * + * @param input Input that will be read. * @param output Output stream. */ static void writeStreamTo(InputStream input, OutputStream output) { @@ -59,6 +62,13 @@ static void writeStreamTo(InputStream input, OutputStream output) { } } catch (IOException e) { throw new RuntimeException(e); + } finally { + try { + input.close(); + output.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } } diff --git a/src/main/java/com/box/sdk/FileUploadParams.java b/src/main/java/com/box/sdk/FileUploadParams.java index e89a0e363..386d2bf02 100644 --- a/src/main/java/com/box/sdk/FileUploadParams.java +++ b/src/main/java/com/box/sdk/FileUploadParams.java @@ -123,6 +123,7 @@ public FileUploadParams setModified(Date modified) { /** * Gets the size of the file's content used for monitoring the upload's progress. + * If the size cannot be determined value will be -1. * * @return the size of the file's content. */ @@ -132,6 +133,9 @@ public long getSize() { /** * Sets the size of the file content used for monitoring the upload's progress. + * When the content is coming from a dynamic source - other thread reading value + * set size to -1 to tell SDK that file size cannot be determined. Usefull + * when encuntering problems with writing different size of bytes than assumed. * * @param size the size of the file's content. * @return this FileUploadParams object for chaining. diff --git a/src/main/java/com/box/sdk/ProgressListener.java b/src/main/java/com/box/sdk/ProgressListener.java index 036720b55..b86c1c868 100644 --- a/src/main/java/com/box/sdk/ProgressListener.java +++ b/src/main/java/com/box/sdk/ProgressListener.java @@ -7,6 +7,8 @@ public interface ProgressListener { /** * Invoked when the progress of the API call changes. + * In case of file uploads which are coming from a dynamic stream the file size cannot be determined and + * total bytes will be reported as -1. * * @param numBytes the number of bytes completed. * @param totalBytes the total number of bytes. diff --git a/src/main/java/com/box/sdk/RequestBodyFromStream.java b/src/main/java/com/box/sdk/RequestBodyFromStream.java index a28275d1d..483baef4c 100644 --- a/src/main/java/com/box/sdk/RequestBodyFromStream.java +++ b/src/main/java/com/box/sdk/RequestBodyFromStream.java @@ -5,6 +5,8 @@ import okhttp3.MediaType; import okhttp3.RequestBody; import okio.BufferedSink; +import okio.Okio; +import okio.Source; /** * Utility class to write body from stream to BufferedSink used by OkHttp. @@ -13,41 +15,70 @@ final class RequestBodyFromStream extends RequestBody { private final InputStream inputStream; private final ProgressListener progressListener; private final MediaType mediaType; - private final int contentLength; + private final long fileSize; - RequestBodyFromStream(InputStream inputStream, MediaType mediaType, ProgressListener progressListener) { + RequestBodyFromStream( + InputStream inputStream, MediaType mediaType, ProgressListener progressListener, long fileSize + ) { this.inputStream = inputStream; this.progressListener = progressListener; this.mediaType = mediaType; - try { - this.contentLength = inputStream.available(); - } catch (IOException e) { - throw new RuntimeException("Cannot read input stream for upload", e); - } + this.fileSize = fileSize; } @Override - public long contentLength() { - return contentLength; + public MediaType contentType() { + return mediaType; } @Override - public MediaType contentType() { - return mediaType; + public void writeTo(BufferedSink bufferedSink) { + if (progressListener == null) { + writeWithoutProgressListener(bufferedSink); + } else { + writeWithProgressListener(bufferedSink); + } } + /** + * Returns content length. If the content length cannot be determined + * (is coming from a stream) this value will be -1. + * @return Long representing content length + */ @Override - public void writeTo(BufferedSink bufferedSink) throws IOException { - byte[] buffer = new byte[AbstractBoxMultipartRequest.BUFFER_SIZE]; - int n = this.inputStream.read(buffer); - int totalWritten = 0; - while (n != -1) { - bufferedSink.write(buffer, 0, n); - totalWritten += n; - if (progressListener != null) { - progressListener.onProgressChanged(totalWritten, this.contentLength()); + public long contentLength() { + return fileSize; + } + + private void writeWithoutProgressListener(BufferedSink bufferedSink) { + try (Source source = Okio.source(this.inputStream)) { + bufferedSink.writeAll(source); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void writeWithProgressListener(BufferedSink bufferedSink) { + try { + byte[] buffer = new byte[AbstractBoxMultipartRequest.BUFFER_SIZE]; + int n = this.inputStream.read(buffer); + int totalWritten = 0; + while (n != -1) { + bufferedSink.write(buffer, 0, n); + totalWritten += n; + if (progressListener != null) { + progressListener.onProgressChanged(totalWritten, this.contentLength()); + } + n = this.inputStream.read(buffer); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + this.inputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); } - n = this.inputStream.read(buffer); } } } diff --git a/src/test/java/com/box/sdk/RequestBodyFromStreamTest.java b/src/test/java/com/box/sdk/RequestBodyFromStreamTest.java index 7e3fbd10a..d442c6d1a 100644 --- a/src/test/java/com/box/sdk/RequestBodyFromStreamTest.java +++ b/src/test/java/com/box/sdk/RequestBodyFromStreamTest.java @@ -3,7 +3,6 @@ import static com.box.sdk.AbstractBoxMultipartRequest.BUFFER_SIZE; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import okhttp3.MediaType; @@ -15,7 +14,7 @@ public class RequestBodyFromStreamTest { @Test - public void reportCorrectProgressWhenFileIsEmpty() throws IOException { + public void reportCorrectProgressWhenFileIsEmpty() { ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[]{}); ProgressListener progressListener = (numBytes, totalBytes) -> { MatcherAssert.assertThat(numBytes, Matchers.is((long) 0)); @@ -23,13 +22,17 @@ public void reportCorrectProgressWhenFileIsEmpty() throws IOException { }; RequestBodyFromStream request = new RequestBodyFromStream( - inputStream, MediaType.parse("application/json"), progressListener + inputStream, + MediaType.parse("application/json"), + progressListener, + 0 ); request.writeTo(new Buffer()); } + @Test - public void reportCorrectProgressWhenFileSizeIfLessThanBuffer() throws IOException { + public void reportCorrectProgressWhenFileSizeIfLessThanBuffer() { int howManyBytes = 1000; ByteArrayInputStream inputStream = new ByteArrayInputStream(generateBytes(howManyBytes)); ProgressListener progressListener = (numBytes, totalBytes) -> { @@ -38,14 +41,17 @@ public void reportCorrectProgressWhenFileSizeIfLessThanBuffer() throws IOExcepti }; RequestBodyFromStream request = new RequestBodyFromStream( - inputStream, MediaType.parse("application/json"), progressListener + inputStream, + MediaType.parse("application/json"), + progressListener, + howManyBytes ); request.writeTo(new Buffer()); } @Test - public void reportCorrectProgressWhenFileSizeIfEqualToBuffer() throws IOException { + public void reportCorrectProgressWhenFileSizeIfEqualToBuffer() { int howManyBytes = BUFFER_SIZE; ByteArrayInputStream inputStream = new ByteArrayInputStream(generateBytes(howManyBytes)); ProgressListener progressListener = (numBytes, totalBytes) -> { @@ -54,14 +60,17 @@ public void reportCorrectProgressWhenFileSizeIfEqualToBuffer() throws IOExceptio }; RequestBodyFromStream request = new RequestBodyFromStream( - inputStream, MediaType.parse("application/json"), progressListener + inputStream, + MediaType.parse("application/json"), + progressListener, + howManyBytes ); request.writeTo(new Buffer()); } @Test - public void reportCorrectProgressWhenFileSizeIfGreaterThanBuffer() throws IOException { + public void reportCorrectProgressWhenFileSizeIfGreaterThanBuffer() { int howManyBytes = BUFFER_SIZE + 1000; ByteArrayInputStream inputStream = new ByteArrayInputStream(generateBytes(howManyBytes)); AtomicInteger counter = new AtomicInteger(0); @@ -76,7 +85,10 @@ public void reportCorrectProgressWhenFileSizeIfGreaterThanBuffer() throws IOExce }; RequestBodyFromStream request = new RequestBodyFromStream( - inputStream, MediaType.parse("application/json"), progressListener + inputStream, + MediaType.parse("application/json"), + progressListener, + howManyBytes ); request.writeTo(new Buffer());