Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progress logging, allow to skip file events #4932

Merged
merged 3 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ val coreModules = List("kernel", "rdf", "sdk", "sourcingPsql", "testkit")
val staticAnalysis =
s"""
|scalafmtSbtCheck ;
|project delta ;
|scalafmtCheck ;
|Test/scalafmtCheck ;
|scapegoat ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object S3FileOperations {
uuidF: UUIDF
): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
_ <- log.debug(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
metadata <- mkS3Metadata(path, resp)
} yield metadata
Expand Down
3 changes: 3 additions & 0 deletions ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ ship {
# If true, no resource validation is performed
disable-resource-validation = false

# To skip file events to make the batch run faster and focus on other events
skip-file-events = false

storages {

# S3 compatible storage configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ object EventProcessor {
}
eventStream
.evalScan(ImportReport.start) { case (report, event) =>
val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped }
processorsMap.get(event.`type`) match {
case Some(processor) =>
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}")) >>
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
case None =>
logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
IO.pure(report + (event, ImportStatus.Dropped))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ final case class InputConfig(
storages: StoragesConfig,
importBucket: String,
targetBucket: String,
disableResourceValidation: Boolean
disableResourceValidation: Boolean,
skipFileEvents: Boolean
)

object InputConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ object FileProcessor {

private val logger = Logger[FileProcessor]

private val noop = new EventProcessor[FileEvent] {
override def resourceType: EntityType = Files.entityType

override def decoder: Decoder[FileEvent] = FileEvent.serializer.codec

override def evaluate(event: FileEvent): IO[ImportStatus] = IO.pure(ImportStatus.Dropped)
}

def apply(
fetchContext: FetchContext,
s3Client: S3StorageClient,
Expand All @@ -101,7 +109,8 @@ object FileProcessor {
config: InputConfig,
clock: EventClock,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): FileProcessor = {
)(implicit jsonLdApi: JsonLdApi): EventProcessor[FileEvent] = if (config.skipFileEvents) noop
else {

val storages = StorageWiring.storages(fetchContext, rcr, config, clock, xas)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp
StoragesConfig(eventLogConfig, pagination, config.copy(amazon = Some(amazonConfig))),
importBucket,
targetBucket,
disableResourceValidation = false
disableResourceValidation = false,
skipFileEvents = false
)

}