From 56f2ee4079012f9fb6bf5e4088d15070e95f6027 Mon Sep 17 00:00:00 2001 From: Dawid Jenczewski Date: Wed, 4 Dec 2024 10:19:37 +0100 Subject: [PATCH] feat(privmx-endopint-extra): Implement Inbox Stream API #4 --- .../inboxEntryStream/InboxEntryStream.java | 326 ++++++++++++++++++ .../inboxFileStream/InboxFileStream.java | 107 ++++++ .../InboxFileStreamReader.java | 141 ++++++++ .../InboxFileStreamWriter.java | 97 ++++++ 4 files changed, 671 insertions(+) create mode 100644 privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxEntryStream/InboxEntryStream.java create mode 100644 privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStream.java create mode 100644 privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamReader.java create mode 100644 privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamWriter.java diff --git a/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxEntryStream/InboxEntryStream.java b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxEntryStream/InboxEntryStream.java new file mode 100644 index 0000000..d3601f3 --- /dev/null +++ b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxEntryStream/InboxEntryStream.java @@ -0,0 +1,326 @@ +// +// PrivMX Endpoint Java Extra. +// Copyright © 2024 Simplito sp. z o.o. +// +// This file is part of the PrivMX Platform (https://privmx.dev). +// This software is Licensed under the MIT License. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package com.simplito.java.privmx_endpoint_extra.inboxEntryStream; + +import com.simplito.java.privmx_endpoint.model.exceptions.NativeException; +import com.simplito.java.privmx_endpoint.model.exceptions.PrivmxException; +import com.simplito.java.privmx_endpoint.modules.inbox.InboxApi; +import com.simplito.java.privmx_endpoint_extra.inboxFileStream.InboxFileStreamWriter; +import com.simplito.java.privmx_endpoint_extra.storeFileStream.StoreFileStream; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class InboxEntryStream { + private final InboxApi inboxApi; + private final Map inboxFiles; + private final long inboxHandle; + private final EntryStreamListener entryStreamListener; + private final List> sendingFiles = new ArrayList<>(); + private State streamState = State.PREPARED; + + private InboxEntryStream(InboxApi api, Map inboxFiles, long inboxHandle, EntryStreamListener entryStreamListener) { + Objects.requireNonNull(inboxFiles); + Objects.requireNonNull(entryStreamListener); + this.inboxApi = api; + this.inboxFiles = inboxFiles; + this.inboxHandle = inboxHandle; + this.entryStreamListener = entryStreamListener; + if (inboxFiles.isEmpty()) { + streamState = State.FILES_SENT; + } + } + + public static InboxEntryStream prepareEntry( + InboxApi inboxApi, + String inboxId, + EntryStreamListener entryStreamListener, + byte[] data + ) { + return prepareEntry(inboxApi, inboxId, entryStreamListener, data, null); + } + + public static InboxEntryStream prepareEntry( + InboxApi inboxApi, + String inboxId, + EntryStreamListener entryStreamListener, + List filesConfig + ) { + return prepareEntry(inboxApi, inboxId, entryStreamListener, "".getBytes(StandardCharsets.UTF_8), filesConfig); + } + + public static InboxEntryStream prepareEntry( + InboxApi inboxApi, + String inboxId, + EntryStreamListener entryStreamListener, + byte[] data, + List filesConfig + ) { + return prepareEntry(inboxApi, inboxId, entryStreamListener, data, filesConfig, null); + } + + public static InboxEntryStream prepareEntry( + InboxApi inboxApi, + String inboxId, + EntryStreamListener entryStreamListener, + byte[] data, + List filesConfig, + String userPrivKey + ) { + Map files = Optional.ofNullable(filesConfig) + .orElse(Collections.emptyList()) + .stream() + .collect( + Collectors.toMap( + fileInfo -> fileInfo, + config -> InboxFileStreamWriter.createFile( + inboxApi, + config.publicMeta, + config.privateMeta, + config.fileSize + ) + ) + ); + List fileHandles = files.values().stream().map(InboxFileStreamWriter::getFileHandle).collect(Collectors.toList()); + Long inboxHandle = inboxApi.prepareEntry(inboxId, data, fileHandles, userPrivKey); + return new InboxEntryStream(inboxApi, files, inboxHandle, entryStreamListener); + } + + public synchronized void sendFiles( + ExecutorService fileStreamExecutor + ) { + if (streamState != State.PREPARED) { + throw new IllegalStateException("Stream should be in state PREPARED. Current state is: " + streamState.name()); + } + if (!sendingFiles.isEmpty()) { + throw new IllegalStateException("Uploading files in progress"); + } + inboxFiles.forEach((fileInfo, fileHandle) -> { + sendingFiles.add(fileStreamExecutor.submit(() -> { + try { + sendFile(fileInfo, fileHandle); + entryStreamListener.onEndFileSending(fileInfo); + } catch (Exception e) { + stopFileStreams(); + onError(e); + entryStreamListener.onErrorDuringSending(fileInfo, e); + } + })); + }); + System.out.println("Start waiting 2"); + for (Future future : sendingFiles) { + if (Thread.interrupted()) { + cancel(); + return; + } + try { + future.get(); + } catch (InterruptedException | CancellationException e) { + // catch when in async mode someone call cancel on result Future. + cancel(); + return; + } catch (Exception e) { + System.out.println("Break waiting on other exception"); + } + } + if (sendingFiles.stream().allMatch(Future::isDone)) { + updateState(State.FILES_SENT); + } else { + onError(new IllegalStateException("Some files cannot be sent")); + } + + } + + private void sendFile( + FileInfo fileInfo, + InboxFileStreamWriter fileHandle + ) throws PrivmxException, NativeException, IllegalStateException, IOException { + final StoreFileStream.Controller controller = new StoreFileStream.Controller() { + @Override + public void onChunkProcessed(Long processedBytes) { + entryStreamListener.onFileChunkProcessed(fileInfo, processedBytes); + if (Thread.interrupted()) { + this.stop(); + } + } + }; + + entryStreamListener.onStartFileSending(fileInfo); + if (fileInfo.fileStream == null) { + fileHandle.setProgressListener(controller); + while (fileInfo.fileSize > fileHandle.getProcessedBytes() && !fileHandle.isClosed()) { + if (controller.isStopped()) { + break; + } + fileHandle.write(inboxHandle, entryStreamListener.onNextChunkRequest(fileInfo)); + } + } else { + fileHandle.writeStream(inboxHandle, fileInfo.fileStream, controller); + } + } + + public void onError(Throwable t) { + cancel(); + stopFileStreams(); + closeFileHandles(); + updateState(State.ERROR); + entryStreamListener.onError(t); + } + + private void updateState(State newState) { + if (streamState != newState) { + synchronized (this) { + streamState = newState; + entryStreamListener.onUpdateState(streamState); + } + } + } + + public void cancel() { + if (streamState == State.ERROR) return; + if (streamState == State.ABORTED) return; + synchronized (this) { + stopFileStreams(); + closeFileHandles(); + if (streamState != State.SENT) { + updateState(State.ABORTED); + } + } + } + + private void stopFileStreams() { + if (streamState == State.PREPARED && !sendingFiles.isEmpty()) { + synchronized (sendingFiles) { + sendingFiles.forEach((task) -> { + task.cancel(true); + }); + } + } + } + + private void closeFileHandles() { + synchronized (inboxFiles) { + inboxFiles.values().forEach(file -> { + try { + if (!file.isClosed()) { + file.close(); + } + } catch (Exception ignore) { + } + }); + } + } + + public void sendFiles() { + try (ExecutorService executor = Executors.newSingleThreadExecutor()) { + sendFiles(executor); + } + } + + public synchronized void sendEntry() throws PrivmxException, NativeException, IllegalStateException { + if (streamState != State.FILES_SENT) { + throw new IllegalStateException("Stream should be in state FILES_SENT. Current state is: " + streamState.name()); + } + try { + inboxApi.sendEntry(inboxHandle); + updateState(State.SENT); + } catch (Exception e) { + onError(e); + } + } + + public enum State { + /** + * The initial state, indicating that {@link InboxEntryStream} is ready to send files. + */ + PREPARED, + /** + * Indicates that all files have been sent successfully and the entry is ready to be sent. + * This state is set when: + * 1. The {@link InboxEntryStream} has been initialized and there are no files to send. + * 2. All files have been sent successfully. + */ + FILES_SENT, + /** + * Indicates that an error occurred during the process of sending files or the Entry. + */ + ERROR, + /** + * Indicates that the entry has been sent successfully. + */ + SENT, + /** + * Indicates that the {@link InboxEntryStream} has been canceled. + */ + ABORTED + } + + public static class FileInfo { + public byte[] publicMeta; + public byte[] privateMeta; + public long fileSize; + public InputStream fileStream; + + public FileInfo( + byte[] publicMeta, + byte[] privateMeta, + long fileSize, + InputStream fileStream + ) { + this.publicMeta = publicMeta; + this.privateMeta = privateMeta; + this.fileSize = fileSize; + this.fileStream = fileStream; + } + } + + public abstract static class EntryStreamListener { + public void onStartFileSending(FileInfo file) { + } + + public void onEndFileSending(FileInfo file) { + + } + + public byte[] onNextChunkRequest(FileInfo file) { + return null; + } + + public void onFileChunkProcessed(FileInfo file, long chunk) { + + } + + public void onErrorDuringSending(FileInfo file, Throwable throwable) { + + } + + public void onError(Throwable t) { + + } + + public void onUpdateState(State currentState) { + + } + } +} diff --git a/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStream.java b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStream.java new file mode 100644 index 0000000..dd23a7f --- /dev/null +++ b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStream.java @@ -0,0 +1,107 @@ +// +// PrivMX Endpoint Java Extra. +// Copyright © 2024 Simplito sp. z o.o. +// +// This file is part of the PrivMX Platform (https://privmx.dev). +// This software is Licensed under the MIT License. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package com.simplito.java.privmx_endpoint_extra.inboxFileStream; + +import com.simplito.java.privmx_endpoint.model.exceptions.NativeException; +import com.simplito.java.privmx_endpoint.model.exceptions.PrivmxException; +import com.simplito.java.privmx_endpoint.modules.inbox.InboxApi; +import com.simplito.java.privmx_endpoint_extra.storeFileStream.StoreFileStream; +import com.simplito.java.privmx_endpoint_extra.storeFileStream.StoreFileStream.ProgressListener; + +/** + * Base class for Store file streams. Implements progress listeners. + * + * @category store + */ +public abstract class InboxFileStream { + /** + * Constant value with optimal size of reading/sending data. + */ + public static final long OPTIMAL_SEND_SIZE = 128 * 1024L; + /** + * Reference to file handle. + */ + protected final Long handle; + /** + * Reference to {@link com.simplito.java.privmx_endpoint.modules.inbox.InboxApi}. + */ + protected final InboxApi inboxApi; + private Long processedBytes = 0L; + private StoreFileStream.ProgressListener progressListener; + private Boolean closed = false; + + /** + * Creates instance of {@code StoreFileStream}. + * + * @param handle handle to Store file + * @param inboxApi {@link InboxApi} instance that calls read/write methods on files + */ + protected InboxFileStream( + Long handle, + InboxApi inboxApi + ) { + this.handle = handle; + this.inboxApi = inboxApi; + } + + public Long getFileHandle() { + return handle; + } + + /** + * Sets listening for single chunk sent/read. + * + * @param progressListener callback triggered when chunk is sent/read + */ + public void setProgressListener(ProgressListener progressListener) { + this.progressListener = progressListener; + } + + /** + * Increases the size of current sent/read data by chunkSize and calls {@link StoreFileStream.ProgressListener#onChunkProcessed(Long)}. + * + * @param chunkSize size of processed chunk + */ + protected void callChunkProcessed(Long chunkSize) { + processedBytes += chunkSize; + if (progressListener != null) { + progressListener.onChunkProcessed(processedBytes); + } + } + + public Long getProcessedBytes() { + return this.processedBytes; + } + + /** + * Returns information whether the instance is closed. + * + * @return {@code true} if file handle is closed + */ + public Boolean isClosed() { + return closed; + } + + /** + * Closes file handle. + * + * @return ID of the closed file + * @throws PrivmxException if there is an error while closing file + * @throws NativeException if there is an unknown error while closing file + * @throws IllegalStateException when {@code storeApi} is not initialized or there's no connection + */ + public synchronized String close() throws PrivmxException, NativeException, IllegalStateException { + String result = inboxApi.closeFile(handle); + closed = true; + return result; + } +} diff --git a/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamReader.java b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamReader.java new file mode 100644 index 0000000..b38c131 --- /dev/null +++ b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamReader.java @@ -0,0 +1,141 @@ +// +// PrivMX Endpoint Java Extra. +// Copyright © 2024 Simplito sp. z o.o. +// +// This file is part of the PrivMX Platform (https://privmx.dev). +// This software is Licensed under the MIT License. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package com.simplito.java.privmx_endpoint_extra.inboxFileStream; + +import com.simplito.java.privmx_endpoint.model.exceptions.NativeException; +import com.simplito.java.privmx_endpoint.model.exceptions.PrivmxException; +import com.simplito.java.privmx_endpoint.modules.inbox.InboxApi; +import com.simplito.java.privmx_endpoint_extra.storeFileStream.StoreFileStream; + +import java.io.IOException; +import java.io.OutputStream; + + +/** + * Manages handle for file reading. + * + * @category inbox + */ +public class InboxFileStreamReader extends InboxFileStream { + + + private InboxFileStreamReader( + Long handle, + InboxApi api + ) { + super(handle, api); + } + + /** + * Opens Inbox file. + * + * @param api reference to Inbox API + * @param fileId ID of the file to open + * @return Instance ready to read from the Inbox file + * @throws IllegalStateException when {@code inboxApi} is not initialized or there's no connection + * @throws PrivmxException if there is an error while opening Inbox file + * @throws NativeException if there is an unknown error while opening Inbox file + */ + public static InboxFileStreamReader openFile( + InboxApi api, + String fileId + ) throws IllegalStateException, PrivmxException, NativeException { + return new InboxFileStreamReader( + api.openFile(fileId), + api + ); + } + + /** + * Opens Inbox file and writes it into {@link OutputStream}. + * + * @param api reference to Inbox API + * @param fileId ID of the file to open + * @param outputStream stream to write downloaded data with optimized chunk size {@link InboxFileStream#OPTIMAL_SEND_SIZE} + * @return ID of the read file + * @throws IOException if there is an error while writing the stream + * @throws IllegalStateException when inboxApi is not initialized or there's no connection + * @throws PrivmxException if there is an error while opening Inbox file + * @throws NativeException if there is an unknown error while opening Inbox file + */ + public static String openFile( + InboxApi api, + String fileId, + OutputStream outputStream + ) throws IOException, IllegalStateException, PrivmxException, NativeException { + return InboxFileStreamReader.openFile(api, fileId, outputStream, null); + } + + /** + * Opens Inbox file and writes it into {@link OutputStream}. + * + * @param api reference to Inbox API + * @param fileId ID of the file to open + * @param outputStream stream to write downloaded data with optimized chunk size {@link InboxFileStream#OPTIMAL_SEND_SIZE} + * @param streamController controls the process of reading file + * @return ID of the read file + * @throws IOException if there is an error while writing stream + * @throws IllegalStateException when inboxApi is not initialized or there's no connection + * @throws PrivmxException if there is an error while reading Inbox file + * @throws NativeException if there is an unknown error while reading Inbox file + */ + public static String openFile( + InboxApi api, + String fileId, + OutputStream outputStream, + StoreFileStream.Controller streamController + ) throws IOException, IllegalStateException, PrivmxException, NativeException { + InboxFileStreamReader input = InboxFileStreamReader.openFile(api, fileId); + if (streamController != null) { + input.setProgressListener(streamController); + } + byte[] chunk; + do { + if (streamController != null && streamController.isStopped()) { + input.close(); + } + chunk = input.read(InboxFileStream.OPTIMAL_SEND_SIZE); + outputStream.write(chunk); + } while (chunk.length == InboxFileStream.OPTIMAL_SEND_SIZE); + + return input.close(); + } + + /** + * Reads file data and moves the cursor. If read data size is less than length, then EOF. + * + * @param size size of data to read (the recommended size is {@link InboxFileStream#OPTIMAL_SEND_SIZE}) + * @return Read data + * @throws IOException when {@code this} is closed + * @throws PrivmxException when method encounters an exception + * @throws NativeException when method encounters an unknown exception + * @throws IllegalStateException when {@link #inboxApi} is closed + */ + public byte[] read(Long size) throws IOException, PrivmxException, NativeException, IllegalStateException { + if (isClosed()) throw new IOException("File handle is closed"); + byte[] result = inboxApi.readFromFile(handle, size); + callChunkProcessed((long) result.length); + return result; + } + + /** + * Moves read cursor. + * + * @param position new cursor position + * @throws IllegalStateException if {@code inboxApi} is not initialized or connected + * @throws PrivmxException if there is an error while seeking + * @throws NativeException if there is an unknown error while seeking + */ + public void seek(long position) throws IllegalStateException, PrivmxException, NativeException { + inboxApi.seekInFile(handle, position); + } +} diff --git a/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamWriter.java b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamWriter.java new file mode 100644 index 0000000..7cc4bf7 --- /dev/null +++ b/privmx-endpoint-extra/src/main/java/com/simplito/java/privmx_endpoint_extra/inboxFileStream/InboxFileStreamWriter.java @@ -0,0 +1,97 @@ +// +// PrivMX Endpoint Java Extra. +// Copyright © 2024 Simplito sp. z o.o. +// +// This file is part of the PrivMX Platform (https://privmx.dev). +// This software is Licensed under the MIT License. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package com.simplito.java.privmx_endpoint_extra.inboxFileStream; + +import com.simplito.java.privmx_endpoint.model.exceptions.NativeException; +import com.simplito.java.privmx_endpoint.model.exceptions.PrivmxException; +import com.simplito.java.privmx_endpoint.modules.inbox.InboxApi; +import com.simplito.java.privmx_endpoint_extra.storeFileStream.StoreFileStream; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +/** + * Manages handle for file writing. + * + * @category inbox + */ +public class InboxFileStreamWriter extends InboxFileStream { + + private InboxFileStreamWriter(Long handle, InboxApi inboxApi) { + super(handle, inboxApi); + } + + /** + * Creates a new file in given Inbox. + * + * @param api reference to Inbox API + * @param publicMeta byte array of any arbitrary metadata that can be read by anyone + * @param privateMeta byte array of any arbitrary metadata that will be encrypted before sending + * @param size size of data to write + * @return Instance ready to write to the created Inbox file + * @throws IllegalStateException when inboxApi is not initialized or there's no connection + * @throws PrivmxException if there is an error while creating Inbox file metadata + * @throws NativeException if there is an unknown error while creating inbox file metadata + */ + public static InboxFileStreamWriter createFile( + InboxApi api, + byte[] publicMeta, + byte[] privateMeta, + long size + ) throws PrivmxException, NativeException, IllegalStateException { + if (api == null) throw new NullPointerException("api cannot be null"); + return new InboxFileStreamWriter( + api.createFileHandle(publicMeta, privateMeta, size), + api + ); + } + + /** + * Writes data to Inbox file. + * + * @param data data to write (the recommended size of data chunk is {@link InboxFileStream#OPTIMAL_SEND_SIZE}) + * @throws PrivmxException if there is an error while writing chunk + * @throws NativeException if there is an unknown error while writing chunk + * @throws IllegalStateException when inboxApi is not initialized or there's no connection + * @throws IOException when {@code this} is closed + * @throws PrivmxException when method encounters an exception + * @throws NativeException when method encounters an unknown exception + * @throws IllegalStateException when {@link #inboxApi} is closed + */ + public void write(long inboxHandle, byte[] data) throws PrivmxException, NativeException, IllegalStateException, IOException { + if (isClosed()) throw new IOException("File handle is closed"); + inboxApi.writeToFile(inboxHandle, handle, data); + callChunkProcessed((long) data.length); + } + + public void writeStream(long inboxHandle, InputStream stream) throws PrivmxException, NativeException, IllegalStateException, IOException { + writeStream(inboxHandle, stream, null); + } + + public void writeStream(long inboxHandle, InputStream inputStream, StoreFileStream.Controller streamController) throws PrivmxException, NativeException, IllegalStateException, IOException { + if (streamController != null) { + setProgressListener(streamController); + } + byte[] chunk = new byte[(int) StoreFileStream.OPTIMAL_SEND_SIZE]; + int readed; + while (true) { + if (streamController != null && streamController.isStopped()) { + break; + } + if ((readed = inputStream.read(chunk)) <= 0) { + break; + } + write(inboxHandle, Arrays.copyOf(chunk, readed)); + } + } +}