From 16b5e137fb5ab6cea486dc6a1317d8d87b7d817c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:23:12 +0800 Subject: [PATCH] Fix internal procedure deduplication (#18036) (cherry picked from commit 7f41f064858f9376d7c6f7e443dc541918d20ac8) --- .../procedure/ProcedureExecutor.java | 1 - .../procedure/TimeoutExecutorThread.java | 20 ++++++++++++++++--- .../procedure/TestProcedureExecutor.java | 10 ++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 1633f78eec903..f79ff888c3825 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -289,7 +289,6 @@ public void addInternalProcedure(InternalProcedure interalProcedure) { if (interalProcedure == null) { return; } - interalProcedure.setState(ProcedureState.WAITING_TIMEOUT); timeoutExecutor.add(interalProcedure); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index c998ad903c208..8ade48f40018b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -19,6 +19,10 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -28,6 +32,7 @@ public class TimeoutExecutorThread extends StoppableThread { private static final int DELAY_QUEUE_TIMEOUT = 20; private final ProcedureExecutor executor; private final DelayQueue> queue = new DelayQueue<>(); + private final Set> registeredInternalProcedures = ConcurrentHashMap.newKeySet(); public TimeoutExecutorThread( ProcedureExecutor envProcedureExecutor, ThreadGroup threadGroup, String name) { @@ -38,12 +43,21 @@ public TimeoutExecutorThread( public void add(Procedure procedure) { ProcedureDelayContainer delayTask = new ProcedureDelayContainer<>(procedure); + if (procedure instanceof InternalProcedure) { + if (!registeredInternalProcedures.add(procedure)) { + return; + } + procedure.setState(ProcedureState.WAITING_TIMEOUT); + } queue.remove(delayTask); queue.add(delayTask); } public boolean remove(Procedure procedure) { - return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished(); + boolean unregistered = + procedure instanceof InternalProcedure && registeredInternalProcedures.remove(procedure); + boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure)); + return unregistered || removed || procedure.isFinished(); } private ProcedureDelayContainer takeQuietly() { @@ -64,12 +78,12 @@ public void run() { } Procedure procedure = delayTask.getProcedure(); if (procedure instanceof InternalProcedure) { - if (procedure.isFinished()) { + if (!registeredInternalProcedures.contains(procedure) || procedure.isFinished()) { continue; } InternalProcedure internal = (InternalProcedure) procedure; internal.periodicExecute(executor.getEnvironment()); - if (!procedure.isFinished()) { + if (!procedure.isFinished() && registeredInternalProcedures.contains(procedure)) { procedure.updateTimestamp(); queue.add(delayTask); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index ba5f635507a4a..75a7168c1f616 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -127,6 +127,16 @@ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws Interrupte Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); Assert.assertEquals(1, internalProcedure.getExecutionCount()); + procExecutor.addInternalProcedure(internalProcedure); + Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); + Assert.assertEquals(1, internalProcedure.getExecutionCount()); + + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); + + procExecutor.addInternalProcedure(internalProcedure); + Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS)); + Assert.assertEquals(2, internalProcedure.getExecutionCount()); + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); }