Skip to content

Commit

Permalink
Replaced some StampedLocks with ReentrantReadWriteLocks
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaro committed Jan 4, 2024
1 parent 6456a53 commit 486d66d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 33 deletions.
32 changes: 17 additions & 15 deletions backend/src/main/kotlin/dev/dres/run/RunExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package dev.dres.run


import dev.dres.api.rest.types.ViewerInfo
import dev.dres.data.model.config.Config
import dev.dres.data.model.run.*
import dev.dres.data.model.run.interfaces.EvaluationId
import dev.dres.mgmt.cache.CacheManager
import dev.dres.run.validation.interfaces.JudgementValidator
import dev.dres.utilities.extensions.read
import dev.dres.utilities.extensions.write
import io.javalin.websocket.WsContext
import jetbrains.exodus.database.TransientEntityStore
import kotlinx.dnq.query.*
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.locks.StampedLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write

/**
* The execution environment for [RunManager]s
Expand All @@ -32,7 +30,7 @@ object RunExecutor {
private val executor = Executors.newCachedThreadPool()

/** List of [RunManager] executed by this [RunExecutor]. */
private val runManagers = HashMap<EvaluationId,RunManager>()
private val runManagers = HashMap<EvaluationId, RunManager>()

/** List of [JudgementValidator]s registered with this [RunExecutor]. */
private val judgementValidators = LinkedList<JudgementValidator>()
Expand All @@ -43,11 +41,11 @@ object RunExecutor {
/** List of session IDs that are currently observing an evaluation. */
private val observingClients = HashMap<EvaluationId, MutableSet<ViewerInfo>>()

/** Lock for accessing and changing all data structures related to WebSocket clients. */
private val clientLock = StampedLock()
// /** Lock for accessing and changing all data structures related to WebSocket clients. */
// private val clientLock = StampedLock()

/** Lock for accessing and changing all data structures related to [RunManager]s. */
private val runManagerLock = StampedLock()
private val runManagerLock = ReentrantReadWriteLock()

/** Internal array of [Future]s for cleaning after [RunManager]s. See [RunExecutor.cleanerThread]*/
private val results = HashMap<Future<*>, EvaluationId>()
Expand All @@ -58,7 +56,7 @@ object RunExecutor {
*/
fun init(store: TransientEntityStore) {
store.transactional {
DbEvaluation.filter { (it.ended eq null) }.asSequence().forEach {evaluation ->
DbEvaluation.filter { (it.ended eq null) }.asSequence().forEach { evaluation ->
try {
this.schedule(evaluation.toRunManager(store)) /* Re-schedule evaluations. */
} catch (e: IllegalStateException) {
Expand Down Expand Up @@ -89,26 +87,30 @@ object RunExecutor {
private val cleanerThread = Thread {
while (!this@RunExecutor.executor.isShutdown) {
var stamp = this@RunExecutor.runManagerLock.readLock()
try {
this@RunExecutor.runManagerLock.read {
//try {
this@RunExecutor.results.entries.removeIf { entry ->
val k = entry.key
val v = entry.value
if (k.isDone || k.isCancelled) {
logger.info("RunManager $v (done = ${k.isDone}, cancelled = ${k.isCancelled}) will be removed!")
stamp = this@RunExecutor.runManagerLock.tryConvertToWriteLock(stamp)
if (stamp > -1L) {
// stamp = [email protected](stamp)
// if (stamp > -1L) {
this@RunExecutor.runManagerLock.write {
/* Cleanup. */
this@RunExecutor.runManagers.remove(v)
this@RunExecutor.observingClients.remove(v)

}
true
} else {
false
}
}
} finally {
this@RunExecutor.runManagerLock.unlock(stamp)
}
// } finally {
// [email protected](stamp)
// }
Thread.sleep(500)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package dev.dres.run.eventstream

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import dev.dres.DRES
import dev.dres.utilities.extensions.read
import dev.dres.utilities.extensions.write
import org.slf4j.LoggerFactory
import java.io.PrintWriter
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.StampedLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.thread
import kotlin.concurrent.write

object EventStreamProcessor {

Expand All @@ -24,7 +24,7 @@ object EventStreamProcessor {

private val eventQueue = LinkedBlockingQueue<StreamEvent>()
private val eventHandlers = mutableListOf<StreamEventHandler>()
private val handlerLock = StampedLock()
private val handlerLock = ReentrantReadWriteLock()
private val eventSink = PrintWriter(
DRES.CONFIG.eventsLocation.resolve("${System.currentTimeMillis()}.txt").toFile()
.also { it.parentFile.mkdirs() })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import dev.dres.utilities.extensions.convertWriteLock
import dev.dres.utilities.extensions.write
import jetbrains.exodus.kotlin.synchronized
import java.util.*
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.locks.StampedLock
import kotlin.collections.HashMap
import kotlin.concurrent.read
import kotlin.concurrent.write

/**
* A [TaskScorer] that caches intermediate results for performance gains.
*
* @author Luca Rossetto
* @version 1.0.0
*/
class CachingTaskScorer(private val wrapped: TaskScorer): TaskScorer {
class CachingTaskScorer(private val wrapped: TaskScorer) : TaskScorer {

/** The [Scoreable] held by the wrapped [TaskScorer]. */
override val scoreable: Scoreable
Expand All @@ -30,25 +33,23 @@ class CachingTaskScorer(private val wrapped: TaskScorer): TaskScorer {
private var dirty = true

/** */
private val lock = StampedLock()
private val lock = ReentrantReadWriteLock()

/**
* Returns a map of [TeamId] to score as generated by this [TaskScorer]. Updates the local cache as a side-effect.
*
* @return A [Map] of [TeamId] to score value
*/
override fun scoreMap(): Map<TeamId, Double> {
var stamp = this.lock.readLock()
try {
if (this.dirty) {
stamp = this.lock.convertWriteLock(stamp)
override fun scoreMap(): Map<TeamId, Double> = this.lock.read {

if (this.dirty) {
this.lock.write {
this.latest.putAll(this.wrapped.scoreMap())
this.dirty = false
}
return this.latest
} finally {
this.lock.unlock(stamp)
}
return this.latest

}

/**
Expand Down
8 changes: 4 additions & 4 deletions backend/src/main/kotlin/dev/dres/utilities/ReadyLatch.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.dres.utilities

import dev.dres.utilities.extensions.read
import dev.dres.utilities.extensions.write
import it.unimi.dsi.fastutil.objects.Object2BooleanOpenHashMap
import java.util.HashMap
import java.util.concurrent.locks.StampedLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write

/**
* A simple latch that tracks for all object it contains whether they are ready (true) or not (false).
Expand All @@ -18,7 +18,7 @@ class ReadyLatch<T> {
private val map = Object2BooleanOpenHashMap<T>()

/** Internal lock to mediate access to map. */
private val lock = StampedLock()
private val lock = ReentrantReadWriteLock()

private var timeout: Long? = null

Expand Down

0 comments on commit 486d66d

Please sign in to comment.