Skip to content

Commit

Permalink
Integration test for remote/disk storage copy
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Nov 11, 2023
1 parent 1954dfd commit 10e0771
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import akka.actor.typed.ActorSystem
import akka.actor.{ActorSystem => ClassicActorSystem}
import akka.http.scaladsl.model.BodyPartEntity
//import akka.http.javadsl.model.BodyPartEntity
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.http.scaladsl.model.{ContentType, HttpEntity, Uri}
import akka.http.scaladsl.model._
import cats.effect.{Clock, ContextShift, IO, Timer}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache
Expand Down Expand Up @@ -201,16 +199,21 @@ final class Files(

// TODO comments
def copy(
id: FileId,
sourceId: FileId,
destination: FileCopyDestination
)(implicit c: Caller): IO[FileResource] = {
for {
(file, description, sourceEntity) <- fetchSourceFile(id)
_ <- logger.info(s"Fetching source file")
(file, description, sourceEntity) <- fetchSourceFile(sourceId)
_ <- logger.info(s"Fetched source file, fetching destination storage")
(pc, storageRef, storage) <- fetchDestinationStorage(destination)
_ <- logger.info(s"Fetched destination storage, validating storage type")
_ <- validateStorageTypeForCopy(file.storageType, storage)
iri <- generateId(pc)
iri <- destination.id.fold(generateId(pc))(_.expandIri(fetchContext.onCreate).map(_._1))
_ <- logger.info(s"Validated storage type, saving file")
// TODO description has no metadata related to the Id only the file contents, can we reuse the old one?
attributes <- saveFile(iri, storage, description, sourceEntity)
_ <- logger.info(s"Saved file, evaluating creation command destination storage")
res <- eval(CreateFile(iri, destination.project, storageRef, storage.tpe, attributes, c.subject, destination.tag))
} yield res
}.span("copyFile")
Expand All @@ -223,8 +226,12 @@ final class Files(
_ <- validateAuth(id.project, sourceStorage.value.storageValue.readPermission)
attributes = file.value.attributes
sourceBytes <- fetchFile(sourceStorage.value, attributes, file.id)
entity = HttpEntity(attributes.mediaType.getOrElse(`application/octet-stream`), sourceBytes)
(description, sourceEntity) <- extractFormData(iri, sourceStorage.value, entity)
bodyPartEntity =
HttpEntity(attributes.mediaType.getOrElse(ContentTypes.NoContentType), attributes.bytes, sourceBytes)
// TODO should this be strict?
multipartEntity =
Multipart.FormData(Multipart.FormData.BodyPart("file", bodyPartEntity, Map("filename" -> attributes.filename)))
(description, sourceEntity) <- extractFormData(iri, sourceStorage.value, multipartEntity.toEntity())
} yield (file.value, description, sourceEntity)

private def fetchDestinationStorage(destination: FileCopyDestination)(implicit c: Caller) =
Expand All @@ -236,11 +243,7 @@ final class Files(
private def validateStorageTypeForCopy(source: StorageType, destination: Storage): IO[Unit] =
IO.raiseWhen(source == StorageType.S3Storage)(
WrappedStorageRejection(
InvalidStorageType(
destination.id,
found = source,
expected = Set(StorageType.DiskStorage, StorageType.RemoteDiskStorage)
)
InvalidStorageType(destination.id, source, Set(StorageType.DiskStorage, StorageType.RemoteDiskStorage))
)
) >>
IO.raiseUnless(source == destination.tpe)(
Expand Down Expand Up @@ -269,6 +272,7 @@ final class Files(
tag: Option[UserTag]
)(implicit caller: Caller): IO[FileResource] = {
for {
_ <- logger.info(s"DTBDTB updating file...")
(iri, pc) <- id.expandIri(fetchContext.onModify)
_ <- test(UpdateFile(iri, id.project, testStorageRef, testStorageType, testAttributes, rev, caller.subject, tag))
(storageRef, storage) <- fetchActiveStorage(storageId, id.project, pc)
Expand Down Expand Up @@ -305,6 +309,7 @@ final class Files(
tag: Option[UserTag]
)(implicit caller: Caller): IO[FileResource] = {
for {
_ <- logger.info(s"DTBDTB updating file link...")
(iri, pc) <- id.expandIri(fetchContext.onModify)
_ <- test(UpdateFile(iri, id.project, testStorageRef, testStorageType, testAttributes, rev, caller.subject, tag))
(storageRef, storage) <- fetchActiveStorage(storageId, id.project, pc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, MultipartUnmars
import akka.stream.scaladsl.{Keep, Sink}
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.http.MediaTypeDetectorConfig
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{FileUtils, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileDescription
Expand Down Expand Up @@ -44,6 +45,8 @@ sealed trait FormDataExtractor {
}
object FormDataExtractor {

private val log = Logger[FormDataExtractor]

private val fieldName: String = "file"

private val defaultContentType: ContentType.Binary = ContentTypes.`application/octet-stream`
Expand Down Expand Up @@ -91,6 +94,7 @@ object FormDataExtractor {
case Unmarshaller.NoContentException =>
WrappedAkkaRejection(RequestEntityExpectedRejection)
case x: UnsupportedContentTypeException =>
log.info(s"Supported media type is ${x.supported}").unsafeRunSync()
WrappedAkkaRejection(UnsupportedRequestContentTypeRejection(x.supported, x.actualContentType))
case x: IllegalArgumentException =>
WrappedAkkaRejection(ValidationRejection(Option(x.getMessage).getOrElse(""), Some(x)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.annotation.nowarn
* @param mediaType
* the optional media type of the file
* @param bytes
* the size of the file file in bytes
* the size of the file in bytes
* @param digest
* the digest information of the file
* @param origin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag

final case class FileCopyDestination(project: ProjectRef, storageId: Option[IdSegment], tag: Option[UserTag])
final case class FileCopyDestination(
project: ProjectRef,
id: Option[FileId],
storageId: Option[IdSegment],
tag: Option[UserTag]
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object FileId {
def apply(ref: ResourceRef, project: ProjectRef): FileId = FileId(IdSegmentRef(ref), project)
def apply(id: IdSegment, tag: UserTag, project: ProjectRef): FileId = FileId(IdSegmentRef(id, tag), project)
def apply(id: IdSegment, rev: Int, project: ProjectRef): FileId = FileId(IdSegmentRef(id, rev), project)
def apply(id: IdSegment, project: ProjectRef): FileId = FileId(IdSegmentRef(id), project)

val iriExpander: ExpandIri[InvalidFileId] = new ExpandIri(InvalidFileId.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ object FileRejection {
final case class InvalidFileId(id: String)
extends FileRejection(s"File identifier '$id' cannot be expanded to an Iri.")

final case class InvalidProjectRef(ref: String) extends FileRejection(s"Project ref '$ref' is malformed.")

/**
* Signals the impossibility to update a file when the digest is not computed
*
Expand Down Expand Up @@ -289,6 +291,7 @@ object FileRejection {
case SaveRejection(_, _, SaveFileRejection.ResourceAlreadyExists(_)) => (StatusCodes.Conflict, Seq.empty)
case FetchRejection(_, _, _) => (StatusCodes.InternalServerError, Seq.empty)
case SaveRejection(_, _, _) => (StatusCodes.InternalServerError, Seq.empty)
case InvalidProjectRef(_) => (StatusCodes.BadRequest, Seq.empty)
case _ => (StatusCodes.BadRequest, Seq.empty)
}
}
Loading

0 comments on commit 10e0771

Please sign in to comment.