Skip to content

Commit

Permalink
Add stream API methods (oras-project#60)
Browse files Browse the repository at this point in the history
Co-authored-by: Valentin Delaye <[email protected]>
  • Loading branch information
2 people authored and AayushSaini101 committed Feb 12, 2025
1 parent a20fdfc commit 327ec97
Show file tree
Hide file tree
Showing 4 changed files with 394 additions and 49 deletions.
219 changes: 178 additions & 41 deletions src/main/java/land/oras/Registry.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package land.oras;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import land.oras.auth.*;
Expand Down Expand Up @@ -217,24 +224,16 @@ public Manifest pushArtifact(
public void pullArtifact(ContainerRef containerRef, Path path, boolean overwrite) {
Manifest manifest = getManifest(containerRef);
for (Layer layer : manifest.getLayers()) {
// Archive
if (layer.getMediaType().equals(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE)) {
Path archive = ArchiveUtils.createTempArchive();
fetchBlob(containerRef.withDigest(layer.getDigest()), archive);
LOG.debug("Extracting: {} to {}", archive, path);
ArchiveUtils.extractTarGz(archive, path);
LOG.debug("Extracted: {}", path);
}
// Single artifact layer
else {
// Take the filename of default to the digest
String fileName = layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest());
Path filePath = path.resolve(fileName);
if (Files.exists(filePath) && !overwrite) {
LOG.info("File already exists. Not overriding: {}", filePath);
} else {
fetchBlob(containerRef.withDigest(layer.getDigest()), filePath);
}
Path targetPath =
path.resolve(layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest()));

try (InputStream is = fetchBlob(containerRef.withDigest(layer.getDigest()))) {
Files.copy(
is,
targetPath,
overwrite ? StandardCopyOption.REPLACE_EXISTING : StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
throw new OrasException("Failed to pull artifact", e);
}
}
}
Expand Down Expand Up @@ -266,29 +265,43 @@ public Manifest pushArtifact(
List<Layer> layers = new ArrayList<>();
// Upload all files as blobs
for (Path path : paths) {

// Save filename in case of compressed archive
String fileName = path.getFileName().toString();
boolean isDirectory = false;

// Add layer annotation for the specific file
Map<String, String> layerAnnotations = new HashMap<>(annotations.getFileAnnotations(fileName));

// Add title annotation
layerAnnotations.put(Const.ANNOTATION_TITLE, fileName);

// Compress directory to tar.gz
if (path.toFile().isDirectory()) {
path = ArchiveUtils.createTarGz(path);
isDirectory = true;
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
}
Layer layer = uploadBlob(containerRef, path, layerAnnotations);
if (isDirectory) {
layer = layer.withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE);
try {
if (Files.isDirectory(path)) {
// Create tar.gz archive for directory
Path tempArchive = ArchiveUtils.createTarGz(path);
try (InputStream is = Files.newInputStream(tempArchive)) {
long size = Files.size(tempArchive);
Layer layer = pushBlobStream(containerRef, is, size)
.withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE) // Use tar+gzip for directories
.withAnnotations(Map.of(
Const.ANNOTATION_TITLE,
path.getFileName().toString(),
Const.ANNOTATION_ORAS_UNPACK,
"true"));
layers.add(layer);
LOG.info("Uploaded directory: {}", layer.getDigest());
}
Files.delete(tempArchive);
} else {
try (InputStream is = Files.newInputStream(path)) {
long size = Files.size(path);
// Set mediaType for individual files
String mediaType = Files.probeContentType(path);
if (mediaType == null) {
mediaType = "application/octet-stream";
}
Layer layer = pushBlobStream(containerRef, is, size)
.withMediaType(mediaType)
.withAnnotations(Map.of(
Const.ANNOTATION_TITLE,
path.getFileName().toString()));
layers.add(layer);
LOG.info("Uploaded: {}", layer.getDigest());
}
}
} catch (IOException e) {
throw new OrasException("Failed to push artifact", e);
}
layers.add(layer);
LOG.info("Uploaded: {}", layer.getDigest());
}
// Push the config like any other blob
Config pushedConfig = pushConfig(containerRef, config != null ? config : Config.empty());
Expand Down Expand Up @@ -643,4 +656,128 @@ public Registry build() {
return registry.build();
}
}

/**
* Push a blob using input stream to avoid loading the whole blob in memory
* @param containerRef the container ref
* @param input the input stream
* @param size the size of the blob
* @return The Layer containing the uploaded blob information
* @throws OrasException if upload fails or digest calculation fails
*/
public Layer pushBlobStream(ContainerRef containerRef, InputStream input, long size) {
Path tempFile = null;
try {
// Create a temporary file to store the stream content
tempFile = Files.createTempFile("oras-upload-", ".tmp");

// Copy input stream to temp file while calculating digest
String digest;
try (InputStream bufferedInput = new BufferedInputStream(input);
DigestInputStream digestInput =
new DigestInputStream(bufferedInput, MessageDigest.getInstance("SHA-256"));
OutputStream fileOutput = Files.newOutputStream(tempFile)) {

digestInput.transferTo(fileOutput);
byte[] digestBytes = digestInput.getMessageDigest().digest();
digest = "sha256:" + bytesToHex(digestBytes);
}

// Check if the blob already exists
if (hasBlob(containerRef.withDigest(digest))) {
LOG.info("Blob already exists: {}", digest);
return Layer.fromDigest(digest, size);
}

// Construct the URI for initiating the upload
URI baseUri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath()));
System.out.println("Initiating blob upload at: " + baseUri);

// Create an empty input stream for the initial POST request
InputStream emptyStream = new ByteArrayInputStream(new byte[0]);

// Start with a POST request to initiate the upload
OrasHttpClient.ResponseWrapper<String> initiateResponse = client.uploadStream(
"POST",
baseUri,
emptyStream,
0,
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE));

