[scheduler] scaladocs #863

Merged 1 commit on Nov 25, 2024
44 changes: 37 additions & 7 deletions kyo-scheduler/jvm/src/main/scala/kyo/scheduler/InternalClock.scala
@@ -4,19 +4,54 @@ import java.util.concurrent.Executor
import java.util.concurrent.locks.LockSupport
import scala.annotation.nowarn

/** Low-resolution clock optimized for frequent access in the scheduler.
*
* While System.currentTimeMillis is accurate, calling it frequently creates significant overhead due to system calls. This clock uses a
* dedicated thread to update a volatile timestamp every millisecond, allowing other threads to read the current time without system calls.
*
* The tradeoff of potentially being off by up to a millisecond is acceptable for scheduler operations like measuring task runtime and
* detecting stalled workers. The performance benefit of avoiding system calls on every time check is substantial when processing thousands
* of tasks per second.
*
* The clock self-corrects any drift by measuring the actual elapsed time between updates.
*
* @param executor
* Executor for running the update thread
*/
final case class InternalClock(executor: Executor) {

@volatile private var _stop = false

val a1, a2, a3, a4, a5, a6, a7 = 0L // padding
// padding to avoid false sharing
val a1, a2, a3, a4, a5, a6, a7 = 0L

@volatile private var millis = System.currentTimeMillis()

val b1, b2, b3, b4, b5, b6, b7 = 0L // padding
// padding to avoid false sharing
val b1, b2, b3, b4, b5, b6, b7 = 0L

private var start = System.nanoTime()

/** Get the current time in milliseconds without making a system call.
*
* This method is designed for frequent calls, returning the latest cached timestamp from the update thread. The returned time has
* millisecond resolution but may be up to one millisecond behind the system time.
*
* @return
* Current time in milliseconds since epoch, accurate to within one millisecond
*/
def currentMillis(): Long = millis

/** Stop the clock's update thread.
*
* After stopping, currentMillis() will return the last updated timestamp. The clock cannot be restarted after stopping - create a new
* instance if needed.
*/
def stop(): Unit =
_stop = true

executor.execute(() =>
// update in a separate method to ensure the code is JIT compiled
while (!_stop) update()
)

@@ -28,11 +63,6 @@ final case class InternalClock(executor: Executor) {
LockSupport.parkNanos(1000000L - elapsed)
}

def currentMillis(): Long = millis

def stop(): Unit =
_stop = true

@nowarn("msg=unused")
private val gauge =
statsScope.scope("clock").gauge("skew")((System.currentTimeMillis() - millis).toDouble)
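
For context, a minimal usage sketch of the clock is shown below, assuming a scope where InternalClock is accessible. The single-thread executor and the shutdown ordering are assumptions for illustration only and are not taken from the scheduler itself.

import java.util.concurrent.Executors

// Assumption: a plain single-threaded executor stands in for the executor the scheduler actually provides.
val executor = Executors.newSingleThreadExecutor()
val clock    = InternalClock(executor)

// Hot path: each call reads the cached volatile timestamp, no system call is made.
val startMs = clock.currentMillis()
// ... run some work ...
val elapsedMs = clock.currentMillis() - startMs

// Shutdown: stop the update loop, then release the executor.
clock.stop()
executor.shutdown()
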
@@ -5,6 +5,11 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

/** Scheduler-internal timer abstraction that provides unified scheduling primitives.
*
* Abstracts over the underlying executor to support both testing scenarios (via TestTimer) and production usage (via
* ScheduledExecutorService), while maintaining consistent scheduling semantics throughout the scheduler implementation.
*/
abstract private[kyo] class InternalTimer {
def schedule(interval: Duration)(f: => Unit): TimerTask
def scheduleOnce(delay: Duration)(f: => Unit): TimerTask
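
As a rough illustration of how scheduler code might consume this abstraction, the sketch below wires a periodic check through the timer; the startStallMonitor and checkWorkers names are hypothetical and not taken from the codebase.

import scala.concurrent.duration.*

// Hypothetical stall-detection hook, stubbed out for the sketch.
def checkWorkers(now: Long): Unit = ()

// `timer` may be the ScheduledExecutorService-backed production instance or a
// TestTimer in tests; both expose the same schedule/scheduleOnce contract.
def startStallMonitor(timer: InternalTimer, clock: InternalClock): TimerTask =
  timer.schedule(1.second) {
    checkWorkers(clock.currentMillis())
  }
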
59 changes: 59 additions & 0 deletions kyo-scheduler/jvm/src/main/scala/kyo/scheduler/Queue.scala
@@ -5,20 +5,44 @@ import java.util.concurrent.atomic.AtomicBoolean
import kyo.*
import scala.collection.mutable.PriorityQueue

/** Specialized concurrent priority queue supporting atomic batch operations and work stealing.
*
* This queue implementation extends beyond standard concurrent queue operations to provide atomic batch transfers and priority-based
* ordering. While designed primarily for task scheduling, the core operations are generally applicable to producer-consumer scenarios
* requiring work stealing and load balancing.
*
* The implementation uses explicit locking via AtomicBoolean for thread safety while supporting specialized operations like atomic batch
* transfers. Priority ordering is maintained using the implicit ordering, typically based on runtime or cost metrics in scheduling
* contexts.
*
* The scheduler uses this queue as the foundation for task distribution between workers, leveraging the work stealing and draining
* capabilities for load balancing and worker management.
*
* @tparam A
* The element type, ordered by the implicit Ordering
* @param ord
* Implicit ordering for priority queue behavior
*/
final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {

private val queue = PriorityQueue[A]()

private var items = 0

/** Tests if queue contains no elements. Provides a non-blocking snapshot of empty state.
*/
def isEmpty() =
size() == 0

/** Returns number of elements in queue. Non-blocking operation with memory fence for cross-thread visibility.
*/
def size(): Int = {
VarHandle.acquireFence()
items
}

/** Adds element to queue in priority order. Acquires lock for thread-safe insertion.
*/
def add(value: A): Unit = {
lock()
try {
@@ -29,6 +53,8 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
unlock()
}

/** Attempts non-blocking element addition.
*/
def offer(value: A): Boolean =
tryLock() && {
try {
@@ -39,6 +65,8 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
unlock()
}

/** Retrieves and removes highest priority element.
*/
def poll(): A =
if (isEmpty())
null.asInstanceOf[A]
@@ -55,6 +83,10 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
unlock()
}

/** Atomically exchanges input element for highest priority element.
*
* Returns input if queue is empty, otherwise adds input and returns previous highest priority element as single atomic operation.
*/
def addAndPoll(value: A): A =
if (isEmpty())
value
@@ -71,6 +103,18 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
unlock()
}

/** Atomically transfers elements between queues.
*
* Moves highest priority element plus approximately half of remaining elements to target queue. Operation only succeeds if target is
* empty and both locks can be acquired without blocking. Maintains priority ordering in both queues.
*
* Used for work stealing in scheduling contexts.
*
* @param to
* recipient queue for stolen elements
* @return
* highest priority element that was transferred, or null if transfer failed
*/
def stealingBy(to: Queue[A]): A = {
var t: A = null.asInstanceOf[A]
val _ =
@@ -97,6 +141,11 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
t
}

/** Atomically removes and processes all elements.
*
* Locks queue, removes all elements, then applies function to each removed element after lock release. Used for queue shutdown and
* rebalancing.
*/
def drain(f: A => Unit): Unit =
if (!isEmpty()) {
val tasks = {
@@ -110,6 +159,16 @@ final private class Queue[A](implicit ord: Ordering[A]) extends AtomicBoolean {
tasks.foreach(f)
}

/** Acquires queue lock using busy-wait spin loop.
*
* Uses busy waiting rather than parking/blocking since:
* - Effective concurrency is bounded by available cores
* - Lock hold times are very short (optimized queue operations)
* - Most contended operations (stealing) fail fast if lock unavailable
*
* The scheduler's worker model ensures the number of threads competing for any given queue is limited, making spinning more efficient
* than thread parking overhead.
*/
private def lock(): Unit =
while (!compareAndSet(false, true)) {}
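
To make the intended call pattern concrete, a scheduler-internal sketch is shown below; the Task type, its ordering, and the queue names are hypothetical, since Queue itself is private to the scheduler package.

// Hypothetical element type, ordered by accumulated runtime (larger value = higher priority here).
final case class Task(runtimeNanos: Long)
implicit val byRuntime: Ordering[Task] = Ordering.by[Task, Long](_.runtimeNanos)

val local  = new Queue[Task]
val remote = new Queue[Task]

remote.add(Task(10))
remote.add(Task(3))
remote.add(Task(7))

// Exchange: submit a new task and atomically take the highest-priority one back.
val next = remote.addAndPoll(Task(5))

// Work stealing: `remote` is stolen by `local`. The highest-priority task is
// returned directly and roughly half of the remainder moves into `local`;
// null is returned if `local` is non-empty or either lock is contended.
val stolen = remote.stealingBy(local)

// Shutdown or rebalancing: remove everything atomically, then process each task outside the lock.
local.drain(task => println(s"re-dispatching $task"))
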
