Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream API methods #60

Merged
merged 12 commits into from
Feb 12, 2025
219 changes: 178 additions & 41 deletions src/main/java/land/oras/Registry.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package land.oras;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import land.oras.auth.*;
Expand Down Expand Up @@ -216,24 +223,16 @@ public Manifest pushArtifact(
public void pullArtifact(ContainerRef containerRef, Path path, boolean overwrite) {
Manifest manifest = getManifest(containerRef);
for (Layer layer : manifest.getLayers()) {
// Archive
if (layer.getMediaType().equals(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE)) {
Path archive = ArchiveUtils.createTempArchive();
fetchBlob(containerRef.withDigest(layer.getDigest()), archive);
LOG.debug("Extracting: {} to {}", archive, path);
ArchiveUtils.extractTarGz(archive, path);
LOG.debug("Extracted: {}", path);
}
// Single artifact layer
else {
// Take the filename of default to the digest
String fileName = layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest());
Path filePath = path.resolve(fileName);
if (Files.exists(filePath) && !overwrite) {
LOG.info("File already exists. Not overriding: {}", filePath);
} else {
fetchBlob(containerRef.withDigest(layer.getDigest()), filePath);
}
Path targetPath =
path.resolve(layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest()));

try (InputStream is = fetchBlob(containerRef.withDigest(layer.getDigest()))) {
Files.copy(
is,
targetPath,
overwrite ? StandardCopyOption.REPLACE_EXISTING : StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
throw new OrasException("Failed to pull artifact", e);
}
}
}
Expand Down Expand Up @@ -265,29 +264,43 @@ public Manifest pushArtifact(
List<Layer> layers = new ArrayList<>();
// Upload all files as blobs
for (Path path : paths) {

// Save filename in case of compressed archive
String fileName = path.getFileName().toString();
boolean isDirectory = false;

// Add layer annotation for the specific file
Map<String, String> layerAnnotations = new HashMap<>(annotations.getFileAnnotations(fileName));

// Add title annotation
layerAnnotations.put(Const.ANNOTATION_TITLE, fileName);

// Compress directory to tar.gz
if (path.toFile().isDirectory()) {
path = ArchiveUtils.createTarGz(path);
isDirectory = true;
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
}
Layer layer = uploadBlob(containerRef, path, layerAnnotations);
if (isDirectory) {
layer = layer.withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE);
try {
if (Files.isDirectory(path)) {
// Create tar.gz archive for directory
Path tempArchive = ArchiveUtils.createTarGz(path);
try (InputStream is = Files.newInputStream(tempArchive)) {
long size = Files.size(tempArchive);
Layer layer = pushBlobStream(containerRef, is, size)
.withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE) // Use tar+gzip for directories
.withAnnotations(Map.of(
Const.ANNOTATION_TITLE,
path.getFileName().toString(),
Const.ANNOTATION_ORAS_UNPACK,
"true"));
layers.add(layer);
LOG.info("Uploaded directory: {}", layer.getDigest());
}
Files.delete(tempArchive);
} else {
try (InputStream is = Files.newInputStream(path)) {
long size = Files.size(path);
// Set mediaType for individual files
String mediaType = Files.probeContentType(path);
if (mediaType == null) {
mediaType = "application/octet-stream";
}
Layer layer = pushBlobStream(containerRef, is, size)
.withMediaType(mediaType)
.withAnnotations(Map.of(
Const.ANNOTATION_TITLE,
path.getFileName().toString()));
layers.add(layer);
LOG.info("Uploaded: {}", layer.getDigest());
}
}
} catch (IOException e) {
throw new OrasException("Failed to push artifact", e);
}
layers.add(layer);
LOG.info("Uploaded: {}", layer.getDigest());
}
// Push the config like any other blob
Config pushedConfig = pushConfig(containerRef, config != null ? config : Config.empty());
Expand Down Expand Up @@ -642,4 +655,128 @@ public Registry build() {
return registry.build();
}
}

/**
* Push a blob using input stream to avoid loading the whole blob in memory
* @param containerRef the container ref
* @param input the input stream
* @param size the size of the blob
* @return The Layer containing the uploaded blob information
* @throws OrasException if upload fails or digest calculation fails
*/
public Layer pushBlobStream(ContainerRef containerRef, InputStream input, long size) {
Path tempFile = null;
try {
// Create a temporary file to store the stream content
tempFile = Files.createTempFile("oras-upload-", ".tmp");

// Copy input stream to temp file while calculating digest
String digest;
try (InputStream bufferedInput = new BufferedInputStream(input);
DigestInputStream digestInput =
new DigestInputStream(bufferedInput, MessageDigest.getInstance("SHA-256"));
OutputStream fileOutput = Files.newOutputStream(tempFile)) {

digestInput.transferTo(fileOutput);
byte[] digestBytes = digestInput.getMessageDigest().digest();
digest = "sha256:" + bytesToHex(digestBytes);
}

// Check if the blob already exists
if (hasBlob(containerRef.withDigest(digest))) {
LOG.info("Blob already exists: {}", digest);
return Layer.fromDigest(digest, size);
}

// Construct the URI for initiating the upload
URI baseUri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath()));
System.out.println("Initiating blob upload at: " + baseUri);

