Skip to content

Commit

Permalink
[TRELLO-2155] Add table to track scheduled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
charlescd committed Jan 11, 2024
1 parent 3914d2d commit 7109ed6
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 81 deletions.
14 changes: 7 additions & 7 deletions app/loader/SignalConsoApplicationLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import repositories.socialnetwork.SocialNetworkRepository
import repositories.socialnetwork.SocialNetworkRepositoryInterface
import repositories.subscription.SubscriptionRepository
import repositories.subscription.SubscriptionRepositoryInterface
import repositories.tasklock.TaskLockRepository
import repositories.tasklock.TaskRepository
import repositories.user.UserRepository
import repositories.user.UserRepositoryInterface
import repositories.usersettings.UserReportsFiltersRepository
Expand Down Expand Up @@ -180,7 +180,7 @@ class SignalConsoComponents(

val dbConfig: DatabaseConfig[JdbcProfile] = slickApi.dbConfig[JdbcProfile](DbName("default"))

val taskLockRepository = new TaskLockRepository(dbConfig)
val taskRepository = new TaskRepository(dbConfig)
val blacklistedEmailsRepository: BlacklistedEmailsRepositoryInterface = new BlacklistedEmailsRepository(dbConfig)
val companyAccessRepository: CompanyAccessRepositoryInterface = new CompanyAccessRepository(dbConfig)
val accessTokenRepository: AccessTokenRepositoryInterface =
Expand Down Expand Up @@ -444,7 +444,7 @@ class SignalConsoComponents(
companyRepository,
mailService,
taskConfiguration,
taskLockRepository,
taskRepository,
messagesApi
)
reportClosureTask.schedule()
Expand All @@ -456,7 +456,7 @@ class SignalConsoComponents(
mailService,
companiesVisibilityOrchestrator,
taskConfiguration,
taskLockRepository
taskRepository
)
reportReminderTask.schedule()

Expand All @@ -471,7 +471,7 @@ class SignalConsoComponents(
companySyncService,
companySyncRepository,
taskConfiguration,
taskLockRepository
taskRepository
)
companyUpdateTask.schedule()

Expand All @@ -485,7 +485,7 @@ class SignalConsoComponents(
userRepository,
mailService,
taskConfiguration,
taskLockRepository
taskRepository
)
reportNotificationTask.schedule()

Expand All @@ -503,7 +503,7 @@ class SignalConsoComponents(
inactiveDgccrfAccountRemoveTask,
inactiveDgccrfAccountReminderTask,
applicationConfiguration.task,
taskLockRepository
taskRepository
)
inactiveAccountTask.schedule()

Expand Down
31 changes: 31 additions & 0 deletions app/repositories/tasklock/TaskDetails.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package repositories.tasklock

import play.api.libs.json.Format
import play.api.libs.json.JsResult
import play.api.libs.json.JsValue
import play.api.libs.json.Json
import play.api.libs.json.OWrites
import play.api.libs.json.Reads._
import play.api.libs.json.Writes._
import java.time.LocalTime
import java.time.OffsetDateTime
import scala.concurrent.duration._

case class TaskDetails(
id: Int,
name: String,
startTime: LocalTime,
interval: FiniteDuration,
lastRunDate: OffsetDateTime,
lastRunStatus: String
)

object TaskDetails {
implicit private val finiteDurationFormat: Format[FiniteDuration] = new Format[FiniteDuration] {
def reads(json: JsValue): JsResult[FiniteDuration] = LongReads.reads(json).map(_.milliseconds)

def writes(o: FiniteDuration): JsValue = LongWrites.writes(o.toMillis)
}

implicit val writes: OWrites[TaskDetails] = Json.writes[TaskDetails]
}
39 changes: 39 additions & 0 deletions app/repositories/tasklock/TaskDetailsTable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package repositories.tasklock

import repositories.PostgresProfile.api._
import repositories.TypedDatabaseTable
import slick.lifted.ProvenShape

import java.time.Duration
import java.time.LocalTime
import java.time.OffsetDateTime
import scala.jdk.DurationConverters._

class TaskDetailsTable(tag: Tag) extends TypedDatabaseTable[TaskDetails, Int](tag, "task_details") {

def name = column[String]("name")
def startTime = column[LocalTime]("start_time")
def interval = column[Duration]("interval")
def lastRunDate = column[OffsetDateTime]("last_run_date")
def lastRunStatus = column[String]("last_run_status")

type TaskData = (Int, String, LocalTime, Duration, OffsetDateTime, String)

def constructTaskDetails: TaskData => TaskDetails = {
case (id, name, startTime, interval, lastRunDate, lastRunStatus) =>
TaskDetails(id, name, startTime, interval.toScala, lastRunDate, lastRunStatus)
}

def extractTaskDetails: PartialFunction[TaskDetails, TaskData] = {
case TaskDetails(id, name, startTime, interval, lastRunDate, lastRunStatus) =>
(id, name, startTime, interval.toJava, lastRunDate, lastRunStatus)
}

override def * : ProvenShape[TaskDetails] =
(id, name, startTime, interval, lastRunDate, lastRunStatus) <> (constructTaskDetails, extractTaskDetails.lift)

}

object TaskDetailsTable {
val table = TableQuery[TaskDetailsTable]
}
26 changes: 0 additions & 26 deletions app/repositories/tasklock/TaskLockRepository.scala

This file was deleted.

34 changes: 34 additions & 0 deletions app/repositories/tasklock/TaskRepository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package repositories.tasklock

import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import repositories.PostgresProfile.api._
import repositories.TypedCRUDRepository
import repositories.TypedCRUDRepositoryInterface

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

trait TaskRepositoryInterface extends TypedCRUDRepositoryInterface[TaskDetails, Int] {
def acquireLock(id: Int): Future[Boolean]
def releaseLock(id: Int): Future[Boolean]
}

class TaskRepository(override val dbConfig: DatabaseConfig[JdbcProfile])(implicit override val ec: ExecutionContext)
extends TypedCRUDRepository[TaskDetailsTable, TaskDetails, Int]
with TaskRepositoryInterface {

import dbConfig._

override val table = TaskDetailsTable.table

def acquireLock(id: Int): Future[Boolean] =
db.run(
sql"""select pg_try_advisory_lock($id)""".as[Boolean].head
)

def releaseLock(id: Int): Future[Boolean] =
db.run(
sql"""select pg_advisory_unlock($id)""".as[Boolean].head
)
}
24 changes: 13 additions & 11 deletions app/repositories/website/WebsiteRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,18 @@ class WebsiteRepository(
)

def deprecatedSearchCompaniesByHost(host: String): Future[Seq[(Website, Company)]] =
URL(host).getHost.map { h =>
db.run(
table
.filter(_.host === h)
.filter(_.identificationStatus inSet List(IdentificationStatus.Identified))
.join(CompanyTable.table)
.on(_.companyId === _.id)
.result
)
} getOrElse (Future(Nil))
URL(host).getHost
.map { h =>
db.run(
table
.filter(_.host === h)
.filter(_.identificationStatus inSet List(IdentificationStatus.Identified))
.join(CompanyTable.table)
.on(_.companyId === _.id)
.result
)
}
.getOrElse(Future(Nil))

override def removeOtherNonIdentifiedWebsitesWithSameHost(website: Website): Future[Int] =
db.run(
Expand All @@ -115,7 +117,7 @@ class WebsiteRepository(
override def searchCompaniesByUrl(
url: String
): Future[Seq[(Website, Company)]] =
URL(url).getHost.map(searchCompaniesByHost(_)).getOrElse(Future(Nil))
URL(url).getHost.map(searchCompaniesByHost).getOrElse(Future(Nil))

override def listWebsitesCompaniesByReportCount(
maybeHost: Option[String],
Expand Down
29 changes: 23 additions & 6 deletions app/tasks/ScheduledTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package tasks
import akka.actor.ActorSystem
import config.TaskConfiguration
import play.api.Logger
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import repositories.tasklock.TaskDetails
import utils.Logs.RichLogger

import java.time.LocalTime
import java.time.OffsetDateTime
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext
Expand All @@ -17,7 +19,7 @@ import scala.util.Success
abstract class ScheduledTask(
taskId: Int,
taskName: String,
taskLockRepository: TaskLockRepositoryInterface,
taskRepository: TaskRepositoryInterface,
actorSystem: ActorSystem,
taskConfiguration: TaskConfiguration
)(implicit ec: ExecutionContext) {
Expand All @@ -26,17 +28,32 @@ abstract class ScheduledTask(
val startTime: LocalTime
val interval: FiniteDuration

private def createTaskModel(status: String) =
TaskDetails(taskId, taskName, startTime, interval, lastRunDate = OffsetDateTime.now(), status)

private def createOrUpdateTaskDetails(taskModel: TaskDetails) =
taskRepository
.createOrUpdate(taskModel)
.map(_ => logger.info(s"$taskName updated in DB"))
.recover(err => logger.errorWithTitle("task_failed", s"$taskName failed", err))

def runTask(): Future[Unit]

private def runTaskWithLock(): Unit =
(for {
lockAcquired <- taskLockRepository.acquire(taskId)
lockAcquired <- taskRepository.acquireLock(taskId)
_ <-
if (lockAcquired) {
logger.info(s"Lock acquired for $taskName.")
runTask()
.map(_ => logger.info(s"$taskName finished"))
.recover(err => logger.errorWithTitle("task_failed", s"$taskName failed", err))
.flatMap { _ =>
logger.info(s"$taskName finished")
createOrUpdateTaskDetails(createTaskModel("success"))
}
.recoverWith { err =>
logger.errorWithTitle("task_failed", s"$taskName failed", err)
createOrUpdateTaskDetails(createTaskModel("failure"))
}
} else {
logger.info(s"Lock for $taskName is already taken by another instance. Nothing to do here.")
Future.unit
Expand All @@ -46,7 +63,7 @@ abstract class ScheduledTask(
private def release(): Unit =
actorSystem.scheduler.scheduleOnce(1.minute) {
logger.debug(s"Releasing lock for $taskName with id $taskId")
taskLockRepository.release(taskId).onComplete {
taskRepository.releaseLock(taskId).onComplete {
case Success(_) =>
logger.debug(s"Lock released for $taskName with id $taskId")
case Failure(err) =>
Expand Down
4 changes: 2 additions & 2 deletions app/tasks/account/InactiveAccountTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package tasks.account
import akka.actor.ActorSystem
import config.TaskConfiguration
import play.api.Logger
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import tasks.ScheduledTask

import java.time.LocalTime
Expand All @@ -19,7 +19,7 @@ class InactiveAccountTask(
inactiveDgccrfAccountRemoveTask: InactiveDgccrfAccountRemoveTask,
inactiveDgccrfAccountSendReminderTask: InactiveDgccrfAccountReminderTask,
taskConfiguration: TaskConfiguration,
taskLockRepository: TaskLockRepositoryInterface
taskLockRepository: TaskRepositoryInterface
)(implicit executionContext: ExecutionContext)
extends ScheduledTask(1, "inactive_account_task", taskLockRepository, actorSystem, taskConfiguration) {

Expand Down
4 changes: 2 additions & 2 deletions app/tasks/company/CompanyUpdateTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import play.api.Logger
import repositories.company.CompanyRepositoryInterface
import repositories.company.CompanySyncRepositoryInterface
import repositories.company.CompanyTable
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import tasks.ScheduledTask

import java.time.LocalTime
Expand All @@ -29,7 +29,7 @@ class CompanyUpdateTask(
companySyncService: CompanySyncServiceInterface,
companySyncRepository: CompanySyncRepositoryInterface,
taskConfiguration: TaskConfiguration,
taskLockRepository: TaskLockRepositoryInterface
taskLockRepository: TaskRepositoryInterface
)(implicit
executionContext: ExecutionContext,
materializer: Materializer
Expand Down
4 changes: 2 additions & 2 deletions app/tasks/report/ReportClosureTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import play.api.i18n.MessagesApi
import repositories.company.CompanyRepositoryInterface
import repositories.event.EventRepositoryInterface
import repositories.report.ReportRepositoryInterface
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import services.Email.ConsumerReportClosedNoAction
import services.Email.ConsumerReportClosedNoReading
import services.ConsumerEmail
Expand Down Expand Up @@ -42,7 +42,7 @@ class ReportClosureTask(
companyRepository: CompanyRepositoryInterface,
mailService: MailService,
taskConfiguration: TaskConfiguration,
taskLockRepository: TaskLockRepositoryInterface,
taskLockRepository: TaskRepositoryInterface,
messagesApi: MessagesApi
)(implicit val executionContext: ExecutionContext)
extends ScheduledTask(2, "report_closure_task", taskLockRepository, actorSystem, taskConfiguration) {
Expand Down
4 changes: 2 additions & 2 deletions app/tasks/report/ReportNotificationTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import models.report.ReportFilter
import play.api.Logger
import repositories.report.ReportRepositoryInterface
import repositories.subscription.SubscriptionRepositoryInterface
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import repositories.user.UserRepositoryInterface
import services.Email.DgccrfReportNotification
import services.MailService
Expand All @@ -38,7 +38,7 @@ class ReportNotificationTask(
userRepository: UserRepositoryInterface,
mailService: MailService,
taskConfiguration: TaskConfiguration,
taskLockRepository: TaskLockRepositoryInterface
taskLockRepository: TaskRepositoryInterface
)(implicit executionContext: ExecutionContext)
extends ScheduledTask(3, "report_notification_task", taskLockRepository, actorSystem, taskConfiguration) {

Expand Down
4 changes: 2 additions & 2 deletions app/tasks/report/ReportRemindersTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import orchestrators.CompaniesVisibilityOrchestrator
import play.api.Logger
import repositories.event.EventRepositoryInterface
import repositories.report.ReportRepositoryInterface
import repositories.tasklock.TaskLockRepositoryInterface
import repositories.tasklock.TaskRepositoryInterface
import services.Email.ProReportsReadReminder
import services.Email.ProReportsUnreadReminder
import services.Email
Expand All @@ -38,7 +38,7 @@ class ReportRemindersTask(
mailService: MailServiceInterface,
companiesVisibilityOrchestrator: CompaniesVisibilityOrchestrator,
taskConfiguration: TaskConfiguration,
taskLockRepository: TaskLockRepositoryInterface
taskLockRepository: TaskRepositoryInterface
)(implicit val executionContext: ExecutionContext)
extends ScheduledTask(4, "report_reminders_task", taskLockRepository, actorSystem, taskConfiguration) {

Expand Down
Loading

0 comments on commit 7109ed6

Please sign in to comment.