Skip to content

[spark] support lateral inner join for vector search#8252

Merged
JingsongLi merged 1 commit into
apache:masterfrom
Stefanietry:support_lateral_join_for_vector_search
Jun 24, 2026
Merged

[spark] support lateral inner join for vector search#8252
JingsongLi merged 1 commit into
apache:masterfrom
Stefanietry:support_lateral_join_for_vector_search

Conversation

@Stefanietry

Copy link
Copy Markdown
Contributor

Purpose
Purpose: Support lateral join for vector search on spark.
Linked issue: #8251

Tests
Add vector search with lateral join on org.apache.paimon.spark.SparkMultimodalITCase#testVector、org.apache.spark.sql.test.SQLTestUtils#test("lateral vector search preserves subquery alias qualifiers")

@Stefanietry Stefanietry force-pushed the support_lateral_join_for_vector_search branch from 8aa9c09 to 774c9b6 Compare June 16, 2026 08:05

override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions {
outerRows =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can batch queries be supported? Batch queries are crucial for performance. You can take a look to benchmark in https://github.com/apache/paimon-vector-index

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reminder. I'll refine it in batch mode later.

@JingsongLi

Copy link
Copy Markdown
Contributor

Please fix test failures.

@Stefanietry Stefanietry force-pushed the support_lateral_join_for_vector_search branch 2 times, most recently from 4697f65 to c23c76b Compare June 22, 2026 15:04
@Stefanietry Stefanietry reopened this Jun 22, 2026
@Stefanietry Stefanietry force-pushed the support_lateral_join_for_vector_search branch 2 times, most recently from a1e3745 to 835b339 Compare June 23, 2026 05:38
_.toPaimonDataField)).asJava)
}
val sparkRow = SparkInternalRow.create(resultRowType)
val vectorSearchBuilder = innerTable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normal Spark vector_search scans apply pushed partition/data filters before top-k (PaimonBaseScan.evalVectorSearch passes pushedPartitionFilters/pushedDataFilters into the builder). This lateral executor builds the BatchVectorSearchBuilder here without carrying predicates from the search side; PushDownLateralVectorSearchFilter only pushes predicates that reference the left child, so predicates on r.dt or other searched-table columns stay above LateralVectorSearch. A query like ... JOIN LATERAL (...) r WHERE r.dt = '20260420' will pick topK over all partitions and then filter the joined rows, which can return fewer or wrong rows compared with non-lateral vector_search(...) WHERE dt = .... Please preserve search-side filters and apply them via withPartitionFilter/withFilter before newVectorScan()/readBatch(), or reject such predicates explicitly.


scan.plan().splits().asScala.iterator.flatMap {
split =>
val reader =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reader is only closed when this inner iterator is fully exhausted. If a downstream operator short-circuits consumption, for example LIMIT 1/take, or if the task is interrupted, Spark can stop pulling rows before hasNext returns false, leaving the current PaimonRecordReaderIterator and its underlying RecordReader open. Please register a TaskContext completion listener or wrap the returned iterator so the current reader is closed on task completion/cancellation as well as normal exhaustion.

@Stefanietry Stefanietry force-pushed the support_lateral_join_for_vector_search branch 2 times, most recently from 07428ac to c9ee0dd Compare June 23, 2026 11:41
relation.output.filter(projectReferences.contains)
}

private def extractDynamicVectorSearch(plan: LogicalPlan)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filters inside the lateral subquery are still not handled here. For example, FROM q, LATERAL (SELECT gid FROM vector_search('t', 'embs', q.embs, 5) WHERE dt = '20260608') r is resolved as a right plan containing Filter(..., DynamicVectorSearchRelation) (usually under Project). This extractor falls through to None, leaving the dynamic relation with an outer reference but no LateralVectorSearch physical path. Please extract Filter nodes here and append their conditions to searchFilters (or reject them explicitly) in addition to the outer-WHERE pushdown case.

val (pushDownToLeft, otherPredicates) = predicates.partition {
predicate => predicate.deterministic && predicate.references.subsetOf(lvs.child.outputSet)
}
val (pushDownToSearch, stayUp) = otherPredicates.partition {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes every deterministic predicate that references only search-side output, but convertSearchFilters() later throws if Spark cannot translate the rewritten expression into a Paimon predicate. A valid query such as WHERE r.gid + 1 > 10, or a filter on an expression alias from the lateral subquery, would now fail during execution instead of being evaluated above the lateral result. Please keep untranslatable predicates in stayUp, or restrict this pushdown to simple field predicates that are known to be convertible before dropping the upper filter.

@Stefanietry Stefanietry force-pushed the support_lateral_join_for_vector_search branch from c9ee0dd to aa7dcc3 Compare June 23, 2026 16:06
}

def hasOuterReference(argsWithoutTable: Seq[Expression]): Boolean = {
val queryVector = argsWithoutTable(1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasOuterReference accesses argsWithoutTable(1) before verifying the arity. For invalid calls such as vector_search('t', 'embs'), this now throws IndexOutOfBoundsException during resolution instead of the existing helpful vector_search needs three or four parameters... error from createVectorSearch/createDynamicVectorSearch. Please check the size before reading the query-vector argument.

@JingsongLi JingsongLi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit ffaebae into apache:master Jun 24, 2026
13 checks passed
@Stefanietry Stefanietry deleted the support_lateral_join_for_vector_search branch June 24, 2026 04:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants