-
Notifications
You must be signed in to change notification settings - Fork 220
Add extstore s3 driver #2907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cconstable
wants to merge
1
commit into
master
Choose a base branch
from
extstore-s3-driver
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Add extstore s3 driver #2907
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
19 changes: 19 additions & 0 deletions
19
contrib/temporal-payload-storage-s3driver-awssdkv2/build.gradle
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| description = '''Temporal Java SDK External Storage S3 Driver - AWS SDK v2 Client''' | ||
|
|
||
| ext { | ||
| awsSdkVersion = '2.31.0' | ||
| } | ||
|
|
||
| dependencies { | ||
| api project(':temporal-payload-storage-s3driver') | ||
| api platform("software.amazon.awssdk:bom:$awsSdkVersion") | ||
| api "software.amazon.awssdk:s3" | ||
|
|
||
| // For the @Experimental annotation only. | ||
| compileOnly project(':temporal-sdk') | ||
|
|
||
| testImplementation project(':temporal-payload-storage-s3driver') | ||
| testImplementation "junit:junit:${junitVersion}" | ||
| testImplementation "org.mockito:mockito-core:${mockitoVersion}" | ||
| testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" | ||
| } |
110 changes: 110 additions & 0 deletions
110
...kv2/src/main/java/io/temporal/payload/storage/s3driver/awssdkv2/S3AsyncClientAdapter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,110 @@ | ||
| package io.temporal.payload.storage.s3driver.awssdkv2; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import io.temporal.payload.storage.s3driver.S3StorageDriverClient; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import javax.annotation.Nonnull; | ||
| import software.amazon.awssdk.core.ResponseBytes; | ||
| import software.amazon.awssdk.core.async.AsyncRequestBody; | ||
| import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
| import software.amazon.awssdk.regions.Region; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
| import software.amazon.awssdk.services.s3.model.GetObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.GetObjectResponse; | ||
| import software.amazon.awssdk.services.s3.model.HeadObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.HeadObjectResponse; | ||
| import software.amazon.awssdk.services.s3.model.NoSuchKeyException; | ||
| import software.amazon.awssdk.services.s3.model.PutObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.PutObjectResponse; | ||
| import software.amazon.awssdk.services.s3.model.S3Exception; | ||
|
|
||
| /** | ||
| * {@link S3StorageDriverClient} backed by the AWS SDK for Java v2 {@link S3AsyncClient}. The | ||
| * wrapped client must be configured with credentials and a region by the caller. | ||
| */ | ||
| @Experimental | ||
| public final class S3AsyncClientAdapter implements S3StorageDriverClient { | ||
| private final S3AsyncClient client; | ||
|
|
||
| public S3AsyncClientAdapter(@Nonnull S3AsyncClient client) { | ||
| this.client = Objects.requireNonNull(client, "client"); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<Void> putObject( | ||
| @Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data) { | ||
| CompletableFuture<PutObjectResponse> request = | ||
| client.putObject( | ||
| PutObjectRequest.builder().bucket(bucket).key(key).build(), | ||
| AsyncRequestBody.fromBytesUnsafe(data)); // avoids a defensive copy | ||
| return abortRequestOnCancel(request, request.thenApply(response -> (Void) null)); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key) { | ||
| CompletableFuture<HeadObjectResponse> request = | ||
| client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()); | ||
| return abortRequestOnCancel( | ||
| request, | ||
| request.handle( | ||
| (response, ex) -> { | ||
| if (ex == null) { | ||
| return true; | ||
| } | ||
| Throwable cause = | ||
| (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex; | ||
| if (cause instanceof NoSuchKeyException) { | ||
| return false; | ||
| } | ||
| if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 404) { | ||
| return false; | ||
| } | ||
| if (cause instanceof RuntimeException) { | ||
| throw (RuntimeException) cause; | ||
| } | ||
| throw new RuntimeException(cause); | ||
| })); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key) { | ||
| CompletableFuture<ResponseBytes<GetObjectResponse>> request = | ||
| client.getObject( | ||
| GetObjectRequest.builder().bucket(bucket).key(key).build(), | ||
| AsyncResponseTransformer.toBytes()); | ||
| return abortRequestOnCancel(request, request.thenApply(ResponseBytes::asByteArrayUnsafe)); | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code result}, wired so that cancelling it cancels the underlying {@code request}. The | ||
| * AWS SDK aborts an async request when the future it returns is cancelled. Cancellation does not | ||
| * otherwise propagate across the {@code thenApply}/{@code handle} boundary. | ||
| */ | ||
| private static <T> CompletableFuture<T> abortRequestOnCancel( | ||
| CompletableFuture<?> request, CompletableFuture<T> result) { | ||
| result.whenComplete( | ||
| (value, ex) -> { | ||
| if (result.isCancelled()) { | ||
| request.cancel(true); | ||
| } | ||
| }); | ||
| return result; | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public Map<String, String> describe() { | ||
| Region region = client.serviceClientConfiguration().region(); | ||
| if (region == null) { | ||
| return Collections.emptyMap(); | ||
| } | ||
| return Collections.singletonMap("client_region", region.id()); | ||
| } | ||
| } |
38 changes: 38 additions & 0 deletions
38
...src/test/java/io/temporal/payload/storage/s3driver/awssdkv2/S3AsyncClientAdapterTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package io.temporal.payload.storage.s3driver.awssdkv2; | ||
|
|
||
| import static org.junit.Assert.assertFalse; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import org.junit.Test; | ||
| import software.amazon.awssdk.core.async.AsyncRequestBody; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
| import software.amazon.awssdk.services.s3.model.PutObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.PutObjectResponse; | ||
|
|
||
| public class S3AsyncClientAdapterTest { | ||
|
|
||
| /** | ||
| * Cancelling the future the adapter returns must abort the underlying AWS request. The adapter | ||
| * wraps the AWS future with {@code thenApply}, which does not propagate cancellation upstream, so | ||
| * this verifies the explicit forwarding does its job. | ||
| */ | ||
| @Test | ||
| public void cancellingReturnedFutureAbortsTheUnderlyingRequest() { | ||
| S3AsyncClient s3 = mock(S3AsyncClient.class); | ||
| CompletableFuture<PutObjectResponse> awsRequest = new CompletableFuture<>(); | ||
| when(s3.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) | ||
| .thenReturn(awsRequest); | ||
|
|
||
| CompletableFuture<Void> result = | ||
| new S3AsyncClientAdapter(s3).putObject("bucket", "key", new byte[] {1, 2, 3}); | ||
|
|
||
| assertFalse(awsRequest.isCancelled()); | ||
| result.cancel(true); | ||
| assertTrue( | ||
| "cancelling the adapter's future should abort the AWS request", awsRequest.isCancelled()); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| description = '''Temporal Java SDK External Storage S3 Driver''' | ||
|
|
||
| dependencies { | ||
| compileOnly project(':temporal-serviceclient') | ||
| compileOnly project(':temporal-sdk') | ||
|
|
||
| testImplementation project(':temporal-serviceclient') | ||
| testImplementation project(':temporal-sdk') | ||
| testImplementation "junit:junit:${junitVersion}" | ||
| testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" | ||
| } |
18 changes: 18 additions & 0 deletions
18
...d-storage-s3driver/src/main/java/io/temporal/payload/storage/s3driver/BucketResolver.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package io.temporal.payload.storage.s3driver; | ||
|
|
||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.common.Experimental; | ||
| import io.temporal.payload.storage.StorageDriverStoreContext; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Resolves the target S3 bucket for a payload. Use {@link | ||
| * S3StorageDriver.Builder#setBucket(String)} for a fixed bucket, or supply a resolver via {@link | ||
| * S3StorageDriver.Builder#setBucketResolver(BucketResolver)} to choose a bucket per payload. | ||
| */ | ||
| @Experimental | ||
| @FunctionalInterface | ||
| public interface BucketResolver { | ||
| @Nonnull | ||
| String resolveBucket(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload); | ||
| } |
56 changes: 56 additions & 0 deletions
56
...orage-s3driver/src/main/java/io/temporal/payload/storage/s3driver/CompletableFutures.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| package io.temporal.payload.storage.s3driver; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| final class CompletableFutures { | ||
| private CompletableFutures() {} | ||
|
|
||
| /** | ||
| * Completes with the results in input order once every future succeeds. Fails fast with the first | ||
| * failure's cause as soon as any future fails, without waiting for the rest. | ||
| */ | ||
| static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) { | ||
| CompletableFuture<List<T>> result = new CompletableFuture<>(); | ||
| if (futures.isEmpty()) { | ||
| result.complete(new ArrayList<>()); | ||
| return result; | ||
| } | ||
| AtomicInteger remaining = new AtomicInteger(futures.size()); | ||
| for (CompletableFuture<T> future : futures) { | ||
| future.whenComplete( | ||
| (value, ex) -> { | ||
| if (ex != null) { | ||
| result.completeExceptionally(unwrap(ex)); | ||
| } else if (remaining.decrementAndGet() == 0) { | ||
| List<T> results = new ArrayList<>(futures.size()); | ||
| for (CompletableFuture<T> completed : futures) { | ||
| results.add(completed.join()); | ||
| } | ||
| result.complete(results); | ||
| } | ||
| }); | ||
| } | ||
| result.whenComplete( | ||
| (value, ex) -> { | ||
| if (ex != null) { | ||
| for (CompletableFuture<T> future : futures) { | ||
| future.cancel(true); | ||
| } | ||
| } | ||
| }); | ||
| return result; | ||
| } | ||
|
|
||
| static Throwable unwrap(Throwable t) { | ||
| while ((t instanceof CompletionException || t instanceof ExecutionException) | ||
| && t.getCause() != null) { | ||
| t = t.getCause(); | ||
| } | ||
| return t; | ||
| } | ||
| } | ||
26 changes: 26 additions & 0 deletions
26
...ad-storage-s3driver/src/main/java/io/temporal/payload/storage/s3driver/PayloadHasher.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package io.temporal.payload.storage.s3driver; | ||
|
|
||
| import java.security.MessageDigest; | ||
| import java.security.NoSuchAlgorithmException; | ||
|
|
||
| final class PayloadHasher { | ||
| private static final char[] HEX = "0123456789abcdef".toCharArray(); | ||
|
|
||
| private PayloadHasher() {} | ||
|
|
||
| /** Returns the lower-case SHA-256 hex digest of {@code data}. */ | ||
| static String sha256Hex(byte[] data) { | ||
| byte[] digest; | ||
| try { | ||
| // If we ever move to Java 17+ we can use HexFormat.of().formatHex() instead. | ||
| digest = MessageDigest.getInstance("SHA-256").digest(data); | ||
| } catch (NoSuchAlgorithmException e) { | ||
| throw new AssertionError("SHA-256 MessageDigest cannot be found", e); | ||
| } | ||
| StringBuilder sb = new StringBuilder(digest.length * 2); | ||
| for (byte b : digest) { | ||
| sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]); | ||
| } | ||
| return sb.toString(); | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After issuing cancellations, do we need to wait for all of the futures to actually finish? Thinking if the futures are using resources that are provided by the caller that we might want to wait for their cancellations to finish before returning to the caller. This might be difficult because of how
CompletableFutureworks. I don't have a specific scenario in mind.