
Commit

add update metrics interval
wForget committed Feb 5, 2025
1 parent 642c737 commit b52b6ae
Showing 5 changed files with 40 additions and 1 deletion.
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -622,6 +622,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
conf("spark.comet.metrics.updateInterval")
.doc(
"The interval, in milliseconds, at which to update metrics. If the interval is zero" +
" or negative, metrics are updated only upon task completion.")
.longConf
.createWithDefault(3000L)

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
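For context on how this entry is consumed, here is a minimal Scala sketch of reading the new config on the JVM side (it assumes an active Spark/Comet session; the periodicUpdatesEnabled name is illustrative only):

import org.apache.comet.CometConf

// Reads spark.comet.metrics.updateInterval from the session config; defaults to 3000 ms.
val intervalMs: Long = CometConf.COMET_METRICS_UPDATE_INTERVAL.get()

// A zero or negative value disables periodic updates, so native metrics are
// only pushed back to Spark when the task completes.
val periodicUpdatesEnabled: Boolean = intervalMs > 0
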
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Expand Up @@ -66,6 +66,7 @@ Comet provides the following configuration settings.
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.metrics.updateInterval | The interval, in milliseconds, at which to update metrics. If the interval is zero or negative, metrics are updated only upon task completion. | 3000 |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |
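For users tuning this option, a hedged example of lowering the interval when building a Spark session (the 1000 ms value is arbitrary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("comet-metrics-update-interval-example")
  // Refresh Comet's native metrics roughly once per second instead of the
  // 3000 ms default; zero or a negative value defers updates to task completion.
  .config("spark.comet.metrics.updateInterval", "1000")
  .getOrCreate()
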
25 changes: 25 additions & 0 deletions native/core/src/execution/jni_api.rs
Expand Up @@ -91,6 +91,10 @@ struct ExecutionContext {
pub runtime: Runtime,
/// Native metrics
pub metrics: Arc<GlobalRef>,
/// The interval at which metrics are updated; `None` means metrics are updated only upon task completion
pub metrics_update_interval: Option<Duration>,
/// The time when metrics were last updated
pub metrics_last_update_time: Instant,
/// The time it took to create the native plan and configure the context
pub plan_creation_time: Duration,
/// DataFusion SessionContext
@@ -158,6 +162,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
serialized_query: jbyteArray,
partition_count: jint,
metrics_node: JObject,
metrics_update_interval: jlong,
comet_task_memory_manager_obj: JObject,
batch_size: jint,
use_unified_memory_manager: jboolean,
@@ -220,6 +225,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(

let plan_creation_time = start.elapsed();

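// A non-positive interval disables periodic metric updates; metrics are then
// reported only when the task completes.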
let metrics_update_interval = if metrics_update_interval > 0 {
Some(Duration::from_millis(metrics_update_interval as u64))
} else {
None
};

let exec_context = Box::new(ExecutionContext {
id,
task_attempt_id,
@@ -231,6 +242,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
stream: None,
runtime,
metrics,
metrics_update_interval,
metrics_last_update_time: Instant::now(),
plan_creation_time,
session_ctx: Arc::new(session),
debug_native: debug_native == 1,
@@ -505,6 +518,18 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let next_item = exec_context.stream.as_mut().unwrap().next();
let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });

// Update metrics if the configured interval has elapsed since the last update.
if let Some(interval) = exec_context.metrics_update_interval {
    let now = Instant::now();
    if now - exec_context.metrics_last_update_time >= interval {
        update_metrics(&mut env, exec_context)?;
        exec_context.metrics_last_update_time = now;
    }
}

match poll_output {
Poll::Ready(Some(output)) => {
// prepare output for FFI transfer
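The native change above is essentially a time-based throttle around the existing metrics update. A minimal Scala sketch of the same pattern, where reportMetrics is a hypothetical stand-in for the JNI metrics callback:

import java.time.{Duration, Instant}

final class MetricsThrottle(interval: Option[Duration], reportMetrics: () => Unit) {
  private var lastUpdate: Instant = Instant.now()

  // Called for every polled batch; reports metrics only if the interval has elapsed.
  def maybeReport(): Unit = interval.foreach { i =>
    val now = Instant.now()
    if (Duration.between(lastUpdate, now).compareTo(i) >= 0) {
      reportMetrics()
      lastUpdate = now
    }
  }

  // At task completion, metrics are reported regardless of the interval.
  def reportOnCompletion(): Unit = reportMetrics()
}
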
2 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_METRICS_UPDATE_INTERVAL, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
@@ -72,6 +72,7 @@ class CometExecIterator(
protobufQueryPlan,
numParts,
nativeMetrics,
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
new CometTaskMemoryManager(id),
batchSize = COMET_BATCH_SIZE.get(),
use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
4 changes: 4 additions & 0 deletions spark/src/main/scala/org/apache/comet/Native.scala
@@ -40,6 +40,9 @@ class Native extends NativeBase {
* the bytes of serialized SparkPlan.
* @param metrics
* the native metrics of SparkPlan.
* @param metricsUpdateInterval
* the interval, in milliseconds, at which to update metrics; if the interval is zero or
* negative, metrics are updated only upon task completion.
* @param taskMemoryManager
* the task-level memory manager that is responsible for tracking memory usage across JVM and
* native side.
@@ -53,6 +56,7 @@
plan: Array[Byte],
partitionCount: Int,
metrics: CometMetricNode,
metricsUpdateInterval: Long,
taskMemoryManager: CometTaskMemoryManager,
batchSize: Int,
use_unified_memory_manager: Boolean,
