diff --git a/src/main/java/land/oras/Registry.java b/src/main/java/land/oras/Registry.java
index 6ee4cef..785efc6 100644
--- a/src/main/java/land/oras/Registry.java
+++ b/src/main/java/land/oras/Registry.java
@@ -1,11 +1,18 @@
 package land.oras;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import land.oras.auth.*;
@@ -216,24 +223,16 @@ public Manifest pushArtifact(
     public void pullArtifact(ContainerRef containerRef, Path path, boolean overwrite) {
         Manifest manifest = getManifest(containerRef);
         for (Layer layer : manifest.getLayers()) {
-            // Archive
-            if (layer.getMediaType().equals(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE)) {
-                Path archive = ArchiveUtils.createTempArchive();
-                fetchBlob(containerRef.withDigest(layer.getDigest()), archive);
-                LOG.debug("Extracting: {} to {}", archive, path);
-                ArchiveUtils.extractTarGz(archive, path);
-                LOG.debug("Extracted: {}", path);
-            }
-            // Single artifact layer
-            else {
-                // Take the filename of default to the digest
-                String fileName = layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest());
-                Path filePath = path.resolve(fileName);
-                if (Files.exists(filePath) && !overwrite) {
-                    LOG.info("File already exists. Not overriding: {}", filePath);
-                } else {
-                    fetchBlob(containerRef.withDigest(layer.getDigest()), filePath);
-                }
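+            // Use the layer title annotation as the file name when present, otherwise fall back to the digest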
+            Path targetPath =
+                    path.resolve(layer.getAnnotations().getOrDefault(Const.ANNOTATION_TITLE, layer.getDigest()));
+            if (Files.exists(targetPath) && !overwrite) {
+                LOG.info("File already exists. Not overriding: {}", targetPath);
+                continue;
+            }
+            try (InputStream is = fetchBlob(containerRef.withDigest(layer.getDigest()))) {
+                // REPLACE_EXISTING is only reached when overwriting is allowed or the file does not exist yet
+                Files.copy(is, targetPath, StandardCopyOption.REPLACE_EXISTING);
+            } catch (IOException e) {
+                throw new OrasException("Failed to pull artifact", e);
             }
         }
     }
@@ -265,29 +264,43 @@ public Manifest pushArtifact(
         List<Layer> layers = new ArrayList<>();
         // Upload all files as blobs
         for (Path path : paths) {
-
-            // Save filename in case of compressed archive
-            String fileName = path.getFileName().toString();
-            boolean isDirectory = false;
-
-            // Add layer annotation for the specific file
-            Map<String, String> layerAnnotations = new HashMap<>(annotations.getFileAnnotations(fileName));
-
-            // Add title annotation
-            layerAnnotations.put(Const.ANNOTATION_TITLE, fileName);
-
-            // Compress directory to tar.gz
-            if (path.toFile().isDirectory()) {
-                path = ArchiveUtils.createTarGz(path);
-                isDirectory = true;
-                layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
-            }
-            Layer layer = uploadBlob(containerRef, path, layerAnnotations);
-            if (isDirectory) {
-                layer = layer.withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE);
+            try {
+                if (Files.isDirectory(path)) {
+                    // Create tar.gz archive for directory
+                    Path tempArchive = ArchiveUtils.createTarGz(path);
+                    try (InputStream is = Files.newInputStream(tempArchive)) {
+                        long size = Files.size(tempArchive);
+                        Layer layer = pushBlobStream(containerRef, is, size)
+                                .withMediaType(Const.DEFAULT_BLOB_DIR_MEDIA_TYPE) // Use tar+gzip for directories
+                                .withAnnotations(Map.of(
+                                        Const.ANNOTATION_TITLE,
+                                        path.getFileName().toString(),
+                                        Const.ANNOTATION_ORAS_UNPACK,
+                                        "true"));
+                        layers.add(layer);
+                        LOG.info("Uploaded directory: {}", layer.getDigest());
+                    }
+                    Files.delete(tempArchive);
+                } else {
+                    try (InputStream is = Files.newInputStream(path)) {
+                        long size = Files.size(path);
+                        // Set mediaType for individual files
+                        String mediaType = Files.probeContentType(path);
+                        if (mediaType == null) {
+                            mediaType = "application/octet-stream";
+                        }
+                        Layer layer = pushBlobStream(containerRef, is, size)
+                                .withMediaType(mediaType)
+                                .withAnnotations(Map.of(
+                                        Const.ANNOTATION_TITLE,
+                                        path.getFileName().toString()));
+                        layers.add(layer);
+                        LOG.info("Uploaded: {}", layer.getDigest());
+                    }
+                }
+            } catch (IOException e) {
+                throw new OrasException("Failed to push artifact", e);
             }
-            layers.add(layer);
-            LOG.info("Uploaded: {}", layer.getDigest());
         }
         // Push the config like any other blob
         Config pushedConfig = pushConfig(containerRef, config != null ? config : Config.empty());
@@ -642,4 +655,128 @@ public Registry build() {
             return registry.build();
         }
     }
+
+    /**
+     * Push a blob using input stream to avoid loading the whole blob in memory
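+     * <p>Illustrative usage (the reference, file and {@code registry} instance below are
+     * placeholders, not part of this API):</p>
+     * <pre>{@code
+     * ContainerRef ref = ContainerRef.parse("localhost:5000/library/my-artifact");
+     * Path file = Path.of("large-file.bin");
+     * try (InputStream is = Files.newInputStream(file)) {
+     *     Layer layer = registry.pushBlobStream(ref, is, Files.size(file));
+     * }
+     * }</pre>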
+     * @param containerRef the container ref
+     * @param input the input stream
+     * @param size the size of the blob
+     * @return The Layer containing the uploaded blob information
+     * @throws OrasException if upload fails or digest calculation fails
+     */
+    public Layer pushBlobStream(ContainerRef containerRef, InputStream input, long size) {
+        Path tempFile = null;
+        try {
+            // Create a temporary file to store the stream content
+            tempFile = Files.createTempFile("oras-upload-", ".tmp");
+
+            // Copy input stream to temp file while calculating digest
+            String digest;
+            try (InputStream bufferedInput = new BufferedInputStream(input);
+                    DigestInputStream digestInput =
+                            new DigestInputStream(bufferedInput, MessageDigest.getInstance("SHA-256"));
+                    OutputStream fileOutput = Files.newOutputStream(tempFile)) {
+
+                digestInput.transferTo(fileOutput);
+                byte[] digestBytes = digestInput.getMessageDigest().digest();
+                digest = "sha256:" + bytesToHex(digestBytes);
+            }
+
+            // Check if the blob already exists
+            if (hasBlob(containerRef.withDigest(digest))) {
+                LOG.info("Blob already exists: {}", digest);
+                return Layer.fromDigest(digest, size);
+            }
+
+            // Construct the URI for initiating the upload
+            URI baseUri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath()));
+            LOG.debug("Initiating blob upload at: {}", baseUri);
+
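+            // Two-step (POST then PUT) upload: the POST opens an upload session and returns its
+            // location, and a single PUT to that location with a "digest" query parameter
+            // uploads and finalizes the blob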
"&" : "?"; + URI finalizeUri = URI.create(locationUrl + separator + "digest=" + digest); + + // Upload the content from the temporary file + try (InputStream uploadStream = Files.newInputStream(tempFile)) { + OrasHttpClient.ResponseWrapper uploadResponse = client.uploadStream( + "PUT", + finalizeUri, + uploadStream, + size, + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE)); + + if (uploadResponse.statusCode() != 201 && uploadResponse.statusCode() != 202) { + throw new OrasException("Failed to upload blob: " + uploadResponse.statusCode() + " - Response: " + + uploadResponse.response()); + } + + return Layer.fromDigest(digest, size); + } + } catch (IOException | NoSuchAlgorithmException e) { + System.err.println("Error during blob upload: " + e.getMessage()); + e.printStackTrace(); + throw new OrasException("Failed to push blob stream", e); + } finally { + // Clean up the temporary file + if (tempFile != null) { + try { + Files.deleteIfExists(tempFile); + } catch (IOException e) { + LOG.warn("Failed to delete temporary file: {}", tempFile, e); + } + } + } + } + + /** + * Bites to hex string + * @param bytes of bytes[] + */ + private static String bytesToHex(byte[] bytes) { + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } + + /** + * Get blob as stream to avoid loading into memory + * @param containerRef The container ref + * @return The input stream + */ + public InputStream getBlobStream(ContainerRef containerRef) { + // Similar to fetchBlob() + return fetchBlob(containerRef); + } } diff --git a/src/main/java/land/oras/utils/DigestUtils.java b/src/main/java/land/oras/utils/DigestUtils.java index 02d7363..8399828 100644 --- a/src/main/java/land/oras/utils/DigestUtils.java +++ b/src/main/java/land/oras/utils/DigestUtils.java @@ -86,4 +86,28 @@ public static String digest(String algorithm, byte[] bytes) { throw new OrasException("Failed to calculate digest", e); } } + + /** + * Calculate the sha256 digest of a InputStream + * @param input The input + * @return The digest + */ + public static String sha256(InputStream input) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = input.read(buffer)) != -1) { + digest.update(buffer, 0, bytesRead); + } + byte[] hashBytes = digest.digest(); + StringBuilder sb = new StringBuilder(); + for (byte b : hashBytes) { + sb.append(String.format("%02x", b)); + } + return "sha256:%s".formatted(sb.toString()); + } catch (Exception e) { + throw new OrasException("Failed to calculate digest", e); + } + } } diff --git a/src/main/java/land/oras/utils/OrasHttpClient.java b/src/main/java/land/oras/utils/OrasHttpClient.java index 9cccade..3ed41ed 100644 --- a/src/main/java/land/oras/utils/OrasHttpClient.java +++ b/src/main/java/land/oras/utils/OrasHttpClient.java @@ -268,6 +268,35 @@ public ResponseWrapper put(URI uri, byte[] body, Map hea HttpRequest.BodyPublishers.ofByteArray(body)); } + /** + * Upload a stream + * @param method The method (POST or PUT) + * @param uri The URI + * @param input The input stream + * @param size The size of the stream + * @param headers The headers + * @return The response + */ + public ResponseWrapper uploadStream( + String method, URI uri, InputStream input, long size, Map headers) { + try { + HttpRequest.BodyPublisher 
+            HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> input);
+
+            HttpRequest.Builder requestBuilder =
+                    HttpRequest.newBuilder().uri(uri).method(method, publisher);
+
+            // Add headers
+            headers.forEach(requestBuilder::header);
+
+            // Execute request
+            HttpRequest request = requestBuilder.build();
+            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
+            return toResponseWrapper(response);
+        } catch (Exception e) {
+            throw new OrasException("Failed to upload stream", e);
+        }
+    }
+
     /**
      * Execute a request
      * @param method The method
@@ -296,17 +325,21 @@ private ResponseWrapper<String> executeRequest(
             HttpRequest request = builder.build();
             logRequest(request, body);
             HttpResponse<String> response = client.send(request, handler);
-            return new ResponseWrapper<>(
-                    response.body(),
-                    response.statusCode(),
-                    response.headers().map().entrySet().stream()
-                            .collect(Collectors.toMap(
-                                    Map.Entry::getKey, e -> e.getValue().get(0))));
+            return toResponseWrapper(response);
         } catch (Exception e) {
             throw new OrasException("Unable to create HTTP request", e);
         }
     }
 
+    private ResponseWrapper<String> toResponseWrapper(HttpResponse<String> response) {
+        return new ResponseWrapper<>(
+                response.body(),
+                response.statusCode(),
+                response.headers().map().entrySet().stream()
+                        .collect(Collectors.toMap(
+                                Map.Entry::getKey, e -> e.getValue().get(0))));
+    }
+
     /**
      * Logs the request in debug/trace mode
      * @param request The request
diff --git a/src/test/java/land/oras/RegistryTest.java b/src/test/java/land/oras/RegistryTest.java
index b29acf2..89cc723 100644
--- a/src/test/java/land/oras/RegistryTest.java
+++ b/src/test/java/land/oras/RegistryTest.java
@@ -11,8 +11,11 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import land.oras.utils.Const;
+import land.oras.utils.DigestUtils;
 import land.oras.utils.RegistryContainer;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -214,4 +217,154 @@ void testShouldPushCompressedDirectory() throws IOException {
         assertEquals(blobDir.getFileName().toString(), annotations.get(Const.ANNOTATION_TITLE));
         assertEquals("true", annotations.get(Const.ANNOTATION_ORAS_UNPACK));
     }
+
+    // Push blob - successful
+    // Push blob - failed - when blob already exists
+    // Push blob - handles IO exception
+    // Handle large stream content
+    @Test
+    void shouldPushAndGetBlobStream() throws IOException {
+        Registry registry = Registry.Builder.builder()
+                .withInsecure(true)
+                .withSkipTlsVerify(true)
+                .build();
+        ContainerRef containerRef =
+                ContainerRef.parse("%s/library/artifact-stream".formatted(this.registry.getRegistry()));
+
+        // Create a file with test data to get accurate stream size
+        Path testFile = Files.createTempFile("test-data-", ".tmp");
+        String testData = "Hello World Stream Test";
+        Files.writeString(testFile, testData);
+        long fileSize = Files.size(testFile);
+
+        // Test pushBlobStream using file input stream
+        Layer layer;
+        try (InputStream inputStream = Files.newInputStream(testFile)) {
+            layer = registry.pushBlobStream(containerRef, inputStream, fileSize);
+
+            // Verify the digest matches SHA-256 of content
+            assertEquals(DigestUtils.sha256(testFile), layer.getDigest());
+            assertEquals(fileSize, layer.getSize());
+        }
+
+        // Test getBlobStream
+        try (InputStream resultStream = registry.getBlobStream(containerRef.withDigest(layer.getDigest()))) {
+            String result = new String(resultStream.readAllBytes());
+            assertEquals(testData, result);
+        }
+
+        // Clean up
+        Files.delete(testFile);
+        registry.deleteBlob(containerRef.withDigest(layer.getDigest()));
+    }
+
+    @Test
+    void shouldHandleExistingBlobInStreamPush() throws IOException {
+        Registry registry = Registry.Builder.builder()
+                .withInsecure(true)
+                .withSkipTlsVerify(true)
+                .build();
+        ContainerRef containerRef =
+                ContainerRef.parse("%s/library/artifact-stream".formatted(this.registry.getRegistry()));
+
+        // Create test file
+        Path testFile = Files.createTempFile("test-data-", ".tmp");
+        Files.writeString(testFile, "Test Content");
+        long fileSize = Files.size(testFile);
+        String expectedDigest = DigestUtils.sha256(testFile);
+
+        // First push
+        Layer firstLayer;
+        try (InputStream inputStream = Files.newInputStream(testFile)) {
+            firstLayer = registry.pushBlobStream(containerRef, inputStream, fileSize);
+        }
+
+        // Second push of same content should detect existing blob
+        Layer secondLayer;
+        try (InputStream inputStream = Files.newInputStream(testFile)) {
+            secondLayer = registry.pushBlobStream(containerRef, inputStream, fileSize);
+        }
+
+        // Verify both operations return same digest
+        assertEquals(expectedDigest, firstLayer.getDigest());
+        assertEquals(expectedDigest, secondLayer.getDigest());
+        assertEquals(firstLayer.getSize(), secondLayer.getSize());
+
+        // Clean up
+        Files.delete(testFile);
+        registry.deleteBlob(containerRef.withDigest(firstLayer.getDigest()));
+    }
+
+    @Test
+    void shouldHandleIOExceptionInStreamPush() throws IOException {
+        Registry registry = Registry.Builder.builder()
+                .withInsecure(true)
+                .withSkipTlsVerify(true)
+                .build();
+        ContainerRef containerRef =
+                ContainerRef.parse("%s/library/artifact-stream".formatted(this.registry.getRegistry()));
+
+        // Create a failing input stream
+        InputStream failingStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                throw new IOException("Simulated IO failure");
+            }
+        };
+
+        // Verify exception is wrapped in OrasException
+        OrasException exception =
+                assertThrows(OrasException.class, () -> registry.pushBlobStream(containerRef, failingStream, 100));
+        assertEquals("Failed to push blob stream", exception.getMessage());
+        assertTrue(exception.getCause() instanceof IOException);
+    }
+
+    @Test
+    void shouldHandleNonExistentBlobInGetStream() {
+        Registry registry = Registry.Builder.builder()
+                .withInsecure(true)
+                .withSkipTlsVerify(true)
+                .build();
+        ContainerRef containerRef =
+                ContainerRef.parse("%s/library/artifact-stream".formatted(this.registry.getRegistry()));
+
+        // Try to get non-existent blob
+        String nonExistentDigest = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+        // Verify it throws OrasException
+        assertThrows(OrasException.class, () -> registry.getBlobStream(containerRef.withDigest(nonExistentDigest)));
+    }
+
+    @Test
+    void shouldHandleLargeStreamContent() throws IOException {
+        Registry registry = Registry.Builder.builder()
+                .withInsecure(true)
+                .withSkipTlsVerify(true)
+                .build();
+        ContainerRef containerRef =
+                ContainerRef.parse("%s/library/artifact-stream".formatted(this.registry.getRegistry()));
+
+        // Create temp file with 5MB of random data
+        Path largeFile = Files.createTempFile("large-test-", ".tmp");
+        byte[] largeData = new byte[5 * 1024 * 1024];
+        new Random().nextBytes(largeData);
+        Files.write(largeFile, largeData);
+        long fileSize = Files.size(largeFile);
+
+        // Push large content
+        Layer layer;
+        try (InputStream inputStream = Files.newInputStream(largeFile)) {
+            layer = registry.pushBlobStream(containerRef, inputStream, fileSize);
+        }
+
+        // Verify content with stream
+        try (InputStream resultStream = registry.getBlobStream(containerRef.withDigest(layer.getDigest()))) {
+            byte[] result = resultStream.readAllBytes();
+            Assertions.assertArrayEquals(largeData, result);
+        }
+
+        // Clean up
+        Files.delete(largeFile);
+        registry.deleteBlob(containerRef.withDigest(layer.getDigest()));
+    }
 }