From c53860a9eb4d649f5c2cacfbf7ba9f5aa80b69d1 Mon Sep 17 00:00:00 2001 From: Simon Dumas <simon.dumas@epfl.ch> Date: Mon, 6 May 2024 10:39:06 +0200 Subject: [PATCH 1/2] Add progress logging, allow to skip file events --- .../operations/s3/S3FileOperations.scala | 2 +- ship/src/main/resources/ship-default.conf | 3 +++ .../bluebrain/nexus/ship/EventProcessor.scala | 22 ++++++++++--------- .../nexus/ship/config/InputConfig.scala | 3 ++- .../nexus/ship/files/FileProcessor.scala | 11 +++++++++- .../nexus/ship/resources/SourcePatcher.scala | 2 +- .../ship/config/ShipConfigFixtures.scala | 3 ++- .../ship/resources/SourcePatcherSuite.scala | 2 +- 8 files changed, 32 insertions(+), 16 deletions(-) diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala index 0cb1569a7c..b82df55e64 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala @@ -86,7 +86,7 @@ object S3FileOperations { uuidF: UUIDF ): IO[S3FileMetadata] = { for { - _ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path") + _ <- log.debug(s"Fetching attributes for S3 file. Bucket $bucket at path $path") resp <- client.headObject(bucket, path.toString()) metadata <- mkS3Metadata(client, bucket, path, resp) } yield metadata diff --git a/ship/src/main/resources/ship-default.conf b/ship/src/main/resources/ship-default.conf index b789b1ceb2..15b1b15ef8 100644 --- a/ship/src/main/resources/ship-default.conf +++ b/ship/src/main/resources/ship-default.conf @@ -85,6 +85,9 @@ ship { # If true, no resource validation is performed disable-resource-validation = false + # To skip file events to make the batch run faster and focus on other events + skip-file-events = false + storages { # S3 compatible storage configuration diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala index 0615131d50..fe17486163 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala @@ -37,18 +37,20 @@ object EventProcessor { } eventStream .evalScan(ImportReport.start) { case (report, event) => + val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped } processorsMap.get(event.`type`) match { case Some(processor) => - processor - .evaluate(event) - .map { status => - report + (event, status) - } - .onError { err => - logger.error(err)( - s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." - ) - } + IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}")) >> + processor + .evaluate(event) + .map { status => + report + (event, status) + } + .onError { err => + logger.error(err)( + s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." + ) + } case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >> IO.pure(report + (event, ImportStatus.Dropped)) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala index d758dc66ef..5035d4a083 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala @@ -22,7 +22,8 @@ final case class InputConfig( storages: StoragesConfig, importBucket: String, targetBucket: String, - disableResourceValidation: Boolean + disableResourceValidation: Boolean, + skipFileEvents: Boolean ) object InputConfig { diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala index ee982e0586..9472c79ec5 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala @@ -93,6 +93,14 @@ object FileProcessor { private val logger = Logger[FileProcessor] + private val noop = new EventProcessor[FileEvent] { + override def resourceType: EntityType = Files.entityType + + override def decoder: Decoder[FileEvent] = FileEvent.serializer.codec + + override def evaluate(event: FileEvent): IO[ImportStatus] = IO.pure(ImportStatus.Dropped) + } + def apply( fetchContext: FetchContext, s3Client: S3StorageClient, @@ -101,7 +109,8 @@ object FileProcessor { config: InputConfig, clock: EventClock, xas: Transactors - )(implicit jsonLdApi: JsonLdApi): FileProcessor = { + )(implicit jsonLdApi: JsonLdApi): EventProcessor[FileEvent] = if (config.skipFileEvents) noop + else { val storages = StorageWiring.storages(fetchContext, rcr, config, clock, xas) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcher.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcher.scala index 4a334b215d..79773bfac5 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcher.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcher.scala @@ -22,7 +22,7 @@ object SourcePatcher { // Resources will be keep their ids as we explicitly pass them along the source payload val removeEmptyIds: Json => Json = root.obj.modify { case obj if obj("@id").contains(emptyString) => obj.remove("@id") - case obj => obj + case obj => obj } def apply(fileSelfParser: FileSelf, projectMapper: ProjectMapper, targetBase: BaseUri): SourcePatcher = diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala index 24f3a86f42..1d36dd3947 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala @@ -59,7 +59,8 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp StoragesConfig(eventLogConfig, pagination, config.copy(amazon = Some(amazonConfig))), importBucket, targetBucket, - disableResourceValidation = false + disableResourceValidation = false, + skipFileEvents = false ) } diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcherSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcherSuite.scala index 0004d1b001..2775e182b3 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcherSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/resources/SourcePatcherSuite.scala @@ -15,7 +15,7 @@ class SourcePatcherSuite extends NexusSuite { } test("Removing empty ids removes the @id field when it is empty") { - val source = json"""{ "@id": "", "name": "Bob" }""" + val source = json"""{ "@id": "", "name": "Bob" }""" val expected = json"""{ "name": "Bob" }""" assertEquals(SourcePatcher.removeEmptyIds(source), expected) } From 01abdf8d4cbe79f3bbcf92ad85cbbd56fc56a057 Mon Sep 17 00:00:00 2001 From: Simon Dumas <simon.dumas@epfl.ch> Date: Mon, 6 May 2024 10:48:31 +0200 Subject: [PATCH 2/2] Run static analysis for all modules --- build.sbt | 1 - 1 file changed, 1 deletion(-) diff --git a/build.sbt b/build.sbt index bd3330c316..099e4b05a5 100755 --- a/build.sbt +++ b/build.sbt @@ -1085,7 +1085,6 @@ val coreModules = List("kernel", "rdf", "sdk", "sourcingPsql", "testkit") val staticAnalysis = s""" |scalafmtSbtCheck ; - |project delta ; |scalafmtCheck ; |Test/scalafmtCheck ; |scapegoat ;