[spark] Lazy partition pruning for engine format table#8300
Conversation
d294391 to
c97bb41
Compare
| leafDirToChildrenFiles | ||
| } | ||
|
|
||
| override def refresh(): Unit = fileStatusCache.invalidateAll() |
There was a problem hiding this comment.
This only invalidates the shared FileStatusCache. Once fullIndex has been initialized, it still keeps its own cached leaf files, leaf-dir map, and partition spec, while Spark's InMemoryFileIndex.refresh() also calls refresh0() to rebuild those fields. I reproduced this by listing an index with pt=1, creating pt=2, calling refresh(), and then listFiles(Nil, Nil) still returned only pt=1. Please refresh/recreate fullIndex here so REFRESH TABLE and write refresh paths do not leave unfiltered scans/allFiles/partitionSpec stale.
There was a problem hiding this comment.
Thanks for the review. I investigated the refresh() path:
REFRESH TABLE in Spark V2 goes through RefreshTableExec → catalog.invalidateTable(ident), which causes the next query to call loadTable() and recreate the entire table instance (including a fresh LazyPartitionPruningFileIndex). FileIndex.refresh() is not called in this path. Same pattern as CatalogFileIndex, which also uses an immutable val sizeInBytes and its refresh() only clears fileStatusCache.
Although FileIndex.refresh() is currently only called from V1 paths (InsertIntoHadoopFsRelationCommand, CacheManager.recacheByPath) which engine format tables don't hit, I've made fullIndex resettable via refresh() (using @volatile var + double-checked locking) to avoid potential issues in the future. This ensures refresh() behaves consistently with InMemoryFileIndex.
Added a config spark.paimon.format-table.engine.lazy-partition-pruning (default true). Set to false to fall back to eager listing, which may be better for small tables queried repeatedly without partition filters — eager listing caches all files at construction and avoids per-query directory traversal overhead.
c97bb41 to
495f0c7
Compare
Replace the eager InMemoryFileIndex (which recursively lists all files at construction time) with LazyPartitionPruningFileIndex that defers file listing until listFiles() is called and prunes partition directories level-by-level using partition filters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
495f0c7 to
92faa84
Compare
Change lazy val fullIndex to @volatile var with double-checked locking. refresh() now resets fullIndex so that subsequent calls see fresh data, consistent with InMemoryFileIndex.refresh() behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When fullIndex is already created (e.g. by sizeInBytes during join planning), filtered listFiles reuses it for in-memory pruning instead of re-traversing the filesystem via discoverPartitions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| override def partitionSpec(): PartitionSpec = | ||
| PartitionSpec(_partitionSchema, Seq.empty) | ||
|
|
||
| override def sizeInBytes: Long = fullIndex.sizeInBytes |
There was a problem hiding this comment.
Spark also asks the scan for statistics while planning some queries. For file scans, estimateStatistics() calls fileIndex.sizeInBytes, so this initializes fullIndex by recursively listing the whole table before listFiles can apply the partition filters. I reproduced this on the latest head with a 20x30 partitioned format table: SELECT * FROM t JOIN d ON t.p1 = d.p1 WHERE t.p1 = 1 discovers 600 files instead of the expected 30 because planning touches sizeInBytes, and then _fullIndex != null makes the filtered read use the eager index too. Could we avoid constructing fullIndex for lazy-path stats, or keep filtered listFiles lazy even after stats are requested?
There was a problem hiding this comment.
The 600 files discovered comes from sizeInBytes triggered by DataSourceV2ScanRelation.computeStats() during planning, not from listFiles. When _fullIndex != null, all files have already been listed and cached in memory — filtered listFiles simply reuses fullIndex for in-memory prunePartitions with zero additional FS calls and zero additional FILES_DISCOVERED. Without the _fullIndex != null check, discoverPartitions would create a prunedIndex whose paths miss FileStatusCache (keyed by root path, not sub-paths), adding more to the metric (verified: 16 vs 15 in a 5×3 table).
To answer your two questions: (1) In Spark's query lifecycle, sizeInBytes is called during planning before listFiles during execution. At that point no listFiles results are available yet, so fullIndex must list all files. If the order were reversed, we could compute sizeInBytes from listFiles results without full listing. This requires optimizing Spark's FileScan to defer or reorder stats computation, which is beyond this PR. (2) Keeping filtered listFiles lazy after stats are requested would actually be worse — it adds redundant FS calls and increases FILES_DISCOVERED due to FileStatusCache key mismatch.
There was a problem hiding this comment.
Thanks for checking. I agree that once fullIndex has already been materialized, reusing it in listFiles is better than doing another filesystem discovery. My remaining concern is the earlier materialization itself: for filtered joins, planning still performs the full-table listing before execution, so the lazy path does not help that common query shape. Also, Spark’s FileScan.estimateStatistics() uses the index-wide sizeInBytes and does not apply the partition filters, so the full listing is not making the filtered scan stats partition-aware either. Could the lazy index return a cheap unknown/default estimate while _fullIndex is still null, and only delegate to fullIndex.sizeInBytes after an unfiltered scan or inputFiles has already materialized it? That would preserve lazy pruning for plans that touch stats without adding the redundant execution listing you mentioned.
There was a problem hiding this comment.
Went a step beyond a flat default. A constant defaultSizeInBytes keeps it lazy but means a small partitioned table is never broadcast.
So while _fullIndex is null, sizeInBytes now returns a cheap estimate: partition count from per-level fan-outs along one path × average size of a few sampled leaf partitions. It uses fs.listStatus directly — no full index, no HiveCatalogMetrics. Falls back to defaultSizeInBytes on empty table / empty sample / overflow so we never underestimate and wrongly broadcast a large table. The exact size is used once _fullIndex is materialized (unfiltered scan / inputFiles).
Added tests in FormatTableTestBase: a small partitioned table is now broadcast (planned as SMJ before), filtered-join file discovery stays at the pruned count, and empty-table joins don't misfire.
…les can broadcast FileScan.estimateStatistics reads fileIndex.sizeInBytes during planning, before partition filters are applied. Returning defaultSizeInBytes (the previous behavior) meant a small partitioned engine format table was never broadcast, while a full listing would defeat lazy pruning. Instead estimate the size cheaply without a full listing: walk one path down the partition tree to get the partition count from per-level fan-outs, and multiply by the average size of a few sampled leaf partitions. Falls back to defaultSizeInBytes on empty table / empty sample / overflow to avoid underestimating and wrongly broadcasting a large table. The probe uses fs.listStatus directly, so it neither materializes the full index nor increments HiveCatalogMetrics, keeping filtered queries lazy. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| } | ||
| partitionCount *= subdirs.length | ||
| leafSiblings = subdirs | ||
| path = subdirs.head.getPath |
There was a problem hiding this comment.
This estimator can substantially underestimate skewed/non-rectangular partition trees because it only follows the first directory at each level. For example, with p1=1/p2=1 containing a 1-byte file and p1=2/p2=1..100 containing 1000-byte files, the current logic sees p1 fanout 2, follows p1=1, sees only one p2 leaf, and returns about 2 bytes while the table is about 100 KB. That violates the “never underestimate and wrongly broadcast a large table” guarantee and can make Spark broadcast a table that is actually above the threshold. Could we either fall back to defaultSizeInBytes when the sampled path cannot prove the lower levels are representative, or compute a conservative upper estimate across sampled/known branches?
|
Let's use Paimon implementation. |
Purpose
Replace the eager
InMemoryFileIndex(which recursively lists all files at construction time) withLazyPartitionPruningFileIndexthat defers file listing untillistFiles()is called and prunes partition directories level-by-level using partition filters.For a table with 20×30=600 partitions, querying a single partition (
p1=1 AND p2=1) now discovers 1 file instead of 600. Range queries (p1>15) and non-leading column filters (p2=1) also benefit from per-level pruning.Controlled by
spark.paimon.format-table.engine.lazy-partition-pruning(defaulttrue). Set tofalseto fall back to eager listing.Tests