[spark] support lateral inner join for vector search#8252
Conversation
8aa9c09 to
774c9b6
Compare
|
|
||
| override protected def doExecute(): RDD[InternalRow] = { | ||
| child.execute().mapPartitions { | ||
| outerRows => |
There was a problem hiding this comment.
Can batch queries be supported? Batch queries are crucial for performance. You can take a look to benchmark in https://github.com/apache/paimon-vector-index
There was a problem hiding this comment.
Thanks for your reminder. I'll refine it in batch mode later.
|
Please fix test failures. |
4697f65 to
c23c76b
Compare
a1e3745 to
835b339
Compare
| _.toPaimonDataField)).asJava) | ||
| } | ||
| val sparkRow = SparkInternalRow.create(resultRowType) | ||
| val vectorSearchBuilder = innerTable |
There was a problem hiding this comment.
Normal Spark vector_search scans apply pushed partition/data filters before top-k (PaimonBaseScan.evalVectorSearch passes pushedPartitionFilters/pushedDataFilters into the builder). This lateral executor builds the BatchVectorSearchBuilder here without carrying predicates from the search side; PushDownLateralVectorSearchFilter only pushes predicates that reference the left child, so predicates on r.dt or other searched-table columns stay above LateralVectorSearch. A query like ... JOIN LATERAL (...) r WHERE r.dt = '20260420' will pick topK over all partitions and then filter the joined rows, which can return fewer or wrong rows compared with non-lateral vector_search(...) WHERE dt = .... Please preserve search-side filters and apply them via withPartitionFilter/withFilter before newVectorScan()/readBatch(), or reject such predicates explicitly.
|
|
||
| scan.plan().splits().asScala.iterator.flatMap { | ||
| split => | ||
| val reader = |
There was a problem hiding this comment.
This reader is only closed when this inner iterator is fully exhausted. If a downstream operator short-circuits consumption, for example LIMIT 1/take, or if the task is interrupted, Spark can stop pulling rows before hasNext returns false, leaving the current PaimonRecordReaderIterator and its underlying RecordReader open. Please register a TaskContext completion listener or wrap the returned iterator so the current reader is closed on task completion/cancellation as well as normal exhaustion.
07428ac to
c9ee0dd
Compare
| relation.output.filter(projectReferences.contains) | ||
| } | ||
|
|
||
| private def extractDynamicVectorSearch(plan: LogicalPlan) |
There was a problem hiding this comment.
Filters inside the lateral subquery are still not handled here. For example, FROM q, LATERAL (SELECT gid FROM vector_search('t', 'embs', q.embs, 5) WHERE dt = '20260608') r is resolved as a right plan containing Filter(..., DynamicVectorSearchRelation) (usually under Project). This extractor falls through to None, leaving the dynamic relation with an outer reference but no LateralVectorSearch physical path. Please extract Filter nodes here and append their conditions to searchFilters (or reject them explicitly) in addition to the outer-WHERE pushdown case.
| val (pushDownToLeft, otherPredicates) = predicates.partition { | ||
| predicate => predicate.deterministic && predicate.references.subsetOf(lvs.child.outputSet) | ||
| } | ||
| val (pushDownToSearch, stayUp) = otherPredicates.partition { |
There was a problem hiding this comment.
This removes every deterministic predicate that references only search-side output, but convertSearchFilters() later throws if Spark cannot translate the rewritten expression into a Paimon predicate. A valid query such as WHERE r.gid + 1 > 10, or a filter on an expression alias from the lateral subquery, would now fail during execution instead of being evaluated above the lateral result. Please keep untranslatable predicates in stayUp, or restrict this pushdown to simple field predicates that are known to be convertible before dropping the upper filter.
c9ee0dd to
aa7dcc3
Compare
| } | ||
|
|
||
| def hasOuterReference(argsWithoutTable: Seq[Expression]): Boolean = { | ||
| val queryVector = argsWithoutTable(1) |
There was a problem hiding this comment.
hasOuterReference accesses argsWithoutTable(1) before verifying the arity. For invalid calls such as vector_search('t', 'embs'), this now throws IndexOutOfBoundsException during resolution instead of the existing helpful vector_search needs three or four parameters... error from createVectorSearch/createDynamicVectorSearch. Please check the size before reading the query-vector argument.
Purpose
Purpose: Support lateral join for vector search on spark.
Linked issue: #8251
Tests
Add vector search with lateral join on org.apache.paimon.spark.SparkMultimodalITCase#testVector、org.apache.spark.sql.test.SQLTestUtils#test("lateral vector search preserves subquery alias qualifiers")