diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalClock.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalClock.scala index f38171e65..2bd8e99b5 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalClock.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalClock.scala @@ -4,19 +4,54 @@ import java.util.concurrent.Executor import java.util.concurrent.locks.LockSupport import scala.annotation.nowarn +/** Low-resolution clock optimized for frequent access in the scheduler. + * + * While System.currentTimeMillis is accurate, calling it frequently creates significant overhead due to system calls. This clock uses a + * dedicated thread to update a volatile timestamp every millisecond, allowing other threads to read the current time without system calls. + * + * The tradeoff of potentially being off by up to a millisecond is acceptable for scheduler operations like measuring task runtime and + * detecting stalled workers. The performance benefit of avoiding system calls on every time check is substantial when processing thousands + * of tasks per second. + * + * The clock self-corrects any drift by measuring the actual elapsed time between updates. + * + * @param executor + * Executor for running the update thread + */ final case class InternalClock(executor: Executor) { @volatile private var _stop = false - val a1, a2, a3, a4, a5, a6, a7 = 0L // padding + // padding to avoid false sharing + val a1, a2, a3, a4, a5, a6, a7 = 0L @volatile private var millis = System.currentTimeMillis() - val b1, b2, b3, b4, b5, b6, b7 = 0L // padding + // padding to avoid false sharing + val b1, b2, b3, b4, b5, b6, b7 = 0L private var start = System.nanoTime() + /** Get the current time in milliseconds without making a system call. + * + * This method is designed for frequent calls, returning the latest cached timestamp from the update thread. 
The returned time has + * millisecond resolution but may be up to one millisecond behind the system time. + * + * @return + * Current time in milliseconds since epoch, accurate to within one millisecond + */ + def currentMillis(): Long = millis + + /** Stop the clock's update thread. + * + * After stopping, currentMillis() will return the last updated timestamp. The clock cannot be restarted after stopping - create a new + * instance if needed. + */ + def stop(): Unit = + _stop = true + executor.execute(() => + // update in a separate method to ensure the code is JIT compiled while (!_stop) update() ) @@ -28,11 +63,6 @@ final case class InternalClock(executor: Executor) { LockSupport.parkNanos(1000000L - elapsed) } - def currentMillis(): Long = millis - - def stop(): Unit = - _stop = true - @nowarn("msg=unused") private val gauge = statsScope.scope("clock").gauge("skew")((System.currentTimeMillis() - millis).toDouble) diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalTimer.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalTimer.scala index 7b870cd97..904729ad7 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalTimer.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalTimer.scala @@ -5,6 +5,11 @@ import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration +/** Scheduler-internal timer abstraction that provides unified scheduling primitives. + * + * Abstracts over the underlying executor to support both testing scenarios (via TestTimer) and production usage (via + * ScheduledExecutorService), while maintaining consistent scheduling semantics throughout the scheduler implementation. 
+ */ abstract private[kyo] class InternalTimer { def schedule(interval: Duration)(f: => Unit): TimerTask def scheduleOnce(delay: Duration)(f: => Unit): TimerTask diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Queue.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Queue.scala index 03acffa59..4fb9ab451 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Queue.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Queue.scala @@ -5,20 +5,44 @@ import java.util.concurrent.atomic.AtomicBoolean import kyo.* import scala.collection.mutable.PriorityQueue +/** Specialized concurrent priority queue supporting atomic batch operations and work stealing. + * + * This queue implementation extends beyond standard concurrent queue operations to provide atomic batch transfers and priority-based + * ordering. While designed primarily for task scheduling, the core operations are generally applicable to producer-consumer scenarios + * requiring work stealing and load balancing. + * + * The implementation uses explicit locking via AtomicBoolean for thread safety while supporting specialized operations like atomic batch + * transfers. Priority ordering is maintained using the implicit ordering, typically based on runtime or cost metrics in scheduling + * contexts. + * + * The scheduler uses this queue as the foundation for task distribution between workers, leveraging the work stealing and draining + * capabilities for load balancing and worker management. + * + * @tparam A + * The element type, ordered by the implicit Ordering + * @param ord + * Implicit ordering for priority queue behavior + */ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { private val queue = PriorityQueue[A]() private var items = 0 + /** Tests if queue contains no elements. Provides a non-blocking snapshot of empty state. + */ def isEmpty() = size() == 0 + /** Returns number of elements in queue. 
Non-blocking operation with memory fence for cross-thread visibility. + */ def size(): Int = { VarHandle.acquireFence() items } + /** Adds element to queue in priority order. Acquires lock for thread-safe insertion. + */ def add(value: A): Unit = { lock() try { @@ -29,6 +53,8 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { unlock() } + /** Attempts non-blocking element addition. + */ def offer(value: A): Boolean = tryLock() && { try { @@ -39,6 +65,8 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { unlock() } + /** Retrieves and removes highest priority element. + */ def poll(): A = if (isEmpty()) null.asInstanceOf[A] @@ -55,6 +83,10 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { unlock() } + /** Atomically exchanges input element for highest priority element. + * + * Returns input if queue is empty, otherwise adds input and returns previous highest priority element as single atomic operation. + */ def addAndPoll(value: A): A = if (isEmpty()) value @@ -71,6 +103,18 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { unlock() } + /** Atomically transfers elements between queues. + * + * Moves highest priority element plus approximately half of remaining elements to target queue. Operation only succeeds if target is + * empty and both locks can be acquired without blocking. Maintains priority ordering in both queues. + * + * Used for work stealing in scheduling contexts. + * + * @param to + * recipient queue for stolen elements + * @return + * highest priority element that was transferred, or null if transfer failed + */ def stealingBy(to: Queue[A]): A = { var t: A = null.asInstanceOf[A] val _ = @@ -97,6 +141,11 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { t } + /** Atomically removes and processes all elements. 
+ * + * Locks queue, removes all elements, then applies function to each removed element after lock release. Used for queue shutdown and + * rebalancing. + */ def drain(f: A => Unit): Unit = if (!isEmpty()) { val tasks = { @@ -110,6 +159,16 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean { tasks.foreach(f) } + /** Acquires queue lock using busy-wait spin loop. + * + * Uses busy waiting rather than parking/blocking since: + * - Effective concurrency is bounded by available cores + * - Lock hold times are very short (optimized queue operations) + * - Most contended operations (stealing) fail fast if lock unavailable + * + * The scheduler's worker model ensures the number of threads competing for any given queue is limited, making spinning more efficient + * than thread parking overhead. + */ private def lock(): Unit = while (!compareAndSet(false, true)) {} diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Scheduler.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Scheduler.scala index 169efd7d1..dce345f9d 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Scheduler.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Scheduler.scala @@ -20,6 +20,73 @@ import scala.annotation.nowarn import scala.concurrent.ExecutionContext import scala.util.control.NonFatal +/** A high-performance task scheduler with adaptive concurrency control and admission regulation. + * + * The scheduler provides a foundation for concurrent task execution with features including dynamic worker pool sizing, admission control + * to prevent overload, work stealing for load balancing, task preemption, and comprehensive performance monitoring. + * + * ==Worker Management== + * + * The scheduler maintains a pool of worker threads that execute tasks. The number of workers adjusts dynamically between configured + * minimum and maximum bounds based on system load and performance metrics. 
Workers can steal tasks from each other to balance load across + * the pool, ensuring efficient resource utilization. + * + * ==Admission Control== + * + * An admission regulator prevents system overload by selectively rejecting tasks when the system shows signs of congestion. The admission + * rate adjusts automatically based on measured queuing delays, providing natural backpressure that helps maintain system stability under + * varying loads. + * + * ==Concurrency Control== + * + * A concurrency regulator continuously monitors system scheduling efficiency through sophisticated timing measurements. By analyzing + * scheduling delays and system load, it dynamically adjusts the worker pool size to maintain optimal performance. The regulator detects + * both under-utilization and thread interference, scaling the thread count up or down accordingly. + * + * ==Thread Blocking and Adaptive Concurrency== + * + * The scheduler employs a sophisticated approach to handle thread blocking that requires no explicit signaling or special handling from + * tasks. Instead of treating blocking as an exceptional case, the system embraces it as a natural part of task execution through its + * adaptive concurrency mechanism. + * + * At its core, the scheduler uses an ephemeral thread model where workers acquire threads from a pool only when actively processing tasks. + * When a thread becomes blocked, the scheduler detects this through direct thread state inspection and automatically compensates by + * adjusting its concurrency level. This detection-and-response cycle creates a natural overflow capacity that maintains throughput even + * when many threads are blocked. + * + * The design's effectiveness lies in its transparency: tasks can freely perform blocking operations without concerning themselves with + * thread management. When blocking occurs, the concurrency regulator observes increased scheduling delays and responds by expanding the + * worker pool if possible and necessary. 
As blocked threads resume, the improved scheduling efficiency triggers a gradual reduction in + * worker count. + * + * ==Loom Integration== + * + * The scheduler seamlessly integrates with Java's Project Loom virtual threads when available, providing enhanced scalability for + * I/O-bound workloads. To enable virtual threads, add the JVM argument '--add-opens=java.base/java.lang=ALL-UNNAMED' and set + * '-Dkyo.scheduler.virtualizeWorkers=true'. The scheduler transparently manages virtual thread creation and scheduling through the worker + * executor. + * + * ==Monitoring== + * + * Comprehensive metrics about scheduler operation are exposed through JMX and optional console reporting, providing detailed insight into + * worker utilization, task execution, regulation decisions, and system load. + * + * @param workerExecutor + * Executor for running worker threads + * @param clockExecutor + * Executor for running the internal clock + * @param timerExecutor + * Executor for running scheduled operations + * @param config + * Configuration parameters controlling scheduler behavior + * + * @see + * Worker for details on task execution and work stealing + * @see + * Admission for admission control implementation + * @see + * Concurrency for concurrency regulation details + */ final class Scheduler( workerExecutor: Executor = Scheduler.defaultWorkerExecutor, clockExecutor: Executor = Scheduler.defaultClockExecutor, @@ -49,24 +116,119 @@ final class Scheduler( private val top = new Reporter(status, enableTopJMX, enableTopConsoleMs, timer) + /** Schedules a task for execution by the scheduler. + * + * The scheduler will assign the task to an available worker based on current load and system conditions. Tasks are executed according + * to their priority ordering and may be preempted if they exceed their time slice. 
+ * + * @param task + * The task to schedule for execution + */ def schedule(task: Task): Unit = schedule(task, null) + /** Tests if a new task should be rejected based on current system conditions. + * + * The scheduler uses admission control to prevent system overload by selectively rejecting tasks when detecting signs of congestion. + * This method provides probabilistic load shedding using random sampling, making it suitable for one-off tasks where consistent + * admission decisions aren't required. + * + * This approach works well for: + * - One-off tasks with no related operations + * - Tasks where consistent rejection isn't critical + * - High-volume scenarios where perfect distribution isn't necessary + * - Cases where no natural key exists for the task + * + * For tasks requiring consistent admission decisions (e.g., related operations that should be handled similarly), prefer using + * the keyed overloads reject(key: String) or reject(key: Int) instead. + * + * @return + * true if the task should be rejected, false if it can be accepted + */ def reject(): Boolean = admissionRegulator.reject() + /** Tests if a task with the given string key should be rejected based on current system conditions. + * + * This method provides consistent admission decisions by using the string's hash as a sampling key. This ensures that identical + * strings will receive the same admission decision at any given admission percentage, creating stable and predictable load shedding + * patterns. 
+ * + * This consistency is particularly valuable for: + * - User IDs or session identifiers to maintain consistent user experience + * - Transaction or operation IDs for related task sets + * - Service names or endpoints for targeted load shedding + * - Any scenario requiring deterministic admission control + * + * The string-based rejection provides several benefits: + * - Related requests from the same user/session get uniform treatment + * - Retries of rejected tasks won't add load since they'll stay rejected + * - System stabilizes with a consistent subset of flowing traffic + * - Natural backpressure mechanism for distributed systems + * + * @param key + * String to use for admission decision + * @return + * true if the task should be rejected, false if it can be accepted + */ def reject(key: String): Boolean = admissionRegulator.reject(key) + /** Tests if a task with the given integer key should be rejected based on current system conditions. + * + * This method provides consistent admission decisions by using the integer directly as a sampling key. It guarantees that identical + * integers will receive the same admission decision at any given admission percentage, implemented through efficient modulo + * operations. 
+ * + * This method is particularly useful for: + * - Numeric identifiers like user IDs or request sequence numbers + * - Hash values from other sources + * - Cases where the caller has already computed a suitable numeric key + * - Performance-critical scenarios needing minimal overhead + * + * The integer-based rejection maintains the same consistency benefits as string-based rejection: + * - Deterministic decisions for identical keys + * - Stable load shedding patterns + * - Efficient handling of related operations + * - Natural queueing behavior for rejected requests + * + * @param key + * Integer to use for admission decision + * @return + * true if the task should be rejected, false if it can be accepted + */ def reject(key: Int): Boolean = admissionRegulator.reject(key) + /** Provides an Executor interface to the scheduler. + * + * Allows using the scheduler as a drop-in replacement for standard Java executors while maintaining all scheduler capabilities like + * admission control and adaptive concurrency. + * + * @return + * An Executor that submits Runnables as scheduler tasks + */ def asExecutor: Executor = (r: Runnable) => schedule(Task(r.run())) + /** Provides a Scala ExecutionContext interface to the scheduler. + * + * Allows using the scheduler as a drop-in replacement for Scala execution contexts while maintaining all scheduler capabilities like + * admission control and adaptive concurrency. + * + * @return + * An ExecutionContext backed by the scheduler + */ def asExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(asExecutor) + /** Schedules a task for execution, optionally specifying the submitting worker. 
+ * + * Implements a work-stealing load balancing strategy: + * - If submitted by a worker, tries to execute on that worker first + * - Otherwise samples a subset of workers to find one with minimal load + * - Falls back to random worker assignment if no suitable worker found + */ private def schedule(task: Task, submitter: Worker): Unit = { val nowMs = clock.currentMillis() var worker: Worker = null @@ -104,6 +266,15 @@ final class Scheduler( worker.enqueue(task) } + /** Attempts to steal a task from another worker with higher load. + * + * Implements work stealing by: + * - Sampling a subset of workers based on stealStride config + * - Finding worker with highest load above threshold + * - Atomically transferring tasks from chosen worker + * + * Returns null if no suitable tasks found to steal. + */ private def steal(thief: Worker): Task = { val currentWorkers = this.currentWorkers var worker: Worker = null @@ -133,6 +304,11 @@ final class Scheduler( null } + /** Forces completion of any pending tasks on the current worker thread. + * + * When called from a worker thread, drains and re-submits all queued tasks before returning. Has no effect when called from non-worker + * threads. + */ def flush() = { val worker = Worker.current() if (worker ne null) { @@ -141,6 +317,14 @@ final class Scheduler( } } + /** Calculates the current average load across all workers. + * + * Load is measured as the number of queued plus executing tasks per worker. This metric is used by the regulators to make admission + * and concurrency decisions. + * + * @return + * Average load per worker between 0.0 and worker queue capacity + */ def loadAvg(): Double = { val currentWorkers = this.currentWorkers @@ -155,6 +339,11 @@ final class Scheduler( sum.toDouble / currentWorkers } + /** Shuts down the scheduler and releases resources. + * + * Stops all internal threads, cancels pending tasks, and cleans up monitoring systems. The scheduler cannot be restarted after + * shutdown. 
+ */ def shutdown(): Unit = { cycleTask.cancel(true) admissionRegulator.stop() @@ -162,11 +351,25 @@ final class Scheduler( top.close() } + /** Updates the number of active workers within configured bounds. + * + * Called by the concurrency regulator to adjust worker count: + * - Increases workers when system is underutilized + * - Decreases workers when detecting scheduling delays + * - Maintains count between minWorkers and maxWorkers + */ private def updateWorkers(delta: Int) = { currentWorkers = Math.max(minWorkers, Math.min(maxWorkers, currentWorkers + delta)) ensureWorkers() } + /** Ensures required number of workers are allocated and initialized. + * + * Creates new workers as needed up to currentWorkers count: + * - Allocates worker instances with unique IDs + * - Configures worker with scheduler callbacks + * - Tracks total allocated workers + */ private def ensureWorkers() = for (idx <- allocatedWorkers until currentWorkers) { workers(idx) = @@ -189,6 +392,15 @@ final class Scheduler( ): Callable[Unit] ) + /** Periodically checks worker health and availability. + * + * Runs on timer to: + * - Detect stalled or blocked workers + * - Clear stale worker states + * - Maintain accurate worker availability status + * + * Critical for work stealing and load balancing decisions. + */ private def cycleWorkers(): Unit = { try { val nowMs = clock.currentMillis() @@ -250,6 +462,57 @@ object Scheduler { val get = new Scheduler() + /** Configuration parameters controlling worker behavior and performance characteristics. + * + * Most applications can use the default worker configuration values, which are tuned for general-purpose workloads. The defaults are + * configured through system properties and provide good performance across a wide range of scenarios without requiring manual tuning. + * + * @param minWorkers + * Minimum number of worker threads that will be maintained even under low load. 
A lower number conserves resources but may require + * ramp-up time when load increases. (-Dkyo.scheduler.minWorkers=, default: coreWorkers/2) + * + * @param coreWorkers + * Initial worker thread count at scheduler startup. Represents the baseline capacity for handling normal workload. + * (-Dkyo.scheduler.coreWorkers=, default: cores * 10) + * + * @param maxWorkers + * Maximum worker thread count beyond which the scheduler won't allocate new workers. Prevents unconstrained thread growth under + * heavy load. Must be greater than or equal to minWorkers. (-Dkyo.scheduler.maxWorkers=, default: coreWorkers * 100) + * + * @param timeSliceMs + * Maximum duration a task can run before being preempted. Lower values (e.g. 5ms) ensure more frequent task switching and better + * responsiveness. Higher values (e.g. 20ms) reduce context switching overhead but may delay other tasks. + * (-Dkyo.scheduler.timeSliceMs=, default: 10) + * + * @param scheduleStride + * Number of workers to examine when scheduling a new task. Larger values find better scheduling targets but take longer to make + * decisions. (-Dkyo.scheduler.scheduleStride=, default: cores) + * + * @param stealStride + * Number of workers to examine when looking for tasks to steal. Larger values improve load balancing but increase steal overhead. + * (-Dkyo.scheduler.stealStride=, default: cores * 4) + * + * @param cycleIntervalNs + * Interval between worker health checks in nanoseconds. Controls how quickly the scheduler detects and responds to stalled or + * blocked workers. (-Dkyo.scheduler.cycleIntervalNs=, default: 100000) + * + * @param virtualizeWorkers + * When true, uses virtual threads from Project Loom instead of platform threads. Beneficial for workloads with significant I/O or + * blocking operations. Requires JDK 21+ and appropriate JVM flags. (-Dkyo.scheduler.virtualizeWorkers=, default: false) + * + * @param enableTopJMX + * Exposes scheduler metrics through JMX for monitoring. 
Useful for observing scheduler behavior in production. + * (-Dkyo.scheduler.enableTopJMX=, default: true) + * + * @param enableTopConsoleMs + * Interval in milliseconds for printing scheduler metrics to console. Zero disables console output. Useful for development and + * debugging. (-Dkyo.scheduler.enableTopConsoleMs=, default: 0) + * + * @see + * Worker for worker implementation details + * @see + * Scheduler for how workers are managed + */ case class Config( cores: Int, coreWorkers: Int, diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Worker.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Worker.scala index 839f60122..99583c97c 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Worker.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Worker.scala @@ -9,6 +9,74 @@ import kyo.scheduler.top.WorkerStatus import scala.annotation.nowarn import scala.util.control.NonFatal +/** A scheduler worker that executes tasks with preemption and work stealing capabilities. + * + * This worker implementation provides core task execution functionality for the scheduler, including: + * - Task queuing and execution + * - Preemptive scheduling with time slicing + * - Work stealing for load balancing + * - Thread state monitoring and stall detection + * - Performance metrics collection + * + * ==Task Execution== + * + * Workers maintain a priority queue of pending tasks and execute them according to their priority ordering. Each task runs until either: + * - It completes naturally + * - It exceeds its time slice and is preempted + * - The worker is instructed to stop + * + * ==Preemption== + * + * The worker monitors task execution time and preempts long-running tasks that exceed the configured time slice. Preempted tasks are + * re-queued to allow other tasks to execute, implementing fair scheduling. This prevents individual tasks from monopolizing the worker + * thread. 
+ * + * ==Work Stealing== + * + * When a worker's queue is empty, it attempts to steal tasks from other workers that have higher load. The stealing mechanism uses atomic + * batch transfers to move multiple tasks at once, maintaining priority ordering while balancing load across workers. This improves overall + * scheduler throughput by keeping workers busy. + * + * ==State Management== + * + * The worker transitions between three states: + * - Idle: No tasks to execute + * - Running: Actively executing tasks + * - Stalled: Detected as blocked or exceeding time slice + * + * Thread state monitoring detects when workers become blocked on I/O or synchronization, allowing the scheduler to compensate by not + * scheduling new tasks to blocked workers. + * + * ==Thread Management== + * + * Workers use an ephemeral thread model where they acquire threads from the executor only when actively processing tasks: + * - Thread is mounted when the worker begins processing tasks + * - Thread is released back to the pool when the worker becomes idle + * - New thread is requested from executor when work arrives for an idle worker + * + * The provided executor may be configured to start new virtual threads instead of platform threads when Project Loom is available. The + * worker treats virtual and platform threads identically, with thread management handled transparently by the provided executor instance. 
+ * + * @param id + * Unique identifier for this worker + * @param exec + * Executor for running the worker thread + * @param scheduleTask + * Function to schedule a task on a specific worker + * @param stealTask + * Function to steal tasks from another worker + * @param clock + * Internal clock for timing operations + * @param timeSliceMs + * Maximum time slice for task execution before preemption + * + * @see + * Queue for details on the underlying task queue implementation + * @see + * Task for the task execution model + * @see + * Scheduler for how workers are managed + */ abstract private class Worker( id: Int, exec: Executor, @@ -46,16 +114,24 @@ abstract private class Worker( private val schedule = scheduleTask(_, this) + /** Adds a task to this worker's queue and ensures the worker is running. Called by the scheduler when assigning new work. + */ def enqueue(task: Task): Unit = { queue.add(task) wakeup() } + /** Transitions the worker from Idle to Running state and requests a new thread from the executor if successful. Used when new work + * arrives for an idle worker. + */ def wakeup() = { if ((state eq State.Idle) && stateHandle.compareAndSet(this, State.Idle, State.Running)) exec.execute(this) } + /** Returns the current task count including both queued tasks and any actively executing task. Used by the scheduler for load balancing + * decisions. + */ def load() = { var load = queue.size() if (currentTask ne null) @@ -63,6 +139,9 @@ abstract private class Worker( load } + /** Attempts to steal tasks from this worker's queue into the thief's queue. Updates metrics to track task movement between workers. + * Returns null if no tasks were stolen. + */ def stealingBy(thief: Worker): Task = { val task = queue.stealingBy(thief.queue) if (task ne null) @@ -70,9 +149,19 @@ abstract private class Worker( task } + /** Transfers all tasks from this worker's queue to the scheduler for reassignment. 
Called when the worker becomes stalled to allow + * other workers to take over the work. + */ def drain(): Unit = queue.drain(schedule) + /** Checks if this worker can accept new tasks by verifying: + * - Not stalled on a long-running task + * - Not in Stalled state + * - Thread not blocked on I/O or synchronization + * + * If checks fail while Running, transitions to Stalled and drains queue. Used by scheduler to skip workers that can't make progress. + */ def checkAvailability(nowMs: Long): Boolean = { val state = this.state val available = !checkStalling(nowMs) && (state ne State.Stalled) && !isBlocked() @@ -102,39 +191,58 @@ abstract private class Worker( } def run(): Unit = { + // Set up worker state mounts += 1 mount = Thread.currentThread() setCurrent(this) var task: Task = null + while (true) { + // Mark worker as actively running state = State.Running + if (task eq null) + // Try to get a task from our own queue first task = queue.poll() + if (task eq null) { + // If our queue is empty, try to steal work from another worker task = stealTask(this) if (task ne null) + // Track number of stolen tasks including batch size stolenTasks += queue.size() + 1 } + if (task ne null) { + // We have a task to execute executions += 1 if (runTask(task) == Task.Preempted) { + // Task was preempted - add it back to queue and get next task preemptions += 1 task = queue.addAndPoll(task) } else { + // Task completed normally completions += 1 task = null } } else { + // No tasks available - prepare to go idle state = State.Idle if (queue.isEmpty() || !stateHandle.compareAndSet(this, State.Idle, State.Running)) { + // Either queue is empty or another thread changed our state + // Clean up and exit mount = null clearCurrent() return } } + + // Check if we should stop processing tasks if (shouldStop()) { state = State.Idle + // Reschedule current task if we have one if (task ne null) schedule(task) + // Drain remaining tasks from queue drain() return } diff --git 
a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Admission.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Admission.scala index 4428e83fc..589def83f 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Admission.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Admission.scala @@ -12,6 +12,65 @@ import scala.annotation.nowarn import scala.concurrent.duration.* import scala.util.hashing.MurmurHash3 +/** Admission control regulator that prevents scheduler overload by measuring queuing delays. + * + * The Admission regulator protects the system from overload by monitoring scheduler queuing delays and selectively rejecting tasks when + * delays increase. It maintains an admission percentage that adjusts dynamically based on measured delays. + * + * ==Queuing Delay Measurement== + * + * The regulator probes for queuing delays by periodically submitting special timing tasks into the scheduler and measuring how long they + * wait before execution. A probe task simply measures the time between its creation and execution. High variance or increasing delays in + * these measurements indicate scheduler congestion, triggering reductions in the admission rate to alleviate pressure. + * + * ==Rejection Mechanism== + * + * Tasks are rejected using a deterministic hashing mechanism that provides stable and consistent admission decisions. Each task key + * (string or integer) is hashed to a value between 0-99, and tasks with hash values above the current admission percentage are rejected. + * This approach ensures the same keys will be consistently rejected until the admission percentage changes. 
+ * + * This deterministic approach provides significant benefits: + * - Individual users/tasks receive consistent service or rejection + * - Related requests from the same user/session get uniform treatment + * - Retries from rejected tasks won't add load since they'll stay rejected + * - Accepted tasks can proceed without interruption + * + * ==Load Shedding Pattern== + * + * The system responds to increasing pressure through a gradual and predictable load shedding pattern: + * + * - As pressure increases and admission percentage drops, tasks with the highest hash values are rejected first + * - Additional tasks are rejected if pressure continues building + * - System stabilizes at lower load with a stable subset of traffic + * - During recovery, admission percentage gradually increases + * - Previously rejected tasks are readmitted in the reverse of the order in which they were rejected (most recently rejected first) + * + * ==Backpressure Characteristics== + * + * This design creates an effective backpressure mechanism with several key characteristics. Load reduces predictably as the admission + * percentage drops, avoiding the oscillation patterns common with random rejection strategies. The system maintains a stable subset of + * flowing traffic, while providing natural queue-like behavior for rejected requests. + * + * ==Distributed Systems Context== + * + * The admission control mechanism is particularly effective in microservices architectures. Consistent rejection patterns help downstream + * services manage their own load effectively, while enabling client libraries to implement intelligent backoff strategies. The predictable + * degradation and recovery patterns make it easier to maintain system stability across service boundaries. 
+ * + * @param loadAvg + * A supplier that provides the current system load average + * @param schedule + * Function to schedule probe tasks in the scheduler + * @param nowMillis + * Current time supplier for delay measurements + * @param timer + * Timer for scheduling periodic regulation + * @param config + * Configuration parameters controlling admission behavior + * + * @see + * Regulator for details on the underlying regulation mechanism + */ final class Admission( loadAvg: DoubleSupplier, schedule: Task => Unit, @@ -25,44 +84,122 @@ final class Admission( private val rejected = new LongAdder private val allowed = new LongAdder + /** Returns the current admission percentage representing system availability. + * + * The admission percentage dynamically adjusts based on system pressure and measured queuing delays. This value determines what + * portion of incoming tasks will be accepted versus rejected. The adjustment process follows several principles: + * + * - A value of 100 indicates the system is healthy and accepting all tasks + * - Lower values indicate the system is under pressure and selectively rejecting tasks + * - Changes occur gradually to maintain system stability + * - Values decrease when high queuing delays are detected + * - Values increase when the system shows capacity for more load + * + * @return + * Integer between 0 and 100 representing the percentage of tasks that will be admitted + */ def percent(): Int = admissionPercent - def reject(key: String): Boolean = - reject(MurmurHash3.stringHash(key)) - + /** Tests if a task should be rejected using random sampling. + * + * This method provides probabilistic load shedding based on the current admission percentage. It generates a new random number for + * each call, making it suitable for tasks where consistent admission decisions aren't required. 
This approach works well for: + * + * - One-off tasks with no related operations + * - Tasks where consistent rejection isn't critical + * - High-volume scenarios where perfect distribution isn't necessary + * - Cases where the caller doesn't have a natural key to use + * + * For tasks requiring consistent admission decisions, prefer using reject(key) or reject(string) instead. + * + * @return + * true if the task should be rejected, false if it should be admitted + */ def reject(): Boolean = reject(ThreadLocalRandom.current().nextInt()) + /** Tests if a task should be rejected using the string's hash value. + * + * Provides consistent admission decisions by using the string's hash as the sampling key. This method guarantees that identical + * strings will receive the same admission decision at any given admission percentage. This consistency is valuable for several use + * cases: + * + * - User IDs or session identifiers to maintain consistent user experience + * - Transaction or operation IDs for related task sets + * - Service names or endpoints for targeted load shedding + * - Any scenario requiring deterministic admission control + * + * @param key + * String to use for admission decision + * @return + * true if the task should be rejected, false if it should be admitted + */ + def reject(key: String): Boolean = + reject(MurmurHash3.stringHash(key)) + + /** Tests if a task should be rejected using the provided integer key. + * + * Provides consistent admission decisions by using the integer directly as the sampling key. This method guarantees that identical + * integers will receive the same admission decision at any given admission percentage. The integer is mapped to the admission space + * through a simple modulo operation, making it efficient for high-volume scenarios. 
+ * + * This method is particularly useful for: + * - Numeric identifiers like user IDs or request sequence numbers + * - Hash values from other sources + * - Cases where the caller has already computed a suitable numeric key + * - Performance-critical scenarios needing minimal overhead + * + * @param key + * Integer to use for admission decision + * @return + * true if the task should be rejected, false if it should be admitted + */ def reject(key: Int): Boolean = { - val r = - (key.abs % 100) > admissionPercent + val r = (key.abs % 100) > admissionPercent if (r) rejected.increment() else allowed.increment() r } + /** Internal task class used for probing queue delays. + * + * This probe task measures the time between its creation and execution to detect queuing delays in the scheduler. The measured delays + * help determine when the system is under pressure and needs to adjust its admission rate. + */ final private class ProbeTask extends Task { val start = nowMillis.getAsLong() def run(startMillis: Long, clock: InternalClock) = { + // Record the scheduling delay measure(nowMillis.getAsLong() - start) Task.Done } } - protected def probe() = - schedule(new ProbeTask) + /** Initiates a probe measurement by scheduling a timing task. + * + * This method is called periodically to collect delay measurements that drive the admission control decisions. + */ + protected def probe() = schedule(new ProbeTask) + /** Updates the admission percentage based on system conditions. + * + * Adjusts the admission percentage while ensuring it stays within valid bounds (0-100). The adjustment size and direction are + * determined by the regulator based on observed delays and system load. 
+ * + * @param diff + * The amount to adjust the admission percentage, positive for increase, negative for decrease + */ protected def update(diff: Int): Unit = admissionPercent = Math.max(0, Math.min(100, admissionPercent + diff)) - @nowarn("msg=unused") - private val gauges = - List( - statsScope.gauge("percent")(admissionPercent), - statsScope.counterGauge("allowed")(allowed.sum()), - statsScope.counterGauge("rejected")(rejected.sum()) - ) - + /** Provides current statistics about the admission controller's operation. + * + * Returns a snapshot of key metrics including the current admission percentage and counts of allowed and rejected tasks. This + * information is valuable for monitoring and debugging system behavior. + * + * @return + * Status object containing current metrics and regulator state + */ def status(): AdmissionStatus = AdmissionStatus( admissionPercent, @@ -70,6 +207,14 @@ final class Admission( rejected.sum(), regulatorStatus() ) + + @nowarn("msg=unused") + private val gauges = + List( + statsScope.gauge("percent")(admissionPercent), + statsScope.counterGauge("allowed")(allowed.sum()), + statsScope.counterGauge("rejected")(rejected.sum()) + ) } object Admission { diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Concurrency.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Concurrency.scala index 43fa49981..394e1b35f 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Concurrency.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Concurrency.scala @@ -7,6 +7,38 @@ import kyo.scheduler.top.ConcurrencyStatus import kyo.scheduler.util.Flag import scala.concurrent.duration.* +/** Concurrency control regulator that optimizes thread count by detecting system scheduling delays. + * + * The Concurrency regulator maintains optimal performance by measuring the system's ability to promptly execute threads. 
It uses a + * dedicated OS thread to perform brief sleep operations, analyzing delays between requested and actual wake times to detect various forms + * of contention and interference. This approach is similar to the jHiccup tool, but uses standard deviation analysis via the Regulator + * framework to make automatic adjustments. + * + * The probe mechanism relies on the operating system's thread scheduling behavior. In a healthy system, the probe thread wakes up from + * sleep very close to the requested time. Various conditions can delay thread wake-ups, including OS scheduler overload, CPU throttling, + * excessive context switching, hypervisor interference, and hardware power management. By measuring these delays, the regulator detects + * when the system is struggling to handle the current thread count. + * + * When wake-up delays show high jitter, indicating degraded thread scheduling, the regulator reduces the number of workers. When delays + * are consistent and the system maintains its target load, the regulator gradually increases workers. This approach automatically finds + * the optimal thread count for the current system conditions and available CPU resources. 
+ * + * @param loadAvg + * A supplier that provides the current system load average + * @param updateConcurrency + * Callback to update the number of worker threads + * @param sleep + * Function to perform sleep probes (must use OS thread sleep) + * @param nowNanos + * Current time supplier for wake-up delay measurements + * @param timer + * Timer for scheduling periodic regulation + * @param config + * Configuration parameters controlling concurrency adjustment + * + * @see + * Regulator for details on the underlying regulation mechanism + */ final class Concurrency( loadAvg: DoubleSupplier, updateConcurrency: Int => Unit, @@ -16,12 +48,29 @@ final class Concurrency( config: Config = Concurrency.defaultConfig ) extends Regulator(loadAvg, timer, config) { + /** Performs a probe measurement by executing a brief sleep operation. + * + * This method measures thread scheduling delays by: + * - Recording the start time + * - Performing a 1ms sleep + * - Measuring the actual delay beyond the requested sleep time + * + * The measured delay indicates system scheduling efficiency and contention. + */ protected def probe() = { val start = nowNanos.getAsLong() sleep(1) measure(nowNanos.getAsLong() - start - 1000000) } + /** Updates the number of worker threads based on regulation decisions. 
+ * + * @param diff + * The change in thread count to apply: + * - Positive values increase threads + * - Negative values decrease threads + * - Magnitude increases with consecutive adjustments + */ protected def update(diff: Int): Unit = updateConcurrency(diff) diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Config.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Config.scala index b61b72ed0..eb8ff99b6 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Config.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Config.scala @@ -2,6 +2,26 @@ package kyo.scheduler.regulator import scala.concurrent.duration.Duration +/** Configuration parameters controlling regulator behavior. + * + * The configuration determines how regulators collect and analyze timing measurements, and how aggressively they respond to detected + * issues. These parameters balance responsiveness against stability. + * + * @param collectWindow + * Size of the moving window used for standard deviation calculation + * @param collectInterval + * Interval between probe measurements + * @param regulateInterval + * Interval between regulation adjustments + * @param jitterUpperThreshold + * High standard deviation threshold that triggers load reduction + * @param jitterLowerThreshold + * Low standard deviation threshold that allows load increase + * @param loadAvgTarget + * Target load level - load must meet this for increases + * @param stepExp + * Controls how quickly consecutive adjustments escalate + */ case class Config( collectWindow: Int, collectInterval: Duration, diff --git a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Regulator.scala b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Regulator.scala index e2b5ed7d1..965f37e83 100644 --- a/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Regulator.scala +++ b/kyo-scheduler/jvm/src/main/scala/kyo/scheduler/regulator/Regulator.scala @@ -7,6 +7,53 @@ 
import kyo.scheduler.top.RegulatorStatus import kyo.scheduler.util.* import scala.util.control.NonFatal +/** A self-tuning regulator that dynamically adjusts scheduler behavior based on system performance metrics. This base class provides + * automatic adjustment of scheduler parameters based on real-time performance measurements and statistical analysis of timing variations. + * + * ==Measurement Collection== + * + * The regulator collects timing measurements through periodic probes at configured intervals. These measurements are stored in a moving + * window, which provides an efficient way to maintain recent performance history while automatically discarding old data. This approach + * enables quick detection of emerging performance trends while smoothing out momentary irregularities. + * + * ==Jitter Analysis== + * + * Collected measurements are analyzed using a moving standard deviation calculation to determine system stability. This "jitter" metric + * reveals performance characteristics such as sudden instability, ongoing systemic issues, and recovery patterns. The analysis focuses on + * detecting significant deviations that indicate potential performance problems. + * + * ==Adjustment Mechanism== + * + * Based on the jitter analysis, the regulator makes incremental adjustments to maintain system stability. When jitter exceeds the upper + * threshold, the regulator adjusts using negative steps. Conversely, when jitter falls below the lower threshold and load meets the + * target, it adjusts with positive steps. Step sizes increase exponentially with consecutive adjustments in the same direction but reset + * when the direction changes, providing both responsiveness and stability. + * + * ==Configuration== + * + * The regulator's behavior is controlled through configuration parameters that define the measurement window size, collection and + * regulation intervals, jitter thresholds, target load, and step escalation rate. 
These parameters can be tuned to match specific system + * characteristics and performance requirements. + * + * @param loadAvg + * A supplier that provides the current system load average (0.0 to 1.0) + * @param timer + * Timer used for scheduling periodic measurements and adjustments + * @param config + * Configuration parameters for the regulator + * + * @note + * Implementations must provide probe() and update() methods to define measurement collection and adjustment application respectively. + * + * @see + * Config for configuration parameters + * @see + * MovingStdDev for jitter calculation details + * @see + * Admission for admission control implementation + * @see + * Concurrency for concurrency control implementation + */ abstract class Regulator( loadAvg: DoubleSupplier, timer: InternalTimer, @@ -21,9 +68,47 @@ abstract class Regulator( private val adjustments = new LongAdder private val updates = new LongAdder + /** Collect a performance measurement. + * + * This method should implement the specific probing mechanism for the regulator. Implementations must call `measure()` with the + * collected measurement value. + */ protected def probe(): Unit + + /** Apply a regulation adjustment. + * + * @param diff + * The size and direction of adjustment to apply. Positive values indicate increase; negative values indicate decrease. Magnitude + * increases with consecutive adjustments. + */ protected def update(diff: Int): Unit + /** Record a measurement value for regulation. + * + * @param v + * The measurement value, in the time unit of the implementing regulator's probe (Admission measures milliseconds, Concurrency measures nanoseconds) + * + * Measurements are used to: + * - Calculate jitter (standard deviation) + * - Detect performance anomalies + * - Guide adjustment decisions + */ + protected def measure(v: Long): Unit = { + probesCompleted.increment() + stats.measurement.observe(Math.max(0, v.toDouble)) + synchronized(measurements.observe(v)) + } + + /** Stop the regulator. + * + * Cancels all scheduled tasks and cleans up resources. 
+ */ + def stop(): Unit = { + def discard(v: Any) = {} + discard(collectTask.cancel()) + discard(regulateTask.cancel()) + } + private val collectTask = timer.schedule(collectInterval)(collect()) @@ -40,33 +125,38 @@ abstract class Regulator( } } - protected def measure(v: Long): Unit = { - probesCompleted.increment() - stats.measurement.observe(Math.max(0, v.toDouble)) - synchronized(measurements.observe(v)) - } - final private def adjust() = { try { adjustments.increment() + + // Calculate current performance metrics val jitter = synchronized(measurements.dev()) val load = loadAvg.getAsDouble() + + // Determine adjustment direction based on jitter thresholds if (jitter > jitterUpperThreshold) { + // High jitter - increase negative step size if (step < 0) step -= 1 else step = -1 } else if (jitter < jitterLowerThreshold && load >= loadAvgTarget) { + // Low jitter and sufficient load - increase positive step size if (step > 0) step += 1 else step = 1 } else + // Reset step when within acceptable range step = 0 + if (step != 0) { + // Calculate exponential adjustment size based on consecutive steps val pow = Math.pow(Math.abs(step), stepExp).toInt val delta = if (step < 0) -pow else pow + // Track actual updates and apply the adjustment updates.increment() update(delta) } + stats.jitter.observe(jitter) stats.loadavg.observe(load) } catch { @@ -74,13 +164,6 @@ abstract class Regulator( kyo.scheduler.bug(s"${getClass.getSimpleName()} regulator's adjustment has failed.", ex) } } - - def stop(): Unit = { - def discard(v: Any) = {} - discard(collectTask.cancel()) - discard(regulateTask.cancel()) - } - protected val statsScope = kyo.scheduler.statsScope.scope("regulator", getClass.getSimpleName().toLowerCase()) private object stats {