Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Cache jstrings during metrics collection #1029

Merged
merged 7 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ struct ExecutionContext {
pub debug_native: bool,
/// Whether to write native plans with metrics to stdout
pub explain_native: bool,
/// Map of metric name -> global reference to the corresponding Java string, cached to avoid repeated jni_NewStringUTF calls.
pub metrics_jstrings: HashMap<String, Arc<GlobalRef>>,
}

/// Accept serialized query plan and return the address of the native query plan.
Expand Down Expand Up @@ -178,6 +180,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
session_ctx: Arc::new(session),
debug_native,
explain_native,
metrics_jstrings: HashMap::new(),
});

Ok(Box::into_raw(exec_context) as i64)
Expand Down Expand Up @@ -441,10 +444,11 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(
}

/// Updates the metrics of the query plan.
fn update_metrics(env: &mut JNIEnv, exec_context: &ExecutionContext) -> CometResult<()> {
fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> {
let native_query = exec_context.root_op.as_ref().unwrap();
let metrics = exec_context.metrics.as_obj();
update_comet_metric(env, metrics, native_query)
let metrics_jstrings = &mut exec_context.metrics_jstrings;
update_comet_metric(env, metrics, native_query, metrics_jstrings)
}

fn convert_datatype_arrays(
Expand Down
28 changes: 25 additions & 3 deletions native/core/src/execution/metrics/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::jvm_bridge::jni_new_global_ref;
use crate::{
errors::CometError,
jvm_bridge::{jni_call, jni_new_string},
};
use datafusion::physical_plan::ExecutionPlan;
use jni::objects::{GlobalRef, JString};
use jni::{objects::JObject, JNIEnv};
use std::collections::HashMap;
use std::sync::Arc;

/// Updates the metrics of a CometMetricNode. This function is called recursively to
Expand All @@ -30,6 +33,7 @@ pub fn update_comet_metric(
env: &mut JNIEnv,
metric_node: &JObject,
execution_plan: &Arc<dyn ExecutionPlan>,
metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
) -> Result<(), CometError> {
update_metrics(
env,
Expand All @@ -41,6 +45,7 @@ pub fn update_comet_metric(
.map(|m| m.value())
.map(|m| (m.name(), m.as_usize() as i64))
.collect::<Vec<_>>(),
metrics_jstrings,
)?;

unsafe {
Expand All @@ -51,7 +56,7 @@ pub fn update_comet_metric(
if child_metric_node.is_null() {
continue;
}
update_comet_metric(env, &child_metric_node, child_plan)?;
update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?;
}
}
Ok(())
Expand All @@ -62,11 +67,28 @@ fn update_metrics(
env: &mut JNIEnv,
metric_node: &JObject,
metric_values: &[(&str, i64)],
metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
) -> Result<(), CometError> {
unsafe {
for &(name, value) in metric_values {
let jname = jni_new_string!(env, &name)?;
jni_call!(env, comet_metric_node(metric_node).set(&jname, value) -> ())?;
// Perform a lookup in the jstrings cache.
if let Some(map_global_ref) = metrics_jstrings.get(name) {
// Cache hit. Extract the jstring from the global ref.
let jobject = map_global_ref.as_obj();
let jstring = JString::from_raw(**jobject);
// Update the metrics using the jstring as a key.
jni_call!(env, comet_metric_node(metric_node).set(&jstring, value) -> ())?;
} else {
// Cache miss. Allocate a new string, promote to global ref, and insert into cache.
let local_jstring = jni_new_string!(env, &name)?;
let global_ref = jni_new_global_ref!(env, local_jstring)?;
let arc_global_ref = Arc::new(global_ref);
metrics_jstrings.insert(name.to_string(), Arc::clone(&arc_global_ref));
let jobject = arc_global_ref.as_obj();
let jstring = JString::from_raw(**jobject);
// Update the metrics using the jstring as a key.
jni_call!(env, comet_metric_node(metric_node).set(&jstring, value) -> ())?;
}
}
}
Ok(())
Expand Down
Loading