Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
Expand All @@ -18,6 +30,7 @@
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.history.VersionMarkerUtils;
import io.temporal.internal.statemachines.WorkflowStateMachines;
import io.temporal.testing.TestEnvironmentOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.testing.WorkflowHistoryLoader;
import io.temporal.testing.WorkflowReplayer;
Expand All @@ -27,13 +40,16 @@
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.slf4j.Logger;

Expand Down Expand Up @@ -113,6 +129,56 @@ public void testReplayHistoryWithWaitForMarkerFlagReplaysWithoutDefaultEnable()
}
}

/**
* Regression test for the interaction between GetSystemInfo capability detection and SDK flags.
*
* <p>The base fixture is an old interleaved-update/getVersion history that fails replay unless
* replay waits for the real version marker event before resuming workflow code. That newer replay
* behavior is gated by {@link SdkFlag#VERSION_WAIT_FOR_MARKER}, so this test first edits the
* fixture in-memory to add that flag to every WorkflowTaskCompleted sdkMetadata.langUsedFlags
* field. The unproxied replay immediately after that edit proves the modified history has enough
* SDK metadata to select the fixed behavior.
*
* <p>The second replay runs the same history through a minimal gRPC proxy. The proxy forwards all
* WorkflowService RPCs to an in-memory test server except GetSystemInfo, which returns
* UNIMPLEMENTED. Current SDK code interprets that as default server capabilities. Default
* capabilities report sdkMetadata as unsupported, so the replay state machines ignore the
* langUsedFlags that are present in history and do not enable VERSION_WAIT_FOR_MARKER.
*
* <p>The intended behavior is that this replay still succeeds: a GetSystemInfo failure caused by
* an intermediary should not make a worker forget SDK flags already recorded in workflow history.
* On buggy code, this test fails with the same TMPRL1100 NonDeterministicException as the
* original unflagged fixture, which demonstrates that the proxy-induced default capabilities
* masked the recorded SDK flag.
*/
@Test
public void testGetSystemInfoUnimplementedDoesNotMaskSdkFlags() throws Exception {
WorkflowExecutionHistory history =
withSdkFlag(
WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE),
SdkFlag.VERSION_WAIT_FOR_MARKER);
assertTrue(
"The modified history must advertise VERSION_WAIT_FOR_MARKER.",
hasSdkFlag(history, SdkFlag.VERSION_WAIT_FOR_MARKER));
WorkflowReplayer.replayWorkflowExecution(history, GreetingWorkflowImpl.class);

try (TestWorkflowEnvironment backingEnvironment = TestWorkflowEnvironment.newInstance();
GetSystemInfoUnimplementedProxy proxy =
GetSystemInfoUnimplementedProxy.start(
backingEnvironment.getWorkflowServiceStubs().getRawChannel());
TestWorkflowEnvironment proxiedEnvironment =
TestWorkflowEnvironment.newInstance(
TestEnvironmentOptions.newBuilder()
.setUseExternalService(true)
.setTarget(proxy.getTarget())
.build())) {

WorkflowReplayer.replayWorkflowExecution(
history, proxiedEnvironment, GreetingWorkflowImpl.class);
assertTrue("Expected the proxy to receive GetSystemInfo.", proxy.getGetSystemInfoCalls() > 0);
}
}

public static WorkflowExecutionHistory captureReplayableHistory() {
List<SdkFlag> savedInitialFlags = WorkflowStateMachines.initialFlags;
List<SdkFlag> replayableFlags = new ArrayList<>(savedInitialFlags);
Expand Down Expand Up @@ -184,6 +250,107 @@ private static boolean hasEvent(List<HistoryEvent> events, EventType eventType)
return false;
}

private static WorkflowExecutionHistory withSdkFlag(
WorkflowExecutionHistory history, SdkFlag flag) {
io.temporal.api.history.v1.History.Builder historyBuilder = history.getHistory().toBuilder();
for (int i = 0; i < historyBuilder.getEventsCount(); i++) {
HistoryEvent.Builder event = historyBuilder.getEventsBuilder(i);
if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
continue;
}
if (!event
.getWorkflowTaskCompletedEventAttributes()
.getSdkMetadata()
.getLangUsedFlagsList()
.contains(flag.getValue())) {
event
.getWorkflowTaskCompletedEventAttributesBuilder()
.getSdkMetadataBuilder()
.addLangUsedFlags(flag.getValue());
}
}
return new WorkflowExecutionHistory(
historyBuilder.build(), history.getWorkflowExecution().getWorkflowId());
}

private static final class GetSystemInfoUnimplementedProxy implements AutoCloseable {
private final Server server;
private final AtomicInteger getSystemInfoCalls;

private GetSystemInfoUnimplementedProxy(Server server, AtomicInteger getSystemInfoCalls) {
this.server = server;
this.getSystemInfoCalls = getSystemInfoCalls;
}

static GetSystemInfoUnimplementedProxy start(Channel target) throws IOException {
AtomicInteger getSystemInfoCalls = new AtomicInteger();
Server server =
NettyServerBuilder.forPort(0)
.addService(buildProxyService(target, getSystemInfoCalls))
.build()
.start();
return new GetSystemInfoUnimplementedProxy(server, getSystemInfoCalls);
}

String getTarget() {
return "127.0.0.1:" + server.getPort();
}

int getGetSystemInfoCalls() {
return getSystemInfoCalls.get();
}

@Override
public void close() throws InterruptedException {
server.shutdownNow();
server.awaitTermination(1, TimeUnit.SECONDS);
}

private static ServerServiceDefinition buildProxyService(
Channel target, AtomicInteger getSystemInfoCalls) {
ServerServiceDefinition.Builder builder =
ServerServiceDefinition.builder(WorkflowServiceGrpc.getServiceDescriptor());
for (MethodDescriptor<?, ?> method :
WorkflowServiceGrpc.getServiceDescriptor().getMethods()) {
addProxyMethod(builder, method, target, getSystemInfoCalls);
}
return builder.build();
}

private static <ReqT, RespT> void addProxyMethod(
ServerServiceDefinition.Builder builder,
MethodDescriptor<ReqT, RespT> method,
Channel target,
AtomicInteger getSystemInfoCalls) {
if (method
.getFullMethodName()
.equals(WorkflowServiceGrpc.getGetSystemInfoMethod().getFullMethodName())) {
builder.addMethod(
method,
ServerCalls.asyncUnaryCall(
(ReqT request, StreamObserver<RespT> responseObserver) -> {
getSystemInfoCalls.incrementAndGet();
responseObserver.onError(unimplementedGetSystemInfo());
}));
return;
}

builder.addMethod(
method,
ServerCalls.asyncUnaryCall(
(ReqT request, StreamObserver<RespT> responseObserver) -> {
ClientCall<ReqT, RespT> call = target.newCall(method, CallOptions.DEFAULT);
ClientCalls.asyncUnaryCall(call, request, responseObserver);
}));
}

private static RuntimeException unimplementedGetSystemInfo() {
return Status.UNIMPLEMENTED
.withDescription("proxy intentionally hides getSystemInfo")
.asRuntimeException();
}
}

public static class Request {
private final String name;
private final OffsetDateTime date;
Expand Down
Loading