Skip to content

Commit

Permalink
docs: Create docs for Inbox stream API (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
djenczewski committed Dec 10, 2024
1 parent 45aa509 commit cc32376
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
* Provides a streamlined process for creating and sending inbox entries
* with optional file attachments.
* <p> This class simplifies the complexities of interacting with the Inbox API for sending entries,
* especially when dealing with multiple files. It manages the lifecycle of the entry creation
* process, including file uploads and final entry submission.
*
* @category inbox
*/
public class InboxEntryStream {
private final InboxApi inboxApi;
private final Map<FileInfo, InboxFileStreamWriter> inboxFiles;
Expand All @@ -52,42 +61,96 @@ private InboxEntryStream(InboxApi api, Map<FileInfo, InboxFileStreamWriter> inbo
}
}

/**
* Creates {@link InboxEntryStream} instance ready for streaming.
* <p> This method initializes an {@link InboxEntryStream} and prepares it for sending
* an entry with the provided data. It creates an Inbox handle and sets the
* initial state of the stream to {@link State#FILES_SENT}
*
* @param inboxApi reference to Inbox API
* @param inboxId ID of the Inbox
* @param entryStreamListener the listener for stream state changes.
* @param data entry data to send
* @return instance of {@link InboxEntryStream} prepared for streaming.
*/
public static InboxEntryStream prepareEntry(
InboxApi inboxApi,
String inboxId,
EntryStreamListener entryStreamListener,
byte[] data
) {
) throws PrivmxException, NativeException, IllegalStateException {
return prepareEntry(inboxApi, inboxId, entryStreamListener, data, null);
}

/**
* Creates {@link InboxEntryStream} instance ready for streaming.
* <p>This method initializes an {@link InboxEntryStream} and prepares it for sending an entry with
* associated files and empty data. It creates Inbox and file handles, setting the initial state of the stream
* to {@link State#PREPARED}, indicating readiness for file transfer.
*
* @param inboxApi reference to Inbox API
* @param inboxId ID of the Inbox
* @param entryStreamListener the listener for stream state changes.
* @param filesConfig information about each entry's file to send.
* @return instance of {@link InboxEntryStream} prepared for streaming.
*/
public static InboxEntryStream prepareEntry(
InboxApi inboxApi,
String inboxId,
EntryStreamListener entryStreamListener,
List<FileInfo> filesConfig
) {
) throws PrivmxException, NativeException, IllegalStateException {
return prepareEntry(inboxApi, inboxId, entryStreamListener, "".getBytes(StandardCharsets.UTF_8), filesConfig);
}

/**
* Creates an {@link InboxEntryStream} instance ready for streaming, with optional files and encryption.
* <p>This method initializes an {@link InboxEntryStream} and prepares it for sending an entry with
* the provided data and optional associated files. It creates Inbox and file handles (if
* {@code filesConfig} is provided), setting the initial state of the stream to
* {@link State#PREPARED}, indicating readiness for data and file transfer.
*
* @param inboxApi reference to Inbox API
* @param inboxId ID of the Inbox
* @param entryStreamListener the listener for stream state changes
* @param data entry data to send
* @param filesConfig information about each entry's file to send.
* @return instance of {@link InboxEntryStream} prepared for streaming.
*/
public static InboxEntryStream prepareEntry(
InboxApi inboxApi,
String inboxId,
EntryStreamListener entryStreamListener,
byte[] data,
List<FileInfo> filesConfig
) {
) throws PrivmxException, NativeException, IllegalStateException {
return prepareEntry(inboxApi, inboxId, entryStreamListener, data, filesConfig, null);
}

/**
* Creates an {@link InboxEntryStream} instance ready for streaming, with optional files and encryption.
* <p>This method initializes an {@link InboxEntryStream} and prepares it for sending an entry with the provided data,
* optional associated files, and optional encryption using the sender's private key. It creates an Inbox handle
* and initializes file handles for any associated files. The initial state of the stream is determined based
* on the presence of files: if no files are provided, the state is set to {@link State#FILES_SENT}; otherwise,
* it's set to {@link State#PREPARED}, indicating readiness for file transfer.
*
* @param inboxApi reference to Inbox API
* @param inboxId ID of the Inbox
* @param entryStreamListener the listener for stream state changes
* @param data entry data to send
* @param filesConfig information about each entry's file to send
* @param userPrivKey sender's private key which can be used later to encrypt data for that sender
* @return instance of {@link InboxEntryStream} prepared for streaming.
*/
public static InboxEntryStream prepareEntry(
InboxApi inboxApi,
String inboxId,
EntryStreamListener entryStreamListener,
byte[] data,
List<FileInfo> filesConfig,
String userPrivKey
) {
) throws PrivmxException, NativeException, IllegalStateException {
Map<FileInfo, InboxFileStreamWriter> files = Optional.ofNullable(filesConfig)
.orElse(Collections.emptyList())
.stream()
Expand All @@ -107,15 +170,20 @@ public static InboxEntryStream prepareEntry(
return new InboxEntryStream(inboxApi, files, inboxHandle, entryStreamListener);
}

/**
* Initiates the process of sending files using the provided executor.
* <p>This method submits each file for sending to the {@code fileStreamExecutor}
* and wait for completion.
*
* @param fileStreamExecutor the executor service responsible for executing file sending tasks
* @throws IllegalStateException If the stream is not in the {@link State#PREPARED} state.
*/
public synchronized void sendFiles(
ExecutorService fileStreamExecutor
) {
) throws IllegalStateException {
if (streamState != State.PREPARED) {
throw new IllegalStateException("Stream should be in state PREPARED. Current state is: " + streamState.name());
}
if (!sendingFiles.isEmpty()) {
throw new IllegalStateException("Uploading files in progress");
}
inboxFiles.forEach((fileInfo, fileHandle) -> {
sendingFiles.add(fileStreamExecutor.submit(() -> {
try {
Expand Down Expand Up @@ -150,6 +218,17 @@ public synchronized void sendFiles(

}

/**
* Sends files using a single-threaded executor (see: {@link Executors#newSingleThreadExecutor}).
*
* @throws IllegalStateException when this stream is not in state {@link State#PREPARED}.
*/
public void sendFiles() {
try (ExecutorService executor = Executors.newSingleThreadExecutor()) {
sendFiles(executor);
}
}

private void sendFile(
FileInfo fileInfo,
InboxFileStreamWriter fileHandle
Expand Down Expand Up @@ -182,7 +261,7 @@ public void onChunkProcessed(Long processedBytes) {
}
}

public void onError(Throwable t) {
private void onError(Throwable t) {
cancel();
stopFileStreams();
closeFileHandles();
Expand All @@ -199,15 +278,19 @@ private void updateState(State newState) {
}
}

/**
* Cancels the stream and sets its state to {@link State#ABORTED}.
* <p>If the stream is currently sending files, all pending file operations will be canceled.
* <p>If the stream is in the process of sending the entry, this operation will not have any effect.
*/
public void cancel() {
if (streamState == State.ERROR) return;
if (streamState == State.ABORTED) return;
if (streamState == State.SENT) return;
synchronized (this) {
stopFileStreams();
closeFileHandles();
if (streamState != State.SENT) {
updateState(State.ABORTED);
}
updateState(State.ABORTED);
}
}

Expand All @@ -234,12 +317,15 @@ private void closeFileHandles() {
}
}

public void sendFiles() {
try (ExecutorService executor = Executors.newSingleThreadExecutor()) {
sendFiles(executor);
}
}

/**
* Sends the entry data and closes this stream, transitioning it to the {@link State#SENT} state.
* <p>This method should only be called after all files associated with the entry have been
* successfully sent, indicated by the stream being in the {@link State#FILES_SENT} state.
*
* @throws PrivmxException when method encounters an exception during call {@link InboxApi#sendEntry} method
* @throws NativeException when method encounters an unknown exception during call {@link InboxApi#sendEntry} method
* @throws IllegalStateException if the stream is not in the {@link State#FILES_SENT} state
*/
public synchronized void sendEntry() throws PrivmxException, NativeException, IllegalStateException {
if (streamState != State.FILES_SENT) {
throw new IllegalStateException("Stream should be in state FILES_SENT. Current state is: " + streamState.name());
Expand All @@ -252,6 +338,9 @@ public synchronized void sendEntry() throws PrivmxException, NativeException, Il
}
}

/**
* Contains available states of {@link InboxEntryStream}.
*/
public enum State {
/**
* The initial state, indicating that {@link InboxEntryStream} is ready to send files.
Expand All @@ -278,12 +367,42 @@ public enum State {
ABORTED
}

/**
* Represents information about a file to be sent by {@link InboxEntryStream}.
*/
public static class FileInfo {

/**
* Byte array of any arbitrary metadata that can be read by anyone.
*/
public byte[] publicMeta;

/**
* Byte array of any arbitrary metadata that will be encrypted before sending.
*/
public byte[] privateMeta;

/**
* The total size of the file data.
*/
public long fileSize;

/**
* An optional {@link InputStream} providing the file data.
* <p>If this value is {@code null}, the stream will call
* {@link EntryStreamListener#onNextChunkRequest} to request chunks of data
* for sending.
*/
public InputStream fileStream;

/**
* Creates instance of {@link FileInfo}.
*
* @param publicMeta byte array of any arbitrary metadata that can be read by anyone
* @param privateMeta byte array of any arbitrary metadata that will be encrypted before sending
* @param fileSize the total size of the file data
* @param fileStream reference to {@link InputStream} instance used as source for stream file
*/
public FileInfo(
byte[] publicMeta,
byte[] privateMeta,
Expand All @@ -297,30 +416,83 @@ public FileInfo(
}
}

/**
* Interface for listening to state changes and exchanging data with an {@link InboxEntryStream} instance.
* <p>
* This interface provides callbacks for various events that occur during the lifecycle of an inbox entry stream,
* such as starting and ending file sending, requesting file chunks, handling errors, and updating the stream state.
* <p>
* Implement this interface to monitor and interact with the entry stream.
*/
public abstract static class EntryStreamListener {

/**
* Override this method to handle when file start sending.
*
* @param file information about the file being sent
*/
public void onStartFileSending(FileInfo file) {

}

/**
* Override this method to handle when file has been send successfully.
*
* @param file information about the sent file
*/
public void onEndFileSending(FileInfo file) {

}

/**
* Override this method to handle event when {@link FileInfo#fileStream} is {@code null}
* and the stream requests a chunk of the file to send.
* <p>This method should return the next chunk of the file.
* <p>Returning {@code null} will cause a
* {@link NullPointerException} during file sending and stop the {@link InboxEntryStream} instance with
* the state {@link State#ERROR}.
*
* @param file info about file for which chunk is requested
* @return next chunk of file.
*/
public byte[] onNextChunkRequest(FileInfo file) {
return null;
}

public void onFileChunkProcessed(FileInfo file, long chunk) {
/**
* Override this method to handle event when each chunk of a file has been sent successfully.
*
* @param file information about the file for which the chunk was processed
* @param processedBytes accumulated size of sent data
*/
public void onFileChunkProcessed(FileInfo file, long processedBytes) {

}

/**
* Override this method to handle event when some error occurs during file sending.
*
* @param file information about the file that caused the error
* @param throwable exception that occurred during file sending
*/
public void onErrorDuringSending(FileInfo file, Throwable throwable) {

}

public void onError(Throwable t) {
/**
* Override this method to handle event when some error occurs during creating entry.
*
* @param throwable exception that occurred during entry creation
*/
public void onError(Throwable throwable) {

}

/**
* Override this method to handle event when stream state has been updated.
*
* @param currentState current state of the stream
*/
public void onUpdateState(State currentState) {

}
Expand Down
Loading

0 comments on commit cc32376

Please sign in to comment.