// Create an empty input stream for the initial POST request
InputStream emptyStream = new ByteArrayInputStream(new byte[0]);

// Start with a POST request to initiate the upload
OrasHttpClient.ResponseWrapper<String> initiateResponse = client.uploadStream(
"POST",
baseUri,
emptyStream,
0,
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE));

if (initiateResponse.statusCode() != 202) {
throw new OrasException("Failed to initiate blob upload: " + initiateResponse.statusCode());
}

// Get the location URL for the actual upload
String locationUrl = initiateResponse.headers().get("location");
if (locationUrl == null || locationUrl.isEmpty()) {
throw new OrasException("No location URL provided for blob upload");
}

// Ensure the location URL is absolute
if (!locationUrl.startsWith("http")) {
locationUrl = "%s://%s%s".formatted(getScheme(), containerRef.getRegistry(), locationUrl);
}

// Construct the final upload URI with the digest parameter
String separator = locationUrl.contains("?") ? "&" : "?";
URI finalizeUri = URI.create(locationUrl + separator + "digest=" + digest);

// Upload the content from the temporary file
try (InputStream uploadStream = Files.newInputStream(tempFile)) {
OrasHttpClient.ResponseWrapper<String> uploadResponse = client.uploadStream(
"PUT",
finalizeUri,
uploadStream,
size,
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE));

if (uploadResponse.statusCode() != 201 && uploadResponse.statusCode() != 202) {
throw new OrasException("Failed to upload blob: " + uploadResponse.statusCode() + " - Response: "
+ uploadResponse.response());
}

return Layer.fromDigest(digest, size);
}
} catch (IOException | NoSuchAlgorithmException e) {
System.err.println("Error during blob upload: " + e.getMessage());
e.printStackTrace();
throw new OrasException("Failed to push blob stream", e);
} finally {
// Clean up the temporary file
if (tempFile != null) {
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
LOG.warn("Failed to delete temporary file: {}", tempFile, e);
}
}
}
}

/**
* Bites to hex string
* @param bytes of bytes[]
*/
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}

/**
* Get blob as stream to avoid loading into memory
* @param containerRef The container ref
* @return The input stream
*/
public InputStream getBlobStream(ContainerRef containerRef) {
// Similar to fetchBlob()
return fetchBlob(containerRef);
}
}
24 changes: 24 additions & 0 deletions src/main/java/land/oras/utils/DigestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,28 @@ public static String digest(String algorithm, byte[] bytes) {
throw new OrasException("Failed to calculate digest", e);
}
}

/**
* Calculate the sha256 digest of a InputStream
* @param input The input
* @return The digest
*/
public static String sha256(InputStream input) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1) {
digest.update(buffer, 0, bytesRead);
}
byte[] hashBytes = digest.digest();
StringBuilder sb = new StringBuilder();
for (byte b : hashBytes) {
sb.append(String.format("%02x", b));
}
return "sha256:%s".formatted(sb.toString());
} catch (Exception e) {
throw new OrasException("Failed to calculate digest", e);
}
}
}
45 changes: 39 additions & 6 deletions src/main/java/land/oras/utils/OrasHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,35 @@ public ResponseWrapper<String> put(URI uri, byte[] body, Map<String, String> hea
HttpRequest.BodyPublishers.ofByteArray(body));
}

/**
* Upload a stream
* @param method The method (POST or PUT)
* @param uri The URI
* @param input The input stream
* @param size The size of the stream
* @param headers The headers
* @return The response
*/
public ResponseWrapper<String> uploadStream(
String method, URI uri, InputStream input, long size, Map<String, String> headers) {
try {
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> input);

HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).method(method, publisher);

// Add headers
headers.forEach(requestBuilder::header);

// Execute request
HttpRequest request = requestBuilder.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
return toResponseWrapper(response);
} catch (Exception e) {
throw new OrasException("Failed to upload stream", e);
}
}

/**
* Execute a request
* @param method The method
Expand Down Expand Up @@ -296,17 +325,21 @@ private <T> ResponseWrapper<T> executeRequest(
HttpRequest request = builder.build();
logRequest(request, body);
HttpResponse<T> response = client.send(request, handler);
return new ResponseWrapper<T>(
response.body(),
response.statusCode(),
response.headers().map().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().get(0))));
return toResponseWrapper(response);
} catch (Exception e) {
throw new OrasException("Unable to create HTTP request", e);
}
}

private <T> ResponseWrapper<T> toResponseWrapper(HttpResponse<T> response) {
return new ResponseWrapper<>(
response.body(),
response.statusCode(),
response.headers().map().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().get(0))));
}

/**
* Logs the request in debug/trace mode
* @param request The request
Expand Down
Loading
Loading