Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bulk create by copy operation for files #4483

Merged
merged 21 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ lazy val kernel = project
scalaTest % Test
),
addCompilerPlugin(kindProjector),
addCompilerPlugin(betterMonadicFor),
coverageFailOnMinimum := false
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class

make[RemoteContextResolution].named("aggregate").fromEffect { (otherCtxResolutions: Set[RemoteContextResolution]) =>
for {
bulkOpCtx <- ContextValue.fromFile("contexts/bulk-operation.json")
errorCtx <- ContextValue.fromFile("contexts/error.json")
metadataCtx <- ContextValue.fromFile("contexts/metadata.json")
searchCtx <- ContextValue.fromFile("contexts/search.json")
Expand All @@ -96,7 +97,8 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
contexts.remoteContexts -> remoteContextsCtx,
contexts.tags -> tagsCtx,
contexts.version -> versionCtx,
contexts.validation -> validationCtx
contexts.validation -> validationCtx,
contexts.bulkOperation -> bulkOpCtx
)
.merge(otherCtxResolutions.toSeq: _*)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,41 @@
package ch.epfl.bluebrain.nexus.storage.files
package ch.epfl.bluebrain.nexus.delta.kernel.utils

import cats.data.NonEmptyList
import cats.effect.{IO, Ref}
import cats.implicits._
import ch.epfl.bluebrain.nexus.storage.StorageError.CopyOperationFailed
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import fs2.io.file.{CopyFlag, CopyFlags, Files, Path}

trait CopyFiles {
def copyValidated(files: NonEmptyList[ValidatedCopyFile]): IO[Unit]
trait TransactionalFileCopier {
def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit]
}

object CopyFiles {
def mk(): CopyFiles = files =>
copyAll(files.map(v => CopyBetween(Path.fromNioPath(v.absSourcePath), Path.fromNioPath(v.absDestPath))))
final case class CopyBetween(source: Path, destination: Path)

def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit] =
final case class CopyOperationFailed(failingCopy: CopyBetween, e: Throwable) extends Rejection {
override def reason: String =
s"Copy operation failed from source ${failingCopy.source} to destination ${failingCopy.destination}. Underlying error: $e"
}

object TransactionalFileCopier {

private val logger = Logger[TransactionalFileCopier]

def mk(): TransactionalFileCopier = files => copyAll(files)

private def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit] =
Ref.of[IO, Option[CopyOperationFailed]](None).flatMap { errorRef =>
files
.parTraverse { case c @ CopyBetween(source, dest) =>
copySingle(source, dest).onError(_ => errorRef.set(Some(CopyOperationFailed(c))))
copySingle(source, dest).onError(e => errorRef.set(Some(CopyOperationFailed(c, e))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the ref get used here rather than modifying the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think I understand the question 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we setting a ref when we could just return the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll refactor this afterwards

}
.void
.handleErrorWith(_ => rollbackCopiesAndRethrow(errorRef, files.map(_.destination)))
.handleErrorWith { e =>
val destinations = files.map(_.destination)
logger.error(e)(s"Transactional files copy failed, deleting created files: ${destinations}") >>
rollbackCopiesAndRethrow(errorRef, destinations)
}
}

def parent(p: Path): Path = Path.fromNioPath(p.toNioPath.getParent)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
package ch.epfl.bluebrain.nexus.storage.files
package ch.epfl.bluebrain.nexus.delta.kernel.utils

import cats.data.NonEmptyList
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.storage.StorageError.CopyOperationFailed
import ch.epfl.bluebrain.nexus.storage.files.CopyFiles.parent
import ch.epfl.bluebrain.nexus.delta.kernel.utils.TransactionalFileCopier.parent
import fs2.io.file.PosixPermission._
import fs2.io.file._
import munit.CatsEffectSuite
import munit.catseffect.IOFixture

import java.util.UUID

class CopyFileSuite extends CatsEffectSuite {
class TransactionalFileCopierSuite extends CatsEffectSuite {

val myFixture: IOFixture[Path] = ResourceSuiteLocalFixture("create-temp-dir-fixture", Files[IO].tempDirectory)
override def munitFixtures: List[IOFixture[Path]] = List(myFixture)
lazy val tempDir: Path = myFixture()

val copier: TransactionalFileCopier = TransactionalFileCopier.mk()

test("successfully copy contents of multiple files") {
for {
(source1, source1Contents) <- givenAFileExists
(source2, source2Contents) <- givenAFileExists
(dest1, dest2, dest3) = (genFilePath, genFilePath, genFilePath)
files = NonEmptyList.of(CopyBetween(source1, dest1), CopyBetween(source2, dest2), CopyBetween(source1, dest3))
_ <- CopyFiles.copyAll(files)
_ <- copier.copyAll(files)
_ <- fileShouldExistWithContents(source1Contents, dest1)
_ <- fileShouldExistWithContents(source1Contents, dest3)
_ <- fileShouldExistWithContents(source2Contents, dest2)
Expand All @@ -37,7 +38,7 @@ class CopyFileSuite extends CatsEffectSuite {
sourceAttr <- Files[IO].getBasicFileAttributes(source)
dest = genFilePath
files = NonEmptyList.of(CopyBetween(source, dest))
_ <- CopyFiles.copyAll(files)
_ <- copier.copyAll(files)
_ <- fileShouldExistWithContentsAndAttributes(dest, contents, sourceAttr)
} yield ()
}
Expand All @@ -49,22 +50,22 @@ class CopyFileSuite extends CatsEffectSuite {
(source, _) <- givenAFileWithPermissions(sourcePermissions)
dest = genFilePath
files = NonEmptyList.of(CopyBetween(source, dest))
_ <- CopyFiles.copyAll(files)
_ <- copier.copyAll(files)
_ <- fileShouldExistWithPermissions(dest, sourcePermissions)
} yield ()
}

test("rollback by deleting file copies and directories if error thrown during a copy") {
for {
(source, _) <- givenAFileExists
(failingDest, _) <- givenAFileExists
(dest1, dest3) = (genFilePath, genFilePath)
failingCopy = CopyBetween(source, failingDest)
files = NonEmptyList.of(CopyBetween(source, dest1), failingCopy, CopyBetween(source, dest3))
error <- CopyFiles.copyAll(files).intercept[CopyOperationFailed]
_ <- List(dest1, dest3, parent(dest1), parent(dest3)).traverse(fileShouldNotExist)
_ <- fileShouldExist(failingDest)
} yield assertEquals(error, CopyOperationFailed(failingCopy))
(source, _) <- givenAFileExists
(existingFilePath, _) <- givenAFileExists
(dest1, dest3) = (genFilePath, genFilePath)
failingCopy = CopyBetween(source, existingFilePath)
files = NonEmptyList.of(CopyBetween(source, dest1), failingCopy, CopyBetween(source, dest3))
error <- copier.copyAll(files).intercept[CopyOperationFailed]
_ <- List(dest1, dest3, parent(dest1), parent(dest3)).traverse(fileShouldNotExist)
_ <- fileShouldExist(existingFilePath)
} yield assertEquals(error.failingCopy, failingCopy)
dantb marked this conversation as resolved.
Show resolved Hide resolved
}

test("rollback read-only files upon failure") {
Expand All @@ -76,10 +77,10 @@ class CopyFileSuite extends CatsEffectSuite {
dest2 = genFilePath
failingCopy = CopyBetween(source, failingDest)
files = NonEmptyList.of(CopyBetween(source, dest2), failingCopy)
error <- CopyFiles.copyAll(files).intercept[CopyOperationFailed]
error <- copier.copyAll(files).intercept[CopyOperationFailed]
_ <- List(dest2, parent(dest2)).traverse(fileShouldNotExist)
_ <- fileShouldExist(failingDest)
} yield assertEquals(error, CopyOperationFailed(failingCopy))
} yield assertEquals(error.failingCopy, failingCopy)
}

def genFilePath: Path = tempDir / genString / s"$genString.txt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{Fil
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.{InvalidFileSelf, ResourceNotFound}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.{ArchiveRejection, ArchiveValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.RemoteContextResolutionFixture
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileAttributes}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileGen}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.AbsolutePath
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.{StatefulUUIDF, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError.InvalidPath
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.archive.routes.ArchiveRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.generators.FileGen
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{File, FileAttributes}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutesSpec
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileGen}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage
import akka.actor
import akka.actor.typed.ActorSystem
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, UUIDF}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, TransactionalFileCopier, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files.FilesLog
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.batch.{BatchCopy, BatchFiles}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.contexts.{files => fileCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.{BatchFilesRoutes, FilesRoutes}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => filesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.contexts.{storages => storageCtxId, storagesMetadata => storageMetaCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskStorageCopyFiles
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskStorageCopyFiles
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storagesSchemaId}
Expand All @@ -40,7 +44,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import com.typesafe.config.Config
Expand Down Expand Up @@ -147,6 +151,10 @@ class StoragePluginModule(priority: Int) extends ModuleDef {

many[ResourceShift[_, _, _]].ref[Storage.Shift]

make[FilesLog].from { (cfg: StoragePluginConfig, xas: Transactors, clock: Clock[IO]) =>
ScopedEventLog(Files.definition(clock), cfg.files.eventLog, xas)
}

make[Files]
.fromEffect {
(
Expand Down Expand Up @@ -185,6 +193,41 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
}
}

make[TransactionalFileCopier].fromValue(TransactionalFileCopier.mk())

make[DiskStorageCopyFiles].from { copier: TransactionalFileCopier => DiskStorageCopyFiles.mk(copier) }

make[RemoteDiskStorageCopyFiles].from { client: RemoteDiskStorageClient => RemoteDiskStorageCopyFiles.mk(client) }

make[BatchCopy].from {
(
files: Files,
storages: Storages,
aclCheck: AclCheck,
storagesStatistics: StoragesStatistics,
diskCopy: DiskStorageCopyFiles,
remoteDiskCopy: RemoteDiskStorageCopyFiles,
uuidF: UUIDF
) =>
BatchCopy.mk(files, storages, aclCheck, storagesStatistics, diskCopy, remoteDiskCopy)(uuidF)
}

make[BatchFiles].from {
(
fetchContext: FetchContext[ContextRejection],
files: Files,
filesLog: FilesLog,
batchCopy: BatchCopy,
uuidF: UUIDF
) =>
BatchFiles.mk(
files,
fetchContext.mapRejection(FileRejection.ProjectContextRejection),
FilesLog.eval(filesLog),
dantb marked this conversation as resolved.
Show resolved Hide resolved
batchCopy
)(uuidF)
}

make[FilesRoutes].from {
(
cfg: StoragePluginConfig,
Expand All @@ -209,6 +252,28 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
)
}

make[BatchFilesRoutes].from {
(
cfg: StoragePluginConfig,
identities: Identities,
aclCheck: AclCheck,
batchFiles: BatchFiles,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: AggregateIndexingAction,
shift: File.Shift,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
val storageConfig = cfg.storages.storageTypeConfig
new BatchFilesRoutes(identities, aclCheck, batchFiles, schemeDirectives, indexingAction(_, _, _)(shift))(
baseUri,
storageConfig,
cr,
ordering
)
}

make[File.Shift].from { (files: Files, base: BaseUri, storageTypeConfig: StorageTypeConfig) =>
File.shift(files)(base, storageTypeConfig)
}
Expand Down Expand Up @@ -283,4 +348,8 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
many[PriorityRoute].add { (fileRoutes: FilesRoutes) =>
PriorityRoute(priority, fileRoutes.routes, requiresStrictEntity = false)
}

many[PriorityRoute].add { (batchFileRoutes: BatchFilesRoutes) =>
PriorityRoute(priority, batchFileRoutes.routes, requiresStrictEntity = false)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileId

trait FetchFileResource {

/**
* Fetch the last version of a file
*
* @param id
* the identifier that will be expanded to the Iri of the file with its optional rev/tag
*/
def fetch(id: FileId): IO[FileResource]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectContext
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}

trait FetchFileStorage {
def fetchAndValidateActiveStorage(storageIdOpt: Option[IdSegment], ref: ProjectRef, pc: ProjectContext)(implicit
caller: Caller
): IO[(ResourceRef.Revision, Storage)]
}
Loading