fix: match Spark percentile interpolation precision#4792
Conversation
Use a Comet-native percentile aggregate for Spark percentile so interpolation keeps Spark's full-precision weight and can run compatible by default. Add native and SQL regressions for deeply interpolated percentile values. Co-authored-by: Codex <codex@openai.com>
andygrove
left a comment
There was a problem hiding this comment.
Thanks for tackling this. Overall the implementation looks solid and the CI is fully green across all Spark versions. A few observations and suggestions below.
Algorithm correctness
The new SparkPercentile mirrors Spark's Percentile.getPercentile closely. Position is (len - 1) * percentile, lower/higher via floor/ceil, and interpolation is (higher - position) * lower + (position - lower) * higher. That matches percentiles.scala:197-225 in Spark's source.
spark_double_cmp correctly matches SQLOrderingUtil.compareDoubles: NaN == NaN, NaN > any non-NaN, and -0.0 == 0.0. It's a proper total order so select_nth_unstable_by is safe.
Refactor of adjustOutputForNativeState
Moving the method to CometBaseAggregate and widening the mode check from if (modes != Seq(Partial)) to modeSet.subsetOf(Set(Partial, PartialMerge)) is the right fix for mixed distinct plans. One small question: since HashAggregateExec cannot hold a TypedImperativeAggregate like Percentile or CollectSet, the call to adjustOutputForNativeState from CometHashAggregateExec.createExec should always fall through the case _ => branch. Is that the intent, or is there a scenario where a HashAggregateExec would actually carry one of these functions?
Stale docs in expressions.md
docs/source/user-guide/latest/expressions.md lines 100, 104, 105 still say median, percentile, and percentile_cont "fall back by default, opt-in via allowIncompatible (#4719)". Now that this PR marks the supported form as Compatible(), those notes are out of date. Could you update them so users know these run natively without needing allowIncompatible?
Missing edge case tests
The new spark_double_cmp has custom handling for NaN and negative zero, but the SQL fixtures don't exercise those cases. It might be worth adding a query with double('NaN'), -0.0, and double('Infinity') so ordering behavior is exercised end-to-end and any future regression in the comparator would be caught by CI.
Benchmarks
The PR touches a performance-critical path and the changelog mentions CometAggregateExpressionBenchmark.scala, but the description does not include numbers. select_nth_unstable_by called twice may be slower than DataFusion's specialized path. Do you have before/after numbers to confirm there's no regression?
Minor observations
percentile_valueinplanner.rshandles bothDoubleValandFloatVal, but the Scala serde always wraps inLiteral(..., DoubleType), soFloatValis unreachable. Happy to leave it as defensive code, just flagging.- The
percentileexpression is still passed as a second argument toAggregateExprBuilder, but the accumulator readsself.percentile()from the UDAF, so the per-batch scalar column is materialized and unused. Functionally correct, mild inefficiency.
Nice work overall. The main gap to close before merge is refreshing the stale docs in expressions.md. Everything else is nice-to-have.
Refresh percentile support docs, keep native-state schema adjustment on the object aggregate path, and add SQL coverage for special double ordering. Co-authored-by: Codex <codex@openai.com>
Which issue does this PR close?
Closes #4719
Rationale for this change
Spark's exact
Percentileuses full-precision linear interpolation, while DataFusion'spercentile_contquantizes the interpolation weight to 6 decimal places. That can produce visible mismatches for deeply interpolated values, so the supported Comet percentile path had to remain behindallowIncompatible.What changes are included in this PR?
SparkPercentileaggregate UDAF that stores values in the existingList<Float64>state shape and computes Spark-compatible full-precision interpolation.Percentileplanning to the new UDAF instead of DataFusionpercentile_cont.percentileand Spark 4percentile_cont ... WITHIN GROUP.How are these changes tested?
cargo fmt --allcargo check -p datafusion-comet-spark-exprcargo check -p datafusion-cometcargo test -p datafusion-comet-spark-expr percentilecargo clippy -p datafusion-comet-spark-expr --all-targets --all-featuresmake core./mvnw test -Pjdk17 -Dtest=none -Dsuites="org.apache.comet.CometSqlFileTestSuite percentile"