Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ The tables below list every Spark built-in expression with its current status.
| `max` | ✅ | |
| `max_by` | 🔜 | [#3841](https://github.com/apache/datafusion-comet/issues/3841) |
| `mean` | ✅ | |
| `median` | ✅ | Rewrites to `percentile(col, 0.5)`; falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `median` | ✅ | Rewrites to `percentile(col, 0.5)` and runs natively for supported percentile inputs |
| `min` | ✅ | |
| `min_by` | 🔜 | [#3841](https://github.com/apache/datafusion-comet/issues/3841) |
| `mode` | 🔜 | [#3970](https://github.com/apache/datafusion-comet/issues/3970) |
| `percentile` | ✅ | Single literal percentage on numeric input; array of percentages and a frequency argument fall back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile_cont` | ✅ | Spark 4.0+ `WITHIN GROUP (ORDER BY ...)`; ascending only, `DESC` falls back to Spark. Falls back by default, opt-in via allowIncompatible ([#4719](https://github.com/apache/datafusion-comet/issues/4719)) |
| `percentile` | ✅ | Single literal percentage on numeric input runs natively; array of percentages and a frequency argument fall back to Spark |
| `percentile_cont` | ✅ | Spark 4.0+ `WITHIN GROUP (ORDER BY ...)`; ascending only runs natively, `DESC` falls back to Spark |
| `percentile_disc` | 🔜 | Percentile aggregate |
| `regr_avgx` | ✅ | Native: Spark rewrites to `Average` (tests in [#4551](https://github.com/apache/datafusion-comet/issues/4551)) |
| `regr_avgy` | ✅ | Native: Spark rewrites to `Average` (tests in [#4551](https://github.com/apache/datafusion-comet/issues/4551)) |
Expand Down
30 changes: 23 additions & 7 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf,
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::min_max::max_udaf;
use datafusion::functions_aggregate::min_max::min_udaf;
use datafusion::functions_aggregate::percentile_cont::percentile_cont_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
Expand Down Expand Up @@ -74,7 +73,7 @@ use datafusion::{
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
SparkBloomFilterVersion, SumInteger, ToCsv,
SparkBloomFilterVersion, SparkPercentile, SumInteger, ToCsv,
};
use datafusion_spark::function::aggregate::collect::SparkCollectSet;
use iceberg::expr::Bind;
Expand Down Expand Up @@ -2616,12 +2615,14 @@ impl PhysicalPlanner {
let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
let percentile =
self.create_expr(expr.percentage.as_ref().unwrap(), Arc::clone(&schema))?;
// DataFusion's percentile_cont uses the same `index = p * (n - 1)` linear
// interpolation as Spark's exact Percentile, so results match for the single
// percentage case wired here.
AggregateExprBuilder::new(percentile_cont_udaf(), vec![child, percentile])
// Spark's exact Percentile uses full-precision linear interpolation. Comet uses
// its own UDAF rather than DataFusion's percentile_cont because DataFusion
// quantizes the interpolation weight.
let percentile_value = percentile_value(expr.percentage.as_ref().unwrap())?;
let func = AggregateUDF::new_from_impl(SparkPercentile::try_new(percentile_value)?);
AggregateExprBuilder::new(func.into(), vec![child, percentile])
.schema(schema)
.alias("percentile_cont")
.alias("percentile")
.with_ignore_nulls(false)
.with_distinct(false)
.build()
Expand Down Expand Up @@ -3273,6 +3274,21 @@ impl PhysicalPlanner {
}
}

fn percentile_value(expr: &spark_expression::Expr) -> Result<f64, ExecutionError> {
match &expr.expr_struct {
Some(ExprStruct::Literal(literal)) if !literal.is_null => match &literal.value {
Some(Value::DoubleVal(value)) => Ok(*value),
Some(Value::FloatVal(value)) => Ok(*value as f64),
_ => Err(GeneralError(
"Percentile value must be a floating-point literal".to_string(),
)),
},
_ => Err(GeneralError(
"Percentile value must be a non-null literal".to_string(),
)),
}
}

/// Collects the indices of the columns in the input schema that are used in the expression
/// and returns them as a pair of vectors, one for the left side and one for the right side.
fn expr_to_columns(
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/agg_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod avg;
mod avg_decimal;
mod correlation;
mod covariance;
mod percentile;
mod stddev;
mod sum_decimal;
mod sum_int;
Expand All @@ -29,6 +30,7 @@ pub use avg::Avg;
pub use avg_decimal::AvgDecimal;
pub use correlation::Correlation;
pub use covariance::Covariance;
pub use percentile::SparkPercentile;
pub use stddev::Stddev;
pub use sum_decimal::SumDecimal;
pub use sum_int::SumInteger;
Expand Down
Loading
Loading