Skip to content

Commit

Permalink
Optimize backend timeline calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
kunyavskiy committed Aug 3, 2024
1 parent 524b9cb commit 7a03642
Show file tree
Hide file tree
Showing 17 changed files with 211 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.icpclive.api

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import org.icpclive.cds.api.ProblemId
import org.icpclive.cds.util.serializers.DurationInMillisecondsSerializer
import kotlin.time.Duration

@Serializable
sealed class TimeLineRunInfo {
@Serializable
@SerialName("ICPC")
public data class ICPC(
@Serializable(with = DurationInMillisecondsSerializer::class) val time: Duration,
val problemId: ProblemId,
val isAccepted: Boolean,
val shortName: String) : TimeLineRunInfo()

@Serializable
@SerialName("IOI")
public data class IOI(@Serializable(with = DurationInMillisecondsSerializer::class) val time: Duration, val problemId: ProblemId, val score: Double) : TimeLineRunInfo()

@Serializable
@SerialName("IN_PROGRESS")
public data class InProgress(@Serializable(with = DurationInMillisecondsSerializer::class) val time: Duration, val problemId: ProblemId) : TimeLineRunInfo()
}
1 change: 1 addition & 0 deletions src/backend/src/main/kotlin/org/icpclive/data/DataBus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object DataBus {
val mainScreenFlow = CompletableDeferred<Flow<MainScreenEvent>>()
val queueFlow = CompletableDeferred<Flow<QueueEvent>>()
val externalRunsFlow = CompletableDeferred<Flow<Map<RunId, ExternalRunInfo>>>()
val timelineFlow = CompletableDeferred<Flow<Map<TeamId, List<TimeLineRunInfo>>>>()

// flow of run ids that need to be braking news
val queueFeaturedRunsFlow = CompletableDeferred<FlowCollector<FeaturedRunAction>>()
Expand Down
41 changes: 18 additions & 23 deletions src/backend/src/main/kotlin/org/icpclive/overlay/Routing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ import org.icpclive.data.currentContestInfoFlow
import org.icpclive.util.sendJsonFlow
import kotlin.time.Duration

inline fun <reified T : Any> Route.flowEndpoint(name: String, crossinline dataProvider: suspend () -> Flow<T>) {
webSocket(name) { sendJsonFlow(dataProvider()) }
get(name) { call.respond(dataProvider().first()) }
inline fun <reified T : Any> Route.flowEndpoint(name: String, crossinline dataProvider: suspend (ApplicationCall) -> Flow<T>?) {
webSocket(name) {
val flow = dataProvider(call) ?: return@webSocket
sendJsonFlow(flow)
}
get(name) {
val result = dataProvider(call)?.first() ?: return@get
call.respond(result)
}
}

private inline fun <reified T : Any> Route.setUpScoreboard(crossinline getter: suspend DataBus.(OptimismLevel) -> Flow<T>) {
Expand All @@ -32,29 +38,18 @@ fun Route.configureOverlayRouting() {
flowEndpoint("/mainScreen") { DataBus.mainScreenFlow.await() }
flowEndpoint("/contestInfo") { DataBus.currentContestInfoFlow() }
flowEndpoint("/runs") { DataBus.contestStateFlow.await().map { it.runsAfterEvent.values.sortedBy { it.time } } }
webSocket("/teamRuns/{id}") {
flowEndpoint("/teamRuns/{id}") { call ->
val teamIdStr = call.parameters["id"]
if (teamIdStr.isNullOrBlank()) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid team id"))
return@webSocket
call.respond(HttpStatusCode.BadRequest, "Invalid team id")
null
} else {
val teamId = teamIdStr.toTeamId()
DataBus.timelineFlow.await()
.map { it[teamId] }
.distinctUntilChanged { a, b -> a === b }
.map { it ?: emptyList() }
}
val teamId = teamIdStr.toTeamId()
val acceptedProblems = mutableSetOf<ProblemId>()
val allRuns = mutableMapOf<RunId, RunInfo>()
DataBus.contestStateFlow.await().first().runsAfterEvent.values
.filter { teamId == it.teamId && it.time != Duration.ZERO }
.forEach { allRuns[it.id] = it }
sendJsonFlow(DataBus.contestStateFlow.await()
.mapNotNull { (it.lastEvent as? RunUpdate)?.newInfo }
.runningFold(allRuns.toPersistentMap()) { acc, it ->
if (it.teamId == teamId) acc.put(it.id, it) else if (it.id in acc) acc.remove(it.id) else acc
}
.distinctUntilChanged { a, b -> a === b }
.map { runs -> runs.values.sortedBy { it.time } }
.map { runs ->
acceptedProblems.clear()
runs.mapNotNull { info -> TimeLineRunInfo.fromRunInfo(info, acceptedProblems) }
})
}
flowEndpoint("/queue") { DataBus.queueFlow.await() }
flowEndpoint("/statistics") { DataBus.statisticFlow.await() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@ fun CoroutineScope.launchServices(loader: Flow<ContestUpdate>) {
launchService(ExternalRunsService())
launchService(TeamSpotlightService(teamInteresting = teamInterestingFlow))
launchService(RegularLoggingService())
launchService(TimelineService())
started.update { it - 1 }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.icpclive.service

import kotlinx.collections.immutable.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import org.icpclive.api.TimeLineRunInfo
import org.icpclive.cds.*
import org.icpclive.cds.adapters.*
import org.icpclive.cds.api.*
import org.icpclive.api.TimeLineRunInfo.*
import org.icpclive.cds.scoreboard.*
import org.icpclive.cds.utils.TeamRunsStorage
import org.icpclive.data.DataBus

internal class TimelineService : Service {
override fun CoroutineScope.runOn(flow: Flow<ContestStateWithScoreboard>) {
DataBus.timelineFlow.complete(flow.map { it.state.lastEvent }.timelineFlow().stateIn(this, SharingStarted.Eagerly, emptyMap()))
}

private fun Flow<ContestUpdate>.timelineFlow() = flow {
var rows = persistentMapOf<TeamId, List<TimeLineRunInfo>>()
val runsByTeamId = TeamRunsStorage()
contestState().collect { state ->
for (team in runsByTeamId.applyEvent(state)) {
val newRow = runsByTeamId.getRuns(team).toTimeLine()
val oldRow = rows[team]
if (newRow != oldRow) { // optimization: avoid identity change, if no real change
rows = rows.put(team, newRow)
}
}
emit(rows)
}
}

private fun List<RunInfo>?.toTimeLine() : List<TimeLineRunInfo> {
val acceptedProblems = mutableSetOf<ProblemId>()
return (this ?: emptyList()).mapNotNull {
when (val result = it.result) {
is RunResult.ICPC -> {
if (!acceptedProblems.contains(it.problemId)) {
if (result.verdict.isAccepted) {
acceptedProblems.add(it.problemId)
}
ICPC(it.time, it.problemId, result.verdict.isAccepted, result.verdict.shortName)
} else {
null
}
}

is RunResult.IOI -> {
if (result.difference > 0) {
IOI(it.time, it.problemId, result.scoreAfter)
} else {
null
}
}

is RunResult.InProgress -> {
InProgress(it.time, it.problemId)
}
}
}
}


}
113 changes: 8 additions & 105 deletions src/cds/core/api/core.api
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public final class org/icpclive/cds/adapters/FirstToSolveAdapterKt {
public static final fun addFirstToSolves (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public final class org/icpclive/cds/adapters/HiddenProbblemsAdapterKt {
public final class org/icpclive/cds/adapters/HiddenProblemsAdapterKt {
public static final fun processHiddenProblems (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

Expand Down Expand Up @@ -1672,110 +1672,6 @@ public final class org/icpclive/cds/api/TeamMediaType$Companion {
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public abstract class org/icpclive/cds/api/TimeLineRunInfo {
public static final field Companion Lorg/icpclive/cds/api/TimeLineRunInfo$Companion;
public synthetic fun <init> (ILkotlinx/serialization/internal/SerializationConstructorMarker;)V
public static final synthetic fun write$Self (Lorg/icpclive/cds/api/TimeLineRunInfo;Lkotlinx/serialization/encoding/CompositeEncoder;Lkotlinx/serialization/descriptors/SerialDescriptor;)V
}

public final class org/icpclive/cds/api/TimeLineRunInfo$Companion {
public final fun fromRunInfo (Lorg/icpclive/cds/api/RunInfo;Ljava/util/Set;)Lorg/icpclive/cds/api/TimeLineRunInfo;
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$ICPC : org/icpclive/cds/api/TimeLineRunInfo {
public static final field Companion Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC$Companion;
public synthetic fun <init> (JLjava/lang/String;ZLjava/lang/String;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1-UwyO8pc ()J
public final fun component2-Xzdl60o ()Ljava/lang/String;
public final fun component3 ()Z
public final fun component4 ()Ljava/lang/String;
public final fun copy-4joZhgo (JLjava/lang/String;ZLjava/lang/String;)Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC;
public static synthetic fun copy-4joZhgo$default (Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC;JLjava/lang/String;ZLjava/lang/String;ILjava/lang/Object;)Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC;
public fun equals (Ljava/lang/Object;)Z
public final fun getProblemId-Xzdl60o ()Ljava/lang/String;
public final fun getShortName ()Ljava/lang/String;
public final fun getTime-UwyO8pc ()J
public fun hashCode ()I
public final fun isAccepted ()Z
public fun toString ()Ljava/lang/String;
}

public synthetic class org/icpclive/cds/api/TimeLineRunInfo$ICPC$$serializer : kotlinx/serialization/internal/GeneratedSerializer {
public static final field INSTANCE Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC$$serializer;
public final fun childSerializers ()[Lkotlinx/serialization/KSerializer;
public synthetic fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Ljava/lang/Object;
public final fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC;
public final fun getDescriptor ()Lkotlinx/serialization/descriptors/SerialDescriptor;
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
public final fun serialize (Lkotlinx/serialization/encoding/Encoder;Lorg/icpclive/cds/api/TimeLineRunInfo$ICPC;)V
public fun typeParametersSerializers ()[Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$ICPC$Companion {
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$IOI : org/icpclive/cds/api/TimeLineRunInfo {
public static final field Companion Lorg/icpclive/cds/api/TimeLineRunInfo$IOI$Companion;
public synthetic fun <init> (JLjava/lang/String;DLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1-UwyO8pc ()J
public final fun component2-Xzdl60o ()Ljava/lang/String;
public final fun component3 ()D
public final fun copy-nveW3u0 (JLjava/lang/String;D)Lorg/icpclive/cds/api/TimeLineRunInfo$IOI;
public static synthetic fun copy-nveW3u0$default (Lorg/icpclive/cds/api/TimeLineRunInfo$IOI;JLjava/lang/String;DILjava/lang/Object;)Lorg/icpclive/cds/api/TimeLineRunInfo$IOI;
public fun equals (Ljava/lang/Object;)Z
public final fun getProblemId-Xzdl60o ()Ljava/lang/String;
public final fun getScore ()D
public final fun getTime-UwyO8pc ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public synthetic class org/icpclive/cds/api/TimeLineRunInfo$IOI$$serializer : kotlinx/serialization/internal/GeneratedSerializer {
public static final field INSTANCE Lorg/icpclive/cds/api/TimeLineRunInfo$IOI$$serializer;
public final fun childSerializers ()[Lkotlinx/serialization/KSerializer;
public synthetic fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Ljava/lang/Object;
public final fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Lorg/icpclive/cds/api/TimeLineRunInfo$IOI;
public final fun getDescriptor ()Lkotlinx/serialization/descriptors/SerialDescriptor;
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
public final fun serialize (Lkotlinx/serialization/encoding/Encoder;Lorg/icpclive/cds/api/TimeLineRunInfo$IOI;)V
public fun typeParametersSerializers ()[Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$IOI$Companion {
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$InProgress : org/icpclive/cds/api/TimeLineRunInfo {
public static final field Companion Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress$Companion;
public synthetic fun <init> (JLjava/lang/String;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1-UwyO8pc ()J
public final fun component2-Xzdl60o ()Ljava/lang/String;
public final fun copy-15fPPug (JLjava/lang/String;)Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress;
public static synthetic fun copy-15fPPug$default (Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress;JLjava/lang/String;ILjava/lang/Object;)Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress;
public fun equals (Ljava/lang/Object;)Z
public final fun getProblemId-Xzdl60o ()Ljava/lang/String;
public final fun getTime-UwyO8pc ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public synthetic class org/icpclive/cds/api/TimeLineRunInfo$InProgress$$serializer : kotlinx/serialization/internal/GeneratedSerializer {
public static final field INSTANCE Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress$$serializer;
public final fun childSerializers ()[Lkotlinx/serialization/KSerializer;
public synthetic fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Ljava/lang/Object;
public final fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress;
public final fun getDescriptor ()Lkotlinx/serialization/descriptors/SerialDescriptor;
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
public final fun serialize (Lkotlinx/serialization/encoding/Encoder;Lorg/icpclive/cds/api/TimeLineRunInfo$InProgress;)V
public fun typeParametersSerializers ()[Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/api/TimeLineRunInfo$InProgress$Companion {
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public abstract class org/icpclive/cds/api/Verdict {
public static final field Companion Lorg/icpclive/cds/api/Verdict$Companion;
public synthetic fun <init> (Ljava/lang/String;ZZLkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down Expand Up @@ -2399,3 +2295,10 @@ public final class org/icpclive/cds/tunning/TeamRegexOverrides$Companion {
public final fun serializer ()Lkotlinx/serialization/KSerializer;
}

public final class org/icpclive/cds/utils/TeamRunsStorage {
public fun <init> ()V
public final fun applyEvent (Lorg/icpclive/cds/api/ContestState;)Ljava/util/List;
public final fun getRuns-nUE0bHc (Ljava/lang/String;)Ljava/util/List;
public final fun updateRun (Lorg/icpclive/cds/api/RunInfo;Lorg/icpclive/cds/api/RunInfo;)Ljava/util/List;
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.icpclive.cds.ContestUpdate
import org.icpclive.cds.InfoUpdate
import org.icpclive.cds.api.*
import org.icpclive.cds.util.getLogger
import org.icpclive.cds.utils.withGroupedRuns

private val logger by getLogger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.icpclive.cds.ContestUpdate
import org.icpclive.cds.api.*
import org.icpclive.cds.utils.withGroupedRuns

private interface ScoreAccumulator {
fun add(score: RunResult.IOI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.icpclive.cds.ContestUpdate
import org.icpclive.cds.api.*
import java.rmi.NotBoundException
import org.icpclive.cds.utils.withGroupedRuns

private fun RunInfo.setFTS(value: Boolean) = when (result) {
is RunResult.ICPC -> copy(result = result.copy(isFirstToSolveRun = value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.map
import org.icpclive.cds.api.ContestInfo
import org.icpclive.cds.ContestUpdate
import org.icpclive.cds.api.ProblemId
import org.icpclive.cds.utils.withGroupedRuns

public fun Flow<ContestUpdate>.processHiddenProblems(): Flow<ContestUpdate> =
withGroupedRuns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.map
import org.icpclive.cds.ContestUpdate
import org.icpclive.cds.InfoUpdate
import org.icpclive.cds.api.*
import org.icpclive.cds.utils.withGroupedRuns

private fun TeamInfo.updateHidden(isHidden: Boolean, isOutOfContest: Boolean) =
if (isHidden != this.isHidden || isOutOfContest != this.isOutOfContest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.transform
import org.icpclive.cds.*
import org.icpclive.cds.api.*
import org.icpclive.cds.utils.withGroupedRuns

private fun RunInfo.shouldDiscloseColor() = (result as? RunResult.ICPC)?.verdict?.isAccepted == true && !isHidden

Expand Down
Loading

0 comments on commit 7a03642

Please sign in to comment.