Skip to content

Commit

Permalink
Add support for bulk copying files to another project
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Nov 28, 2023
1 parent edb4e50 commit ddd0c99
Show file tree
Hide file tree
Showing 29 changed files with 748 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.files
import akka.actor.typed.ActorSystem
import akka.actor.{ActorSystem => ClassicActorSystem}
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.http.scaladsl.model.{ContentType, HttpEntity, Uri}
import akka.http.scaladsl.model.{BodyPartEntity, ContentType, HttpEntity, Uri}
import cats.effect.{Clock, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache
Expand All @@ -19,9 +19,9 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.{RemoteDiskStorageConfig, StorageTypeConfig}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{DifferentStorageType, InvalidStorageType, StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, FetchFileRejection, SaveFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{CopyFileRejection, FetchAttributeRejection, FetchFileRejection, SaveFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{Storages, StoragesStatistics}
Expand Down Expand Up @@ -195,6 +195,57 @@ final class Files(
} yield res
}.span("createLink")

/**
* Create a file from a source file potentially in a different organization
* @param sourceId
* File lookup id for the source file
* @param dest
* Project, storage and file details for the file we're creating
*/
def copyTo(
sourceId: FileId,
dest: CopyFileDestination
)(implicit c: Caller): IO[FileResource] = {
for {
file <- fetchSourceFile(sourceId)
(pc, destStorageRef, destStorage) <- fetchDestinationStorage(dest)
_ <- validateStorageTypeForCopy(file.storageType, destStorage)
space <- fetchStorageAvailableSpace(destStorage)
_ <- IO.raiseUnless(space.exists(_ < file.attributes.bytes))(
FileTooLarge(destStorage.storageValue.maxFileSize, space)
)
iri <- dest.fileId.fold(generateId(pc))(FileId(_, dest.project).expandIri(fetchContext.onCreate).map(_._1))
destinationDesc <- FileDescription(dest.filename.getOrElse(file.attributes.filename), file.attributes.mediaType)
attributes <- CopyFile(destStorage, remoteDiskStorageClient).apply(file.attributes, destinationDesc).adaptError {
case r: CopyFileRejection => CopyRejection(file.id, file.storage.iri, destStorage.id, r)
}
res <- eval(CreateFile(iri, dest.project, destStorageRef, destStorage.tpe, attributes, c.subject, dest.tag))
} yield res
}.span("copyFile")

private def fetchSourceFile(id: FileId)(implicit c: Caller) =
for {
file <- fetch(id)
sourceStorage <- storages.fetch(file.value.storage, id.project)
_ <- validateAuth(id.project, sourceStorage.value.storageValue.readPermission)
} yield file.value

private def fetchDestinationStorage(dest: CopyFileDestination)(implicit c: Caller) =
for {
pc <- fetchContext.onCreate(dest.project)
(destStorageRef, destStorage) <- fetchActiveStorage(dest.storage, dest.project, pc)
} yield (pc, destStorageRef, destStorage)

private def validateStorageTypeForCopy(source: StorageType, destination: Storage): IO[Unit] =
IO.raiseWhen(source == StorageType.S3Storage)(
WrappedStorageRejection(
InvalidStorageType(destination.id, source, Set(StorageType.DiskStorage, StorageType.RemoteDiskStorage))
)
) >>
IO.raiseUnless(source == destination.tpe)(
WrappedStorageRejection(DifferentStorageType(destination.id, found = destination.tpe, expected = source))
)

/**
* Update an existing file
*
Expand Down Expand Up @@ -456,20 +507,31 @@ final class Files(

private def extractFileAttributes(iri: Iri, entity: HttpEntity, storage: Storage): IO[FileAttributes] =
for {
storageAvailableSpace <- storage.storageValue.capacity.fold(IO.none[Long]) { capacity =>
storagesStatistics
.get(storage.id, storage.project)
.redeem(
_ => Some(capacity),
stat => Some(capacity - stat.spaceUsed)
)
}
(description, source) <- formDataExtractor(iri, entity, storage.storageValue.maxFileSize, storageAvailableSpace)
attributes <- SaveFile(storage, remoteDiskStorageClient, config)
.apply(description, source)
.adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) }
(description, source) <- extractFormData(iri, storage, entity)
attributes <- saveFile(iri, storage, description, source)
} yield attributes

private def extractFormData(iri: Iri, storage: Storage, entity: HttpEntity): IO[(FileDescription, BodyPartEntity)] =
for {
storageAvailableSpace <- fetchStorageAvailableSpace(storage)
(description, source) <- formDataExtractor(iri, entity, storage.storageValue.maxFileSize, storageAvailableSpace)
} yield (description, source)

private def saveFile(iri: Iri, storage: Storage, description: FileDescription, source: BodyPartEntity) =
SaveFile(storage, remoteDiskStorageClient, config)
.apply(description, source)
.adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) }

private def fetchStorageAvailableSpace(storage: Storage): IO[Option[Long]] =
storage.storageValue.capacity.fold(IO.none[Long]) { capacity =>
storagesStatistics
.get(storage.id, storage.project)
.redeem(
_ => Some(capacity),
stat => Some(capacity - stat.spaceUsed)
)
}

private def expandStorageIri(segment: IdSegment, pc: ProjectContext): IO[Iri] =
Storages.expandIri(segment, pc).adaptError { case s: StorageRejection =>
WrappedStorageRejection(s)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model

import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag

/**
* Details of the file we're creating in the copy
*
* @param project
* Orgnization and project for the new file
* @param fileId
* Optional identifier for the new file
* @param storage
* Optional storage for the new file which must have the same type as the source file's storage
* @param tag
* Optional tag to create the new file with
* @param filename
* Optional filename for the new file. If omitted, the source filename will be used
*/
final case class CopyFileDestination(
project: ProjectRef,
fileId: Option[IdSegment],
storage: Option[IdSegment],
tag: Option[UserTag],
filename: Option[String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.annotation.nowarn
* @param mediaType
* the optional media type of the file
* @param bytes
* the size of the file file in bytes
* the size of the file in bytes
* @param digest
* the digest information of the file
* @param origin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object FileId {
def apply(ref: ResourceRef, project: ProjectRef): FileId = FileId(IdSegmentRef(ref), project)
def apply(id: IdSegment, tag: UserTag, project: ProjectRef): FileId = FileId(IdSegmentRef(id, tag), project)
def apply(id: IdSegment, rev: Int, project: ProjectRef): FileId = FileId(IdSegmentRef(id, rev), project)
def apply(id: IdSegment, project: ProjectRef): FileId = FileId(IdSegmentRef(id), project)

val iriExpander: ExpandIri[InvalidFileId] = new ExpandIri(InvalidFileId.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfRejectionHandler.all._
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.syntax.httpResponseFieldsSyntax
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
Expand Down Expand Up @@ -147,6 +148,12 @@ object FileRejection {
s"Linking a file '$id' cannot be performed without a 'filename' or a 'path' that does not end with a filename."
)

/**
* Rejection returned when attempting to fetch a file and including both the target tag and revision.
*/
final case class InvalidFileLookup(id: IdSegment)
extends FileRejection(s"Only one of 'tag' and 'rev' can be used to lookup file '$id'.")

/**
* Rejection returned when attempting to create/update a file with a Multipart/Form-Data payload that does not
* contain a ''file'' fieldName
Expand Down Expand Up @@ -235,6 +242,19 @@ object FileRejection {
final case class LinkRejection(id: Iri, storageId: Iri, rejection: StorageFileRejection)
extends FileRejection(s"File '$id' could not be linked using storage '$storageId'", Some(rejection.loggedDetails))

/**
* Rejection returned when interacting with the storage operations bundle to copy a file already in storage
*/
final case class CopyRejection(
sourceId: Iri,
sourceStorageId: Iri,
destStorageId: Iri,
rejection: StorageFileRejection
) extends FileRejection(
s"File '$sourceId' could not be copied from storage '$sourceStorageId' to storage '$destStorageId'",
Some(rejection.loggedDetails)
)

/**
* Signals a rejection caused when interacting with other APIs when fetching a resource
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes

import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileId
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.InvalidFileLookup
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import io.circe.Decoder

final case class CopyFilePayload(
destFilename: Option[String],
sourceProj: ProjectRef,
sourceFile: IdSegment,
sourceTag: Option[UserTag],
sourceRev: Option[Int]
) {
def toSourceFileId: Either[InvalidFileLookup, FileId] = (sourceTag, sourceRev) match {
case (Some(tag), None) => Right(FileId(sourceFile, tag, sourceProj))
case (None, Some(rev)) => Right(FileId(sourceFile, rev, sourceProj))
case (None, None) => Right(FileId(sourceFile, sourceProj))
case (Some(_), Some(_)) => Left(InvalidFileLookup(sourceFile))
}
}

object CopyFilePayload {

implicit val dec: Decoder[CopyFilePayload] = Decoder.instance { cur =>
for {
destFilename <- cur.get[Option[String]]("destinationFilename")
sourceProj <- cur.get[ProjectRef]("sourceProjectRef")
sourceFileId <- cur.get[String]("sourceFileId").map(IdSegment(_))
sourceTag <- cur.get[Option[UserTag]]("sourceTag")
sourceRev <- cur.get[Option[Int]]("sourceRev")
} yield CopyFilePayload(destFilename, sourceProj, sourceFileId, sourceTag, sourceRev)
}
}
Loading

0 comments on commit ddd0c99

Please sign in to comment.