if (initiateResponse.statusCode() != 202) {
throw new OrasException("Failed to initiate blob upload: " + initiateResponse.statusCode());
}

// Get the location URL for the actual upload
String locationUrl = initiateResponse.headers().get("location");
if (locationUrl == null || locationUrl.isEmpty()) {
throw new OrasException("No location URL provided for blob upload");
}

// Ensure the location URL is absolute
if (!locationUrl.startsWith("http")) {
locationUrl = "%s://%s%s".formatted(getScheme(), containerRef.getRegistry(), locationUrl);
}

// Construct the final upload URI with the digest parameter
String separator = locationUrl.contains("?") ? "&" : "?";
URI finalizeUri = URI.create(locationUrl + separator + "digest=" + digest);

// Upload the content from the temporary file
try (InputStream uploadStream = Files.newInputStream(tempFile)) {
OrasHttpClient.ResponseWrapper<String> uploadResponse = client.uploadStream(
"PUT",
finalizeUri,
uploadStream,
size,
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE));

if (uploadResponse.statusCode() != 201 && uploadResponse.statusCode() != 202) {
throw new OrasException("Failed to upload blob: " + uploadResponse.statusCode() + " - Response: "
+ uploadResponse.response());
}

return Layer.fromDigest(digest, size);
}
} catch (IOException | NoSuchAlgorithmException e) {
System.err.println("Error during blob upload: " + e.getMessage());
e.printStackTrace();
throw new OrasException("Failed to push blob stream", e);
} finally {
// Clean up the temporary file
if (tempFile != null) {
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
LOG.warn("Failed to delete temporary file: {}", tempFile, e);
}
}
}
}

/**
* Bites to hex string
* @param bytes of bytes[]
*/
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}

/**
* Get blob as stream to avoid loading into memory
* @param containerRef The container ref
* @return The input stream
*/
public InputStream getBlobStream(ContainerRef containerRef) {
// Similar to fetchBlob()
return fetchBlob(containerRef);
}
}
24 changes: 24 additions & 0 deletions src/main/java/land/oras/utils/DigestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,28 @@ public static String digest(String algorithm, byte[] bytes) {
throw new OrasException("Failed to calculate digest", e);
}
}

/**
* Calculate the sha256 digest of a InputStream
* @param input The input
* @return The digest
*/
public static String sha256(InputStream input) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1) {
digest.update(buffer, 0, bytesRead);
}
byte[] hashBytes = digest.digest();
StringBuilder sb = new StringBuilder();
for (byte b : hashBytes) {
sb.append(String.format("%02x", b));
}
return "sha256:%s".formatted(sb.toString());
} catch (Exception e) {
throw new OrasException("Failed to calculate digest", e);
}
}
}
45 changes: 39 additions & 6 deletions src/main/java/land/oras/utils/OrasHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,35 @@ public ResponseWrapper<String> put(URI uri, byte[] body, Map<String, String> hea
HttpRequest.BodyPublishers.ofByteArray(body));
}

/**
* Upload a stream
* @param method The method (POST or PUT)
* @param uri The URI
* @param input The input stream
* @param size The size of the stream
* @param headers The headers
* @return The response
*/
public ResponseWrapper<String> uploadStream(
String method, URI uri, InputStream input, long size, Map<String, String> headers) {
try {
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> input);

HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).method(method, publisher);

// Add headers
headers.forEach(requestBuilder::header);

// Execute request
HttpRequest request = requestBuilder.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
return toResponseWrapper(response);
} catch (Exception e) {
throw new OrasException("Failed to upload stream", e);
}
}

/**
* Execute a request
* @param method The method
Expand Down Expand Up @@ -297,17 +326,21 @@ private <T> ResponseWrapper<T> executeRequest(
HttpRequest request = builder.build();
logRequest(request, body);
HttpResponse<T> response = client.send(request, handler);
return new ResponseWrapper<T>(
response.body(),
response.statusCode(),
response.headers().map().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().get(0))));
return toResponseWrapper(response);
} catch (Exception e) {
throw new OrasException("Unable to create HTTP request", e);
}
}

private <T> ResponseWrapper<T> toResponseWrapper(HttpResponse<T> response) {
return new ResponseWrapper<>(
response.body(),
response.statusCode(),
response.headers().map().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().get(0))));
}

/**
* Logs the request in debug/trace mode
* @param request The request
Expand Down
Loading

0 comments on commit 327ec97

Please sign in to comment.