diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 1633f78eec90..f79ff888c382 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -289,7 +289,6 @@ public void addInternalProcedure(InternalProcedure interalProcedure) { if (interalProcedure == null) { return; } - interalProcedure.setState(ProcedureState.WAITING_TIMEOUT); timeoutExecutor.add(interalProcedure); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index c998ad903c20..8ade48f40018 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -19,6 +19,10 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -28,6 +32,7 @@ public class TimeoutExecutorThread extends StoppableThread { private static final int DELAY_QUEUE_TIMEOUT = 20; private final ProcedureExecutor executor; private final DelayQueue> queue = new DelayQueue<>(); + private final Set> registeredInternalProcedures = ConcurrentHashMap.newKeySet(); public TimeoutExecutorThread( ProcedureExecutor envProcedureExecutor, ThreadGroup threadGroup, String name) { @@ -38,12 +43,21 @@ public TimeoutExecutorThread( public void add(Procedure procedure) { ProcedureDelayContainer delayTask = new ProcedureDelayContainer<>(procedure); + if (procedure instanceof InternalProcedure) { + if (!registeredInternalProcedures.add(procedure)) { + return; + } + procedure.setState(ProcedureState.WAITING_TIMEOUT); + } queue.remove(delayTask); queue.add(delayTask); } public boolean remove(Procedure procedure) { - return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished(); + boolean unregistered = + procedure instanceof InternalProcedure && registeredInternalProcedures.remove(procedure); + boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure)); + return unregistered || removed || procedure.isFinished(); } private ProcedureDelayContainer takeQuietly() { @@ -64,12 +78,12 @@ public void run() { } Procedure procedure = delayTask.getProcedure(); if (procedure instanceof InternalProcedure) { - if (procedure.isFinished()) { + if (!registeredInternalProcedures.contains(procedure) || procedure.isFinished()) { continue; } InternalProcedure internal = (InternalProcedure) procedure; internal.periodicExecute(executor.getEnvironment()); - if (!procedure.isFinished()) { + if (!procedure.isFinished() && registeredInternalProcedures.contains(procedure)) { procedure.updateTimestamp(); queue.add(delayTask); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index ba5f635507a4..75a7168c1f61 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -127,6 +127,16 @@ public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws Interrupte Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); Assert.assertEquals(1, internalProcedure.getExecutionCount()); + procExecutor.addInternalProcedure(internalProcedure); + Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); + Assert.assertEquals(1, internalProcedure.getExecutionCount()); + + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); + + procExecutor.addInternalProcedure(internalProcedure); + Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS)); + Assert.assertEquals(2, internalProcedure.getExecutionCount()); + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); }