[flink] Support stream read Chain Table#8262
Conversation
49a4f20 to
f2dc523
Compare
| @Nullable | ||
| @Override | ||
| public Long checkpoint() { | ||
| return nextDeltaSnapshotId; |
There was a problem hiding this comment.
[Blocking] checkpoint() returns a stale snapshot id in Phase 2, causing duplicate consumption after recovery.
checkpoint() returns nextDeltaSnapshotId, which is set only once in captureDeltaPosition() (the Phase-1 boundary, latestId + 1). In Phase 2, plan() delegates to deltaStreamScan.plan(), which advances the delta scan's internal cursor, but nextDeltaSnapshotId is never written back — so checkpoint() is frozen at the boundary.
The Flink enumerator persists scan.checkpoint() on every checkpoint (ContinuousFileSplitEnumerator#snapshotState). So during Phase 2 it keeps storing the boundary value; on failure, restore(boundary) makes the delta stream re-read every snapshot consumed since Phase 1 → large-scale duplicates.
Note that watermark() and startingContext() already delegate to deltaStreamScan when startingDone — checkpoint() is the one that was missed.
Secondary effect: because checkpoint() never changes, ContinuousFileSplitEnumerator#scanNextSnapshot never increments handledSnapshotCount, so scan.max-snapshot-count backpressure is silently defeated (the splitMaxNum guard still applies). The same fix resolves this.
Suggested fix:
@Nullable
@Override
public Long checkpoint() {
if (startingDone) {
return deltaStreamScan.checkpoint();
}
return nextDeltaSnapshotId;
}| * Converts a {@link DataSplit} to a {@link ChainSplit} where all files belong to the given | ||
| * branch. The partition value is preserved as-is (no rewriting). | ||
| */ | ||
| private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, String branch) { |
There was a problem hiding this comment.
[Maintainability] ChainSplit construction is duplicated in three places.
The per-file loop that builds fileBranchMapping + fileBucketPathMapping and then calls new ChainSplit(...) appears here in dataSplitToChainSplit(), and twice in ChainGroupReadTable (plan() around L249 and around L408). Any future change to how a ChainSplit is built (e.g. adding a field) has to be applied in all three; missing one would silently diverge the read/write paths.
Suggest extracting a single factory, e.g. ChainSplit.from(DataSplit dataSplit, String branch), and using it everywhere.
|
|
||
| // 2. Read all snapshot branch data, grouped by partition. | ||
| // Reuse batchScan.mainScan which has predicates/shard already applied. | ||
| Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition = |
There was a problem hiding this comment.
[Performance] planStarting() reads file-level splits for every snapshot partition, then keeps only the latest per group.
groupByPartition(batchScan.mainScan) runs a full batch scan that reads the manifests and file lists of all snapshot-branch partitions, but only the latest chain partition per group is kept (steps 3-4). With many historical full-dump partitions (e.g. a daily ODS dump over a year), this reads hundreds of partitions' metadata to keep one — slow startup and heavy manifest I/O (especially on object stores).
The batch path already does the cheaper thing: ChainGroupReadTable.plan() uses newChainPartitionListingScan(...).listPartitions() (partition metadata only) to locate partitions before scanning files. Suggest the same here: list partitions first, pick the latest per group, then withPartitionFilter(...) to scan only that partition. The pinned-delta scan (L166) has the same pattern.
| } | ||
|
|
||
| @Override | ||
| public void restore(@Nullable Long nextSnapshotId) { |
There was a problem hiding this comment.
[Minor] restore(null) does not reset startingDone, so it cannot actually re-run Phase 1.
When nextSnapshotId == null the if body is skipped, leaving startingDone unchanged. On an instance that already finished Phase 1 (startingDone == true), a subsequent restore(null) + plan() enters Phase 2 (deltaStreamScan.plan()) instead of re-running planStarting().
In the Flink runtime this is harmless because restore is always called on a fresh scan instance (startingDone defaults to false). But it contradicts the intent of testStreamingReadRestoreAfterNewData, whose comment states "restore(null) re-runs Phase 1". Suggest resetting startingDone = false here so the documented semantics hold.
| if (bucketDir.startsWith("bucket-")) { | ||
| try { | ||
| bucketId = Integer.parseInt(bucketDir.substring("bucket-".length())); | ||
| } catch (NumberFormatException ignored) { |
There was a problem hiding this comment.
[Minor] Silent catch hides bucket-id parse failures.
catch (NumberFormatException ignored) {} swallows the failure and falls back to bucketId = 0, which would route all affected splits to the same reader subtask (skew) with no trace. The bucket-{N} layout is a stable convention so this shouldn't happen in practice, but a LOG.warn(...) here would make any future regression diagnosable rather than silent.
| */ | ||
| @Test | ||
| @Timeout(180) | ||
| public void testStreamingReadChainTableStatefulRestart() throws Exception { |
There was a problem hiding this comment.
[Test gap] This stateful-restart test doesn't exercise the checkpoint/restore path that actually regresses.
The checkpoint here is triggered before Phase 2 has consumed any new delta snapshot (no delta is written between the Phase-1 write at L1009 and the checkpoint at L1024). So the frozen nextDeltaSnapshotId happens to equal the real delta cursor, and the "no duplicates" assertion (L1088) passes even if checkpoint() is broken.
The checkpoint() regression (see comment on ChainTableStreamScan#checkpoint) only manifests when Phase 2 has already advanced past the boundary. Suggest adding a case: after Phase 1, write delta and let Phase 2 consume ≥1 snapshot, then checkpoint → cancel → restart, and assert no duplicates.
| JobClient jobClient = tableResult.getJobClient().get(); | ||
|
|
||
| // Wait for data to flow to the sink | ||
| Thread.sleep(5000); |
There was a problem hiding this comment.
[Test, minor] Timing-based synchronization is flaky-prone, and debug prints are left in.
This class uses ~19 Thread.sleep(2000-5000) calls to synchronize with the streaming job, which tends to be flaky under CI load; there are also a few leftover System.err.println("[TEST] ...") debug statements. Consider polling for a condition instead of fixed sleeps, and removing the prints.
3e00ac2 to
6d67d97
Compare
| Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>(); | ||
| if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { | ||
| DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); | ||
| applyPredicatesAndShard(partitionListingScan); |
There was a problem hiding this comment.
Phase 1 uses this scan to discover the latest snapshot partition per group, but applyPredicatesAndShard also applies row predicates before listPartitions(). That makes the chain boundary depend on the query predicate instead of the snapshot branch state. For example, if the latest snapshot partition no longer has k = 1 but an older delta partition still does, SELECT ... WHERE k = 1 can make this listing miss the latest snapshot partition and then include the old delta row, even though that partition should be considered outdated by the latest full snapshot. Please keep the partition-discovery scan free of row/shard filters and apply those predicates only when scanning the already-selected snapshot/delta splits.
| protected int assignSuggestedTask(FileStoreSourceSplit split) { | ||
| if (split.split() instanceof DataSplit) { | ||
| return assignSuggestedTask((DataSplit) split.split()); | ||
| } else if (split.split() instanceof ChainSplit) { |
There was a problem hiding this comment.
This handles ChainSplit for the ordinary continuous enumerator, but the checkpoint-align source path still assumes all splits are DataSplits (AlignedContinuousFileSplitEnumerator#addSplits casts to DataSplit when grouping by snapshot). Since chain-table streaming Phase 1 now emits ChainSplits, a query with source.checkpoint-align.enabled=true will fail with ClassCastException before assignment. Please either add ChainSplit support to the aligned enumerator/read operator path as well, or reject checkpoint-align mode for chain-table streaming explicitly.
2354828 to
16aeff6
Compare
| // discarded (only the latest per group is kept). | ||
| Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>(); | ||
| if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { | ||
| DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); |
There was a problem hiding this comment.
Phase 1 pins the delta branch at latestId, but the snapshot branch is read from whatever is latest when these scans run. That makes the advertised Phase 1/Phase 2 boundary non-deterministic if a snapshot-branch overwrite commits while a streaming job is starting. For example, after we capture delta at 20250808, a snapshot commit for 20250809 can land before wrapped.newScan() lists partitions; Phase 1 will then exclude the 20250808 delta as older than the new snapshot, but Phase 2 starts from delta snapshot latestId + 1, so the 20250808 delta is never emitted. The snapshot branch should be captured/pinned to a specific snapshot id before this listing (similar to the delta branch) and both partition listing and file scan should use that pinned snapshot.
There was a problem hiding this comment.
Thanks for the comment. I now added logics to pin the snapshot ids of the two branches at the beginning of the code. In case race condition might still happen between the two pins for snapshot branch, double checks and retry logics are also used.
| if (!startingDone) { | ||
| return planStarting(); | ||
| } | ||
| TableScan.Plan plan = deltaStreamScan.plan(); |
There was a problem hiding this comment.
After Phase 1, here returns deltaStreamScan.plan() directly, so the stream emits normal delta DataSplits. But ChainTableFileStoreTable only overrides newStreamScan() and inherits FallbackReadFileStoreTable.newRead(), whose non-FallbackSplit path falls back to mainRead.createReader(split). That bypasses the branch-aware ChainGroupReadTable.Read logic added in this PR. This is especially risky when snapshot/delta/main branch schemas diverge or when branch-specific schema lookup is needed. Maybe can pair the new stream scan with a chain-aware newRead() in ChainTableFileStoreTable, or convert phase-2 delta splits into branch-tagged ChainSplits before returning them?
There was a problem hiding this comment.
Thanks for the comment. I've converted the splits to ChainSplits in phase-2.
35e4de2 to
2fa6c38
Compare
2fa6c38 to
9098815
Compare
| TableScanUtils.streamingReadingValidate(table); | ||
|
|
||
| if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { | ||
| if (conf.get(CoreOptions.CHAIN_TABLE_ENABLED)) { |
There was a problem hiding this comment.
This guard only covers checkpoint-align mode, but a chain-table streaming read can still enter the dedicated split-generation path below when the query sets consumer-id with consumer.ignore-progress=true (and the default consumer.mode=exactly-once). ChainTableFileStoreTable.newStreamScan() allows that combination because there is no existing consumer progress, so phase 1 emits ChainSplits. The dedicated MonitorSource path is not chain-aware: MonitorSource.shuffleOrdered and ReadOperator.processElement both cast every split to DataSplit, which will fail with ClassCastException before any rows are read. Please either reject consumer-id / dedicated split generation for chain-table streaming here, or make the MonitorSource reader/shuffle path handle ChainSplit as well.
There was a problem hiding this comment.
Thanks for the comment. I've added the check.
Given that consumer mode (core level) and checkpoint alignment (flink engine level) are configurations at different levels, I choose to place the configuration checks separately in FlinkSourceBuilder and ChainTableFileStoreTable.
| if (predicate == null) { | ||
| return this; | ||
| } | ||
| if (!partitionKeys.isEmpty() |
There was a problem hiding this comment.
[Blocking] Mixed predicates can still push partition filters into the chain-table stream scan.
This guard only rejects predicates whose whole expression is partition-only. In Flink, filters are combined before ReadBuilder creates the stream scan, so a query such as WHERE dt = '20250808' AND v = 'hello' reaches this method as one AND predicate. The visitor returns false because v is non-partition, so the scan accepts the predicate.
Later applyPredicatesAndShard() calls DataTableScan.withFilter(...) on the pinned snapshot/delta scans. SnapshotReaderImpl.withFilter() then extracts the dt conjunct into scan.withPartitionFilter(...); for the snapshot scan this can even replace the already-selected latest-partition filter. As a result Phase 1 computes/reads the chain boundary under a user partition filter instead of the full chain state, which can include stale delta data or skip the intended snapshot data. It also contradicts the documented behavior that partition filters in streaming reads should throw.
Please reject any predicate that contains a partition conjunct, not only predicates made entirely of partition fields (for example, use/expose the same partition-extraction logic as SnapshotReaderImpl and throw when the partition side is present) before storing or applying the predicate.
There was a problem hiding this comment.
Thanks for pointing this out. I have rejected any predicate that might contain references to partition columns.
| data at that point in time. | ||
| - **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the | ||
| current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition | ||
| contains only the changes for that period. The delta branch must have a |
There was a problem hiding this comment.
This requirement no longer matches the implementation or the new coverage below. testStreamingReadWithNoChangelogProducer creates a chain table with the default changelog-producer=none and verifies that streaming read works by emitting the normal delta data-file records. If none is intentionally supported, this paragraph should say that a changelog producer is required only when users need changelog semantics (for example update-before/update-after records), not for chain-table streaming read to work at all.
| not detected until the streaming job is restarted. | ||
| - The chain-table-aware streaming scan only supports the default startup mode (`latest-full`). | ||
| When the user specifies an explicit starting position — such as `scan.snapshot-id`, | ||
| `scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress — |
There was a problem hiding this comment.
This is a bit narrower than the code. ChainTableFileStoreTable.newStreamScan() rejects any non-null consumer-id, and the new testStreamingReadRejectsConsumerMode also covers consumer.ignore-progress=true as rejected. Please document this as consumer-id in general (or adjust the implementation if only existing consumer progress should be unsupported), otherwise users may expect a fresh/ignore-progress consumer to work.
Purpose
Chain Table (
chain-table.enabled=true) separates data into asnapshotbranch (batch-imported full partitions) and adeltabranch (incremental updates). Prior to this change, streaming read was not supported because the standardDataTableStreamScanis unaware of the two-branch architecture.This PR introduces
ChainTableFileStoreTable(a wrapper overFallbackReadFileStoreTable) andChainTableStreamScanwhich implements a two-phase streaming scan: Phase 1 does a full load by reading delta data pinned to the current snapshot and merging snapshot files for overlapping partitions; Phase 2 incrementally monitors the delta branch only, returningDataSplit(isStreaming=true)for changelog passthrough. The snapshot-pinning strategy makes the Phase 1 / Phase 2 boundary deterministic — no overlap or data loss regardless of concurrent commits.Tests
Added
FlinkChainTableITCasewith 16 tests (all passing, ~75s):changelog-producer=inputWHEREpredicate forwarding,withShardforwardingscan.mode=latestbypass,changelog-producer=nonerejectionrestore(id, scanAll=true)andrestore(null, scanAll=true)state resetchain-partition-keysgroup partition streaming