From 80c32d26aa9ca6ea115fc2a4d7df35fe22acd59a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 11:39:51 -0700 Subject: [PATCH 1/9] st: sync PulsarApi.proto from apache/pulsar master Bring proto/PulsarApi.proto up to date with apache/pulsar master to pick up the scalable-topics wire protocol that pulsar::st will implement against: the DAG watch commands (CommandScalableTopicLookup/Update/Close), the segment/DAG messages (SegmentInfoProto, SegmentBrokerAddress, ScalableTopicDAG, SegmentState), the consumer-controller commands (CommandScalableTopicSubscribe /SubscribeResponse/AssignmentUpdate, ScalableConsumerAssignment), the topic/TC watch commands, and the supports_scalable_topics connect flag. Straight upstream sync (+329/-7). The only client-visible field change is the PIP-473 rename txn_ttl_seconds -> txn_ttl_millis (same field number, unused by the C++ client). libpulsar rebuilds and links cleanly against the regenerated sources. Signed-off-by: Matteo Merli --- proto/PulsarApi.proto | 336 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 329 insertions(+), 7 deletions(-) diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto index 4e207913..529c806d 100644 --- a/proto/PulsarApi.proto +++ b/proto/PulsarApi.proto @@ -45,6 +45,8 @@ message Schema { LocalTime = 18; LocalDateTime = 19; ProtobufNative = 20; + AutoConsume = 21; + External = 22; } required string name = 1; @@ -162,6 +164,24 @@ message MessageMetadata { // Indicate if the message partition key is set optional bool null_partition_key = 30 [default = false]; + + // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted, + // some messages may be removed (compacted out). For example, if the original batch contains: + // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`. + // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages. + // + // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches, + // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted + // messages (e.g., `k1 => v1` and `k1 => null` in the example above). + repeated int32 compacted_batch_indexes = 31; + optional bytes schema_id = 32; + + // PIP-486 scalable-topic entry-bucketing. The effective entry-bucket hash range covered by this + // entry: the smallest and largest entry-bucket hash (the low 16 bits of the key's Murmur3_32 hash) + // among the batched messages, both bounds inclusive. Set only by scalable-topic (segment://) + // producers; the broker routes the whole entry to the consumer owning that bucket on the segment. + optional uint32 entry_hash_min = 33; + optional uint32 entry_hash_max = 34; } message SingleMessageMetadata { @@ -263,10 +283,11 @@ enum ProtocolVersion { v18 = 18; // Add client support for broker entry metadata v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated + v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version } message CommandConnect { - required string client_version = 1; + required string client_version = 1; // The version of the client. Proxy should forward client's client_version. optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" instead. optional string auth_method_name = 5; optional bytes auth_data = 3; @@ -289,13 +310,21 @@ message CommandConnect { // Feature flags optional FeatureFlags feature_flags = 10; + + optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy. } +// Please also add a new enum for the class "PulsarClientException.FailedFeatureCheck" when adding a new feature flag. message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false]; optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; + optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; + optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false]; + optional bool supports_topic_watcher_reconcile = 7 [default = false]; + optional bool supports_scalable_topics = 8 [default = false]; + optional bool supports_tc_metadata_discovery = 9 [default = false]; } message CommandConnected { @@ -306,7 +335,7 @@ message CommandConnected { } message CommandAuthResponse { - optional string client_version = 1; + optional string client_version = 1; // The version of the client. Proxy should forward client's client_version. optional AuthData response = 2; optional int32 protocol_version = 3 [default = 0]; } @@ -409,6 +438,7 @@ message CommandPartitionedTopicMetadata { // to the proxy. optional string original_auth_data = 4; optional string original_auth_method = 5; + optional bool metadata_auto_creation_enabled = 6 [default = true]; } message CommandPartitionedTopicMetadataResponse { @@ -439,6 +469,8 @@ message CommandLookupTopic { optional string original_auth_method = 6; // optional string advertised_listener_name = 7; + // The properties used for topic lookup + repeated KeyValue properties = 8; } message CommandLookupTopicResponse { @@ -603,6 +635,7 @@ message CommandFlow { message CommandUnsubscribe { required uint64 consumer_id = 1; required uint64 request_id = 2; + optional bool force = 3 [default = false]; } // Reset an existing consumer to a particular message id @@ -622,7 +655,7 @@ message CommandReachedEndOfTopic { } message CommandTopicMigrated { - enum ResourceType { + enum ResourceType { Producer = 0; Consumer = 1; } @@ -633,6 +666,7 @@ message CommandTopicMigrated { } + message CommandCloseProducer { required uint64 producer_id = 1; required uint64 request_id = 2; @@ -766,6 +800,8 @@ message CommandGetTopicsOfNamespace { optional Mode mode = 3 [default = PERSISTENT]; optional string topics_pattern = 4; optional string topics_hash = 5; + // Context properties from the client + repeated KeyValue properties = 6; } message CommandGetTopicsOfNamespaceResponse { @@ -807,6 +843,244 @@ message CommandWatchTopicListClose { required uint64 watcher_id = 2; } +/// --- Scalable topic commands --- + +enum SegmentState { + ACTIVE = 0; + SEALED = 1; +} + +message SegmentInfoProto { + required uint64 segment_id = 1; + required uint32 hash_start = 2; + required uint32 hash_end = 3; + required SegmentState state = 4; + repeated uint64 parent_ids = 5; + repeated uint64 child_ids = 6; + required uint64 created_at_epoch = 7; + optional uint64 sealed_at_epoch = 8; + // Wall-clock millis at create / seal time. Used for retention-based segment GC + // and timestamp-based seek; epoch above is a DAG generation number, not a clock. + required uint64 created_at_ms = 9; + optional uint64 sealed_at_ms = 10; + + // Legacy-segment marker. When set, this segment is not managed by the scalable-topic + // controller and has no segment://... topic of its own — it wraps the named, externally + // managed persistent://... topic instead. Used by the synthetic-layout response the + // broker returns for a regular (partitioned or non-partitioned) topic that has not yet + // been migrated to a scalable topic. When absent, the segment URI is computed normally + // from the scalable topic name and segment_id (segment:///). + optional string legacy_topic_name = 11; + + // PIP-486: entry-bucket split points — the ascending, inclusive start hashes of buckets 1..N-1 + // within the 16-bit entry-bucket hash ring (bucket 0 implicitly starts at 0x0000). The segment is + // divided into entry_bucket_splits_count + 1 buckets; an empty list means a single bucket spanning + // the whole ring. Splits may be uneven (e.g. to balance buckets by traffic). Producers bucket their + // batches accordingly and the broker routes by bucket. + repeated uint32 entry_bucket_splits = 12; +} + +message SegmentBrokerAddress { + required uint64 segment_id = 1; + required string broker_url = 2; + optional string broker_url_tls = 3; +} + +message ScalableTopicDAG { + required uint64 epoch = 1; + repeated SegmentInfoProto segments = 2; + repeated SegmentBrokerAddress segment_brokers = 3; + optional string controller_broker_url = 4; + optional string controller_broker_url_tls = 5; +} + +// Client -> Broker: Request scalable topic metadata and initiate watch session +message CommandScalableTopicLookup { + required uint64 session_id = 1; // Client-assigned session ID + // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like + // "my-topic" (normalized by the broker to persistent://public/default/my-topic). + // The broker resolves the input to the canonical topic://... identity and + // returns it in CommandScalableTopicUpdate.resolved_topic_name. + required string topic = 2; + // What to do when the scalable topic is missing on lookup: if true, create it (subject to + // broker/namespace auto-creation policy); if false, fail not-found instead of creating it. + // Namespace (multi-topic) consumers set this false so a deleted topic is never resurrected by + // a reconnecting per-topic watch. Defaults to true to preserve the create-on-lookup behavior + // for explicit single-topic producers/consumers (and for older clients that don't set it). + optional bool create_if_missing = 3 [default = true]; +} + +// Broker -> Client: Used for BOTH initial response and subsequent pushed updates +message CommandScalableTopicUpdate { + required uint64 session_id = 1; + optional ScalableTopicDAG dag = 2; + + optional ServerError error = 3; + optional string message = 4; + + // Canonical scalable-topic identity (always "topic://t/n/x") that the client + // should use for downstream operations. Set on every success response, + // including for inputs that were given as persistent://... or short-form. + // Absent on error responses. + optional string resolved_topic_name = 5; +} + +// Client -> Broker: Close the DAG watch session +message CommandScalableTopicClose { + required uint64 session_id = 1; +} + +// Kind of scalable consumer registering with the controller leader. +// QueueConsumer never registers — it attaches directly to all active and sealed +// segment topics — so it does not appear here. +enum ScalableConsumerType { + STREAM = 0; + CHECKPOINT = 1; +} + +// A single segment assigned to a scalable consumer. +message ScalableAssignedSegment { + required uint64 segment_id = 1; + required uint32 hash_start = 2; + required uint32 hash_end = 3; + // Fully-qualified segment:// topic name the consumer should attach to. + required string segment_topic = 4; + // PIP-486: the entry-bucket hash ranges (16-bit, inclusive) this consumer owns within the + // segment. Empty means the consumer owns the whole segment (single bucket) and subscribes + // Shared; non-empty means the segment is shared by bucket, and the consumer subscribes + // Key_Shared STICKY declaring exactly these ranges. + repeated IntRange bucket_ranges = 5; +} + +// An assignment of active segments to a single consumer. Carries the layout epoch +// it was computed from so the client can reject stale updates. +message ScalableConsumerAssignment { + required uint64 layout_epoch = 1; + repeated ScalableAssignedSegment segments = 2; +} + +// Client -> Broker: register as an ordered (Stream) or external (Checkpoint) consumer +// on a scalable topic and request the initial segment assignment. The broker leader +// persists the consumer registration and returns the current assignment. +message CommandScalableTopicSubscribe { + required uint64 request_id = 1; + required string topic = 2; // e.g. "topic://tenant/ns/my-topic" + required string subscription = 3; + required string consumer_name = 4; + required uint64 consumer_id = 5; + required ScalableConsumerType consumer_type = 6; +} + +// Broker -> Client: response to CommandScalableTopicSubscribe. On success, carries +// the initial ScalableConsumerAssignment. On failure, error + message are populated +// and the assignment is absent. +message CommandScalableTopicSubscribeResponse { + required uint64 request_id = 1; + optional ServerError error = 2; + optional string message = 3; + optional ScalableConsumerAssignment assignment = 4; +} + +// Broker -> Client: push a new assignment to a subscribed consumer after a rebalance +// (triggered by a peer joining/leaving the subscription or by a segment split/merge). +message CommandScalableTopicAssignmentUpdate { + required uint64 consumer_id = 1; + required ScalableConsumerAssignment assignment = 2; +} + +// Multi-topic consumer watcher: subscribes to the union of scalable topics in a +// namespace that match a (possibly empty) set of property filters. The broker keeps +// pushing updates as topics enter or leave the matching set. See +// `multi-topic-consumer-design.md` for the full design. + +// Client -> Broker: open a watch session. +message CommandWatchScalableTopics { + required uint64 watch_id = 1; // Client-assigned watch ID + required string namespace = 2; // tenant/namespace + // Optional AND filters; empty list means "match all topics in the namespace". + repeated KeyValue property_filters = 3; + // Hash of the topics the client believes are currently in its set. Sent on + // reconnect; absent on first subscribe. If it matches the broker's freshly + // computed hash, the broker skips emitting the initial Snapshot — the client's + // local state is already correct and future Diffs will flow as usual. Same + // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics). + optional string current_hash = 4; +} + +// Snapshot of the full matching set. Sent on initial subscribe and on every +// reconnect-resync. The client replaces its local set with this list. +message ScalableTopicsSnapshot { + repeated string topics = 1; +} + +// Incremental membership change. Apply removed before added when both are present. +message ScalableTopicsDiff { + repeated string added = 1; + repeated string removed = 2; +} + +// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). When the +// initial subscribe fails, neither variant is set and `error`/`message` carry the +// failure reason. +message CommandWatchScalableTopicsUpdate { + required uint64 watch_id = 1; + + oneof event { + ScalableTopicsSnapshot snapshot = 2; + ScalableTopicsDiff diff = 3; + } + + optional ServerError error = 4; + optional string message = 5; +} + +// Client -> Broker: close the watch session. +message CommandWatchScalableTopicsClose { + required uint64 watch_id = 1; +} + +/// --- Transaction-coordinator assignment watch --- +// The scalable-topics transaction coordinator's discovery surface. The client opens one watch +// and the broker replies with the full (partition -> leader) map, then pushes a fresh full +// snapshot whenever any partition's leader changes. There is no point lookup and no diff: the +// map is bounded (parallelism, ~16) and changes rarely, so always sending the whole snapshot is +// simpler and removes a class of apply-ordering / drift bugs. Gated by the +// supports_tc_metadata_discovery feature flag. + +// Client -> Broker: open the assignment watch. +message CommandWatchTcAssignments { + required uint64 watch_id = 1; // client-assigned +} + +// One (partition -> leader) entry. A partition currently mid-election is simply absent from the +// snapshot; the client parks any transaction routed there until a later snapshot fills it in. +message TcAssignment { + required uint64 tc_id = 1; // TC partition = TxnID.mostSigBits + optional string broker_service_url = 2; + optional string broker_service_url_tls = 3; +} + +// Full map. Sent on initial watch and again, in full, on every change. The client replaces its +// local map wholesale — no merge, no ordering rules. parallelism lets the client size its handler +// array without a separate metadata read. +message TcAssignmentsSnapshot { + required uint32 parallelism = 1; + repeated TcAssignment assignments = 2; +} + +// Broker -> Client: the current full snapshot, or (on initial-watch failure) an error. +message CommandWatchTcAssignmentsUpdate { + required uint64 watch_id = 1; + optional TcAssignmentsSnapshot snapshot = 2; + optional ServerError error = 3; + optional string message = 4; +} + +// Client -> Broker: close the watch. +message CommandWatchTcAssignmentsClose { + required uint64 watch_id = 1; +} + message CommandGetSchema { required uint64 request_id = 1; required string topic = 2; @@ -824,9 +1098,10 @@ message CommandGetSchemaResponse { } message CommandGetOrCreateSchema { - required uint64 request_id = 1; - required string topic = 2; - required Schema schema = 3; + required uint64 request_id = 1; + required string topic = 2; + required Schema schema = 3; + optional string producerName = 4; } message CommandGetOrCreateSchemaResponse { @@ -847,6 +1122,8 @@ enum TxnAction { message CommandTcClientConnectRequest { required uint64 request_id = 1; required uint64 tc_id = 2 [default = 0]; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 3 [default = false]; } message CommandTcClientConnectResponse { @@ -857,8 +1134,15 @@ message CommandTcClientConnectResponse { message CommandNewTxn { required uint64 request_id = 1; - optional uint64 txn_ttl_seconds = 2 [default = 0]; + // Transaction timeout in milliseconds. Despite the field number's legacy name history, the value + // has always been carried in milliseconds (the client sends unit.toMillis(...) and both + // coordinators consume it as ms); the field is named accordingly to avoid confusion. + optional uint64 txn_ttl_millis = 2 [default = 0]; optional uint64 tc_id = 3 [default = 0]; + // When true, route to the metadata-driven (scalable-topics, PIP-473) transaction coordinator + // instead of the legacy one. Set by v5 clients; absent for v4 clients. Lets both coordinators + // serve their own clients on the same cluster. + optional bool scalable = 4 [default = false]; } message CommandNewTxnResponse { @@ -874,6 +1158,8 @@ message CommandAddPartitionToTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; repeated string partitions = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandAddPartitionToTxnResponse { @@ -893,6 +1179,8 @@ message CommandAddSubscriptionToTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; repeated Subscription subscription = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandAddSubscriptionToTxnResponse { @@ -908,6 +1196,8 @@ message CommandEndTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; optional TxnAction txn_action = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandEndTxnResponse { @@ -1043,6 +1333,22 @@ message BaseCommand { WATCH_TOPIC_LIST_CLOSE = 67; TOPIC_MIGRATED = 68; + + SCALABLE_TOPIC_LOOKUP = 70; + SCALABLE_TOPIC_UPDATE = 71; + SCALABLE_TOPIC_CLOSE = 72; + + SCALABLE_TOPIC_SUBSCRIBE = 73; + SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74; + SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75; + + WATCH_SCALABLE_TOPICS = 76; + WATCH_SCALABLE_TOPICS_UPDATE = 77; + WATCH_SCALABLE_TOPICS_CLOSE = 78; + + WATCH_TC_ASSIGNMENTS = 79; + WATCH_TC_ASSIGNMENTS_UPDATE = 80; + WATCH_TC_ASSIGNMENTS_CLOSE = 81; } @@ -1126,4 +1432,20 @@ message BaseCommand { optional CommandWatchTopicListClose watchTopicListClose = 67; optional CommandTopicMigrated topicMigrated = 68; + + optional CommandScalableTopicLookup scalableTopicLookup = 70; + optional CommandScalableTopicUpdate scalableTopicUpdate = 71; + optional CommandScalableTopicClose scalableTopicClose = 72; + + optional CommandScalableTopicSubscribe scalableTopicSubscribe = 73; + optional CommandScalableTopicSubscribeResponse scalableTopicSubscribeResponse = 74; + optional CommandScalableTopicAssignmentUpdate scalableTopicAssignmentUpdate = 75; + + optional CommandWatchScalableTopics watchScalableTopics = 76; + optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate = 77; + optional CommandWatchScalableTopicsClose watchScalableTopicsClose = 78; + + optional CommandWatchTcAssignments watchTcAssignments = 79; + optional CommandWatchTcAssignmentsUpdate watchTcAssignmentsUpdate = 80; + optional CommandWatchTcAssignmentsClose watchTcAssignmentsClose = 81; } From 50df108b97134c4390b26902e5491409e22cacf8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 14:56:56 -0700 Subject: [PATCH 2/9] st: build wiring for the lib/st implementation Wire the pulsar::st implementation into the build: - lib/st compiles as ST_OBJECT_LIB, a separate object library with a per-target CXX_STANDARD 20 (the classic client stays C++17), merged into the same libpulsar shared and static artifacts (and into the MSVC pulsarStaticWithDeps fat lib; the non-MSVC fat lib merges pulsarStatic and inherits it). Seeded with the out-of-line MessageId/Checkpoint default constructors - final code, and exported symbols that prove the merge. - reflectcpp joins vcpkg.json (0.24.0), making jsonSchema() buildable against the real reflect-cpp instead of a stand-in. The vcpkg port also ships rfl/avro.hpp, so the avroSchema() __has_include gate lights up too. StExamples now compiles SampleStJsonSchema.cc against it. - tests/st: new pulsar-st-tests gtest binary (C++20, links pulsarStatic, no broker needed) with 42 tests covering Expected (value/error, throwing value(), monadic ops incl. the && overloads with move-only T), Future (get/timed get/listeners, thenApply incl. void and move-only mappers, abandoned-promise failure, first-writer-wins, co_await ready/suspend/error) and the Schema codecs (primitive roundtrips, big-endian wire format, short payload errors, BytesView zero-copy, unset schema, custom SerDe), plus a handle test that links the lib/st symbols out of the merged archive. Verified: full local build of pulsarShared/pulsarStatic/pulsar-st-tests/ StExamples (vcpkg, macOS); 42/42 tests pass; pulsar::st symbols present in both artifacts; gcc-13 -Wextra -Werror parity on lib/st; clang-format-11 clean. Signed-off-by: Matteo Merli --- lib/CMakeLists.txt | 24 +++- lib/st/Checkpoint.cc | 27 ++++ lib/st/MessageId.cc | 27 ++++ tests/BuildTests.cmake | 8 ++ tests/st/ExpectedTest.cc | 146 ++++++++++++++++++++++ tests/st/FutureTest.cc | 242 ++++++++++++++++++++++++++++++++++++ tests/st/SchemaCodecTest.cc | 147 ++++++++++++++++++++++ tests/st/StHandleTest.cc | 47 +++++++ vcpkg.json | 4 + 9 files changed, 669 insertions(+), 3 deletions(-) create mode 100644 lib/st/Checkpoint.cc create mode 100644 lib/st/MessageId.cc create mode 100644 tests/st/ExpectedTest.cc create mode 100644 tests/st/FutureTest.cc create mode 100644 tests/st/SchemaCodecTest.cc create mode 100644 tests/st/StHandleTest.cc diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 3478f10e..33628d7d 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -77,9 +77,27 @@ target_include_directories(PULSAR_OBJECT_LIB PUBLIC "${CMAKE_SOURCE_DIR}/include" "${CMAKE_BINARY_DIR}/include") +# --- Scalable topics (pulsar::st) implementation ---------------------------- +# The st API (include/pulsar/st) targets C++20 while the classic client above +# stays C++17, so its implementation compiles in a separate object library with +# a per-target standard. The objects are merged into the same libpulsar +# shared/static artifacts below; a C++20-compiled object archive links fine +# into C++17 consumers of the classic API. +file(GLOB PULSAR_ST_SOURCES st/*.cc st/*.h) +add_library(ST_OBJECT_LIB OBJECT ${PULSAR_ST_SOURCES}) +set_property(TARGET ST_OBJECT_LIB PROPERTY POSITION_INDEPENDENT_CODE 1) +set_target_properties(ST_OBJECT_LIB PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) +if (INTEGRATE_VCPKG) + target_link_libraries(ST_OBJECT_LIB PROTO_OBJECTS) +endif () +target_include_directories(ST_OBJECT_LIB PUBLIC + "${CMAKE_SOURCE_DIR}" + "${CMAKE_SOURCE_DIR}/include" + "${CMAKE_BINARY_DIR}/include") + include(CheckCXXSymbolExists) if (BUILD_DYNAMIC_LIB) - add_library(pulsarShared SHARED $) + add_library(pulsarShared SHARED $ $) set_property(TARGET pulsarShared PROPERTY OUTPUT_NAME ${LIB_NAME_SHARED}) set_property(TARGET pulsarShared PROPERTY VERSION ${LIBRARY_VERSION}) target_link_libraries(pulsarShared PRIVATE ${COMMON_LIBS} ${CMAKE_DL_LIBS}) @@ -102,7 +120,7 @@ if(HAVE_AUXV_GETAUXVAL) endif() if (BUILD_STATIC_LIB) - add_library(pulsarStatic STATIC $) + add_library(pulsarStatic STATIC $ $) target_include_directories(pulsarStatic PUBLIC ${CMAKE_BINARY_DIR}/include ${CMAKE_SOURCE_DIR} @@ -139,7 +157,7 @@ if (LINK_STATIC AND BUILD_STATIC_LIB) set(${OUTLIST} ${TEMP_OUT} PARENT_SCOPE) endfunction(remove_libtype) - add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES}) + add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES} $) target_include_directories(pulsarStaticWithDeps PRIVATE ${dlfcn-win32_INCLUDE_DIRS}) if (VCPKG_TRIPLET) # Collect ALL vcpkg-installed static archives so every transitive dependency diff --git a/lib/st/Checkpoint.cc b/lib/st/Checkpoint.cc new file mode 100644 index 00000000..58125047 --- /dev/null +++ b/lib/st/Checkpoint.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +namespace pulsar::st { + +// An empty checkpoint: impl_ stays null, so the handle is falsy under +// operator bool and must not be used as a start position. +Checkpoint::Checkpoint() = default; + +} // namespace pulsar::st diff --git a/lib/st/MessageId.cc b/lib/st/MessageId.cc new file mode 100644 index 00000000..4bfa037a --- /dev/null +++ b/lib/st/MessageId.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +namespace pulsar::st { + +// An empty id: impl_ stays null, so the handle is falsy under operator bool and +// compares equal only to other empty ids. +MessageId::MessageId() = default; + +} // namespace pulsar::st diff --git a/tests/BuildTests.cmake b/tests/BuildTests.cmake index 0fe74300..db44f6aa 100644 --- a/tests/BuildTests.cmake +++ b/tests/BuildTests.cmake @@ -58,3 +58,11 @@ target_link_libraries(ChunkDedupTest pulsarStatic ${GTEST_TARGETS}) add_executable(ExtensibleLoadManagerTest extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc) target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic ${GTEST_TARGETS}) + +# --- Scalable topics (pulsar::st) unit tests -------------------------------- +# Pure client-side tests for the st API and its lib/st implementation; they do +# not require a running broker. C++20 per-target, like the st API itself. +file(GLOB ST_TEST_SOURCES st/*.cc) +add_executable(pulsar-st-tests ${ST_TEST_SOURCES}) +set_target_properties(pulsar-st-tests PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) +target_link_libraries(pulsar-st-tests PRIVATE pulsarStatic ${GTEST_TARGETS}) diff --git a/tests/st/ExpectedTest.cc b/tests/st/ExpectedTest.cc new file mode 100644 index 00000000..08ab950d --- /dev/null +++ b/tests/st/ExpectedTest.cc @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include + +using namespace pulsar::st; +using UniqueInt = std::unique_ptr; + +TEST(ExpectedTest, testValueState) { + Expected e(42); + ASSERT_TRUE(e.has_value()); + ASSERT_TRUE(static_cast(e)); + ASSERT_EQ(*e, 42); + ASSERT_EQ(e.value(), 42); +} + +TEST(ExpectedTest, testErrorState) { + Expected e(Error{ResultTimeout, "timed out"}); + ASSERT_FALSE(e.has_value()); + ASSERT_EQ(e.error().result, ResultTimeout); + ASSERT_EQ(e.error().message, "timed out"); +} + +TEST(ExpectedTest, testUnexpectedFactory) { + Expected e = unexpected(ResultInvalidMessage, "bad payload"); + ASSERT_FALSE(e); + ASSERT_EQ(e.error().result, ResultInvalidMessage); +} + +TEST(ExpectedTest, testValueThrowsClientExceptionOnError) { + Expected e(Error{ResultUnknownError, "boom"}); + ASSERT_THROW(e.value(), ClientException); +} + +TEST(ExpectedTest, testVoidSpecialization) { + Expected ok; + ASSERT_TRUE(ok); + ASSERT_NO_THROW(ok.value()); + + Expected failed(Error{ResultTimeout, "t"}); + ASSERT_FALSE(failed); + ASSERT_EQ(failed.error().result, ResultTimeout); + ASSERT_THROW(failed.value(), ClientException); +} + +TEST(ExpectedTest, testValueOr) { + Expected ok(5); + ASSERT_EQ(ok.value_or(0), 5); + + Expected failed(Error{ResultUnknownError, ""}); + ASSERT_EQ(failed.value_or(7), 7); +} + +TEST(ExpectedTest, testMonadicOpsOnLvalue) { + Expected e(5); + auto doubled = e.transform([](int x) { return x * 2; }); + ASSERT_TRUE(doubled); + ASSERT_EQ(*doubled, 10); + + auto chained = e.and_then([](int x) { return Expected(x + 1L); }); + ASSERT_TRUE(chained); + ASSERT_EQ(*chained, 6L); + + auto recovered = e.or_else([](const Error&) { return Expected(-1); }); + ASSERT_TRUE(recovered); + ASSERT_EQ(*recovered, 5); + + Expected failed(Error{ResultTimeout, "t"}); + auto mapped = failed.transform([](int x) { return x * 2; }); + ASSERT_FALSE(mapped); + ASSERT_EQ(mapped.error().result, ResultTimeout); + auto rescued = failed.or_else([](const Error&) { return Expected(42); }); + ASSERT_TRUE(rescued); + ASSERT_EQ(*rescued, 42); +} + +// Rvalue overloads: a move-only T flows through the chain without a copy. + +TEST(ExpectedTest, testRvalueValueOrMovesOut) { + Expected ok(std::make_unique(7)); + UniqueInt got = std::move(ok).value_or(nullptr); + ASSERT_TRUE(got); + ASSERT_EQ(*got, 7); + + Expected failed(Error{ResultUnknownError, ""}); + UniqueInt fallback = std::move(failed).value_or(std::make_unique(99)); + ASSERT_TRUE(fallback); + ASSERT_EQ(*fallback, 99); +} + +TEST(ExpectedTest, testRvalueAndThenConsumesValue) { + Expected ok(std::make_unique(5)); + auto r = std::move(ok).and_then([](UniqueInt p) { return Expected(*p + 1); }); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 6); + + Expected failed(Error{ResultTimeout, "t"}); + auto propagated = std::move(failed).and_then([](UniqueInt) { return Expected(0); }); + ASSERT_FALSE(propagated); + ASSERT_EQ(propagated.error().result, ResultTimeout); +} + +TEST(ExpectedTest, testRvalueTransformMapsMoveOnly) { + Expected ok(std::make_unique(3)); + auto r = std::move(ok).transform([](UniqueInt p) { return std::make_unique(*p * 10L); }); + ASSERT_TRUE(r); + ASSERT_EQ(**r, 30L); +} + +TEST(ExpectedTest, testRvalueOrElse) { + Expected failed(Error{ResultUnknownError, ""}); + auto recovered = std::move(failed).or_else([](const Error&) { return Expected(42); }); + ASSERT_TRUE(recovered); + ASSERT_EQ(*recovered, 42); + + Expected ok(10); + auto passthrough = std::move(ok).or_else([](const Error&) { return Expected(-1); }); + ASSERT_TRUE(passthrough); + ASSERT_EQ(*passthrough, 10); +} + +TEST(ExpectedTest, testRvalueValueMovesOut) { + Expected e(std::string("hello")); + std::string s = std::move(e).value(); + ASSERT_EQ(s, "hello"); +} diff --git a/tests/st/FutureTest.cc b/tests/st/FutureTest.cc new file mode 100644 index 00000000..43acbc06 --- /dev/null +++ b/tests/st/FutureTest.cc @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace pulsar::st; +using pulsar::st::detail::Promise; + +TEST(FutureTest, testGetReturnsCompletedValue) { + Promise promise; + Future future = promise.getFuture(); + ASSERT_FALSE(future.isReady()); + promise.setValue(42); + ASSERT_TRUE(future.isReady()); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 42); +} + +TEST(FutureTest, testGetBlocksUntilCompletedFromAnotherThread) { + Promise promise; + Future future = promise.getFuture(); + std::thread completer([promise]() { promise.setValue(7); }); + auto r = future.get(); + completer.join(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 7); +} + +TEST(FutureTest, testTimedGetTimesOutWhilePending) { + Promise promise; + Future future = promise.getFuture(); + auto r = future.get(std::chrono::milliseconds(10)); + ASSERT_FALSE(r.has_value()); // timed out, still pending + promise.setValue(1); + auto r2 = future.get(std::chrono::milliseconds(10)); + ASSERT_TRUE(r2.has_value()); + ASSERT_TRUE(*r2); + ASSERT_EQ(**r2, 1); +} + +TEST(FutureTest, testListenerRunsOnCompletion) { + Promise promise; + Future future = promise.getFuture(); + int seen = -1; + future.addListener([&seen](const Expected& r) { seen = r ? *r : -2; }); + ASSERT_EQ(seen, -1); + promise.setValue(5); + ASSERT_EQ(seen, 5); +} + +TEST(FutureTest, testListenerAfterCompletionRunsSynchronously) { + Promise promise; + promise.setValue(9); + int seen = -1; + promise.getFuture().addListener([&seen](const Expected& r) { seen = r ? *r : -2; }); + ASSERT_EQ(seen, 9); +} + +TEST(FutureTest, testCompleteIsFirstWriterWins) { + Promise promise; + ASSERT_TRUE(promise.setValue(1)); + ASSERT_FALSE(promise.setValue(2)); + ASSERT_FALSE(promise.setError(Error{ResultUnknownError, ""})); + auto r = promise.getFuture().get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 1); +} + +// --- thenApply --------------------------------------------------------------- + +TEST(FutureTest, testThenApplyMapsValue) { + Promise promise; + Future mapped = + promise.getFuture().thenApply([](const int& x) { return std::to_string(x + 1); }); + promise.setValue(41); + auto r = mapped.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, "42"); +} + +TEST(FutureTest, testThenApplyPropagatesError) { + Promise promise; + bool called = false; + Future mapped = promise.getFuture().thenApply([&called](const int& x) { + called = true; + return x; + }); + promise.setError(Error{ResultTimeout, "t"}); + auto r = mapped.get(); + ASSERT_FALSE(r); + ASSERT_EQ(r.error().result, ResultTimeout); + ASSERT_FALSE(called); +} + +TEST(FutureTest, testThenApplyVoidMapper) { + Promise promise; + int seen = -1; + Future done = promise.getFuture().thenApply([&seen](const int& x) { seen = x; }); + promise.setValue(7); + auto r = done.get(); + ASSERT_TRUE(r); + ASSERT_EQ(seen, 7); +} + +TEST(FutureTest, testThenApplyMoveOnlyMapper) { + Promise promise; + auto bonus = std::make_unique(100); + Future mapped = + promise.getFuture().thenApply([b = std::move(bonus)](const int& x) { return x + *b; }); + promise.setValue(5); + auto r = mapped.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 105); +} + +// --- broken promise ---------------------------------------------------------- + +TEST(FutureTest, testAbandonedPromiseFailsTheFuture) { + Future future = [] { + Promise abandoned; + return abandoned.getFuture(); + }(); + auto r = future.get(); // must not hang + ASSERT_FALSE(r); + ASSERT_EQ(r.error().result, ResultUnknownError); +} + +TEST(FutureTest, testAbandonedPromiseCopiesFailOnlyAfterLastCopyDies) { + Promise outer; + Future future = outer.getFuture(); + { + Promise copy = outer; // a copy dying must not break the promise + } + ASSERT_FALSE(future.isReady()); + outer.setValue(9); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 9); +} + +TEST(FutureTest, testAbandonedVoidPromise) { + Future future = [] { + Promise abandoned; + return abandoned.getFuture(); + }(); + auto r = future.get(); + ASSERT_FALSE(r); +} + +TEST(FutureTest, testCompletedPromiseGuardIsNoOp) { + Future future = [] { + Promise promise; + Future f = promise.getFuture(); + promise.setValue(123); + return f; // promise dies after completing: value must be preserved + }(); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 123); +} + +// --- coroutine awaiter -------------------------------------------------------- + +namespace { + +struct TestTask { + struct promise_type { + TestTask get_return_object() { return {}; } + std::suspend_never initial_suspend() { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() {} + void unhandled_exception() { std::terminate(); } + }; +}; + +TestTask awaitInto(Future future, Expected& out, std::atomic& done) { + Expected r = co_await future; + out = r; + done = true; +} + +} // namespace + +TEST(FutureTest, testCoAwaitReadyFuture) { + Promise promise; + promise.setValue(11); + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + ASSERT_TRUE(done.load()); + ASSERT_TRUE(out); + ASSERT_EQ(*out, 11); +} + +TEST(FutureTest, testCoAwaitSuspendsUntilCompleted) { + Promise promise; + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + ASSERT_FALSE(done.load()); // suspended, not resumed inside await_suspend + promise.setValue(21); // completes -> resumes the coroutine + ASSERT_TRUE(done.load()); + ASSERT_TRUE(out); + ASSERT_EQ(*out, 21); +} + +TEST(FutureTest, testCoAwaitPropagatesError) { + Promise promise; + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + promise.setError(Error{ResultTimeout, "t"}); + ASSERT_TRUE(done.load()); + ASSERT_FALSE(out); + ASSERT_EQ(out.error().result, ResultTimeout); +} diff --git a/tests/st/SchemaCodecTest.cc b/tests/st/SchemaCodecTest.cc new file mode 100644 index 00000000..a600c44b --- /dev/null +++ b/tests/st/SchemaCodecTest.cc @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace pulsar::st; + +namespace { + +// Encode + decode through the public Schema seam and require the value to +// survive the roundtrip. +template +void expectRoundtrip(const T& value) { + Schema schema; + std::vector encoded; + auto ok = schema.encode(value, encoded); + ASSERT_TRUE(ok); + auto decoded = schema.decode(std::span(encoded)); + ASSERT_TRUE(decoded); + ASSERT_EQ(*decoded, value); +} + +} // namespace + +TEST(SchemaCodecTest, testIntegerRoundtrips) { + expectRoundtrip(42); + expectRoundtrip(-1); + expectRoundtrip(0x1234); + expectRoundtrip(std::numeric_limits::min()); + expectRoundtrip(0x01020304); + expectRoundtrip(-1); + expectRoundtrip(std::numeric_limits::max()); + expectRoundtrip(0x0102030405060708LL); + expectRoundtrip(std::numeric_limits::min()); +} + +TEST(SchemaCodecTest, testFloatingPointRoundtrips) { + expectRoundtrip(3.5f); + expectRoundtrip(-0.0f); + expectRoundtrip(2.718281828459045); + expectRoundtrip(std::numeric_limits::max()); +} + +TEST(SchemaCodecTest, testStringRoundtrip) { + expectRoundtrip("hello scalable topics"); + expectRoundtrip(""); +} + +TEST(SchemaCodecTest, testBytesRoundtrip) { + Bytes payload = {std::byte{0x00}, std::byte{0xFF}, std::byte{0x7F}}; + expectRoundtrip(payload); +} + +TEST(SchemaCodecTest, testInt32IsBigEndianOnTheWire) { + Schema schema; + std::vector encoded; + ASSERT_TRUE(schema.encode(0x01020304, encoded)); + ASSERT_EQ(encoded.size(), 4u); + ASSERT_EQ(encoded[0], std::byte{0x01}); + ASSERT_EQ(encoded[1], std::byte{0x02}); + ASSERT_EQ(encoded[2], std::byte{0x03}); + ASSERT_EQ(encoded[3], std::byte{0x04}); +} + +TEST(SchemaCodecTest, testShortPayloadIsAnErrorNotUb) { + std::vector tooShort(2); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + std::vector empty; + ASSERT_FALSE(Schema{}.decode(std::span(empty))); +} + +TEST(SchemaCodecTest, testBytesViewDecodeIsZeroCopy) { + std::vector buffer = {std::byte{1}, std::byte{2}, std::byte{3}}; + Schema schema; + auto view = schema.decode(std::span(buffer)); + ASSERT_TRUE(view); + ASSERT_EQ(view->data(), buffer.data()); // a view into the same buffer, no copy + ASSERT_EQ(view->size(), buffer.size()); +} + +TEST(SchemaCodecTest, testPrimitiveSchemaInfoNames) { + ASSERT_EQ(Schema{}.info().getName(), "STRING"); + ASSERT_EQ(Schema{}.info().getName(), "INT32"); + ASSERT_EQ(Schema{}.info().getName(), "INT64"); + ASSERT_EQ(Schema{}.info().getName(), "FLOAT"); + ASSERT_EQ(Schema{}.info().getName(), "DOUBLE"); + ASSERT_EQ(Schema{}.info().getName(), "BYTES"); +} + +TEST(SchemaCodecTest, testUnsetSchemaReportsErrors) { + struct NotAPrimitive { + int x = 0; + }; + Schema schema; // no SerDe supplied -> unset codec + std::vector out; + ASSERT_FALSE(schema.encode(NotAPrimitive{}, out)); + ASSERT_FALSE(schema.decode(std::span(out))); +} + +TEST(SchemaCodecTest, testCustomSerDeThroughTypeErasure) { + // A toy SerDe proving the pluggable seam: encodes an int as a decimal string. + struct DecimalSerDe { + SchemaInfo info() const { return SchemaInfo(SchemaType::STRING, "DECIMAL", ""); } + Expected encode(const int& v, std::vector& out) const { + const std::string s = std::to_string(v); + const auto* p = reinterpret_cast(s.data()); + out.assign(p, p + s.size()); + return {}; + } + Expected decode(std::span d) const { + return std::stoi(std::string(reinterpret_cast(d.data()), d.size())); + } + }; + Schema schema{DecimalSerDe{}}; + std::vector encoded; + ASSERT_TRUE(schema.encode(12345, encoded)); + ASSERT_EQ(encoded.size(), 5u); // "12345" + auto decoded = schema.decode(std::span(encoded)); + ASSERT_TRUE(decoded); + ASSERT_EQ(*decoded, 12345); +} diff --git a/tests/st/StHandleTest.cc b/tests/st/StHandleTest.cc new file mode 100644 index 00000000..339bdb0b --- /dev/null +++ b/tests/st/StHandleTest.cc @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include + +// These tests exercise symbols DEFINED in lib/st (not header-inline), proving +// the ST_OBJECT_LIB objects are merged into the libpulsar artifact this test +// binary links against. + +using namespace pulsar::st; + +TEST(StHandleTest, testDefaultMessageIdIsEmpty) { + MessageId id; + ASSERT_FALSE(static_cast(id)); +} + +TEST(StHandleTest, testDefaultCheckpointIsEmpty) { + Checkpoint checkpoint; + ASSERT_FALSE(static_cast(checkpoint)); +} + +TEST(StHandleTest, testEmptyHandlesAreCopyable) { + MessageId id; + MessageId copy = id; + ASSERT_FALSE(static_cast(copy)); + + Checkpoint checkpoint; + Checkpoint checkpointCopy = checkpoint; + ASSERT_FALSE(static_cast(checkpointCopy)); +} diff --git a/vcpkg.json b/vcpkg.json index 3452492d..721371da 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -43,6 +43,10 @@ "name": "protobuf", "version>=": "6.33.4#1" }, + { + "name": "reflectcpp", + "version>=": "0.24.0" + }, { "name": "snappy", "version>=": "1.2.2" From aaeb561c5d1672733edefbc04334ad8f2c3b950d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 15:04:07 -0700 Subject: [PATCH 3/9] st: client shell - PulsarClientBuilder::build(), st ClientImpl, ClientCore First slice of lib/st: the client comes to life. - pulsar::st::ClientImpl wraps a classic pulsar::ClientImpl, reusing its connection pool, executor pools, lookup service and memory limiting wholesale (a scalable topic's segments are ordinary persistent topics broker-side, so the transport stack is unchanged). The scalable-topics machinery (DAG watch, routing, controller sessions) layers on top in later phases; until each lands, the corresponding create/subscribe/transaction call fails cleanly with ResultOperationNotSupported instead of hanging. - PulsarClientBuilder::build() validates the config (serviceUrl required; tlsPolicy.enabled requires a pulsar+ssl:// or https:// URL) and maps the st policy structs onto the classic ClientConfiguration. st TLS defaults are authoritative (validateHostname defaults to true, stricter than the classic default). connection.maxConnectionIdleTime has no classic counterpart yet (TODO). Construction failures surface as an Error, not an exception. - detail::ClientCore methods defined, forwarding to the st ClientImpl. closeAsync() bridges the classic CloseCallback onto the continuation Future; shutdown() tears down immediately. - tests/st/StClientTest.cc: 8 broker-free tests covering builder validation, policy mapping, close/shutdown lifecycle, and the typed builder path (newProducer().create() flows through ProducerBuilder -> ClientCore -> st ClientImpl and back out as a typed Expected error). Verified: 50/50 pulsar-st-tests green (macOS/clang, vcpkg build); clang-format-11 clean. Signed-off-by: Matteo Merli --- lib/st/ClientCore.cc | 50 ++++++++++++++ lib/st/ClientImpl.cc | 78 ++++++++++++++++++++++ lib/st/ClientImpl.h | 65 +++++++++++++++++++ lib/st/PulsarClientBuilder.cc | 119 ++++++++++++++++++++++++++++++++++ tests/st/StClientTest.cc | 102 +++++++++++++++++++++++++++++ 5 files changed, 414 insertions(+) create mode 100644 lib/st/ClientCore.cc create mode 100644 lib/st/ClientImpl.cc create mode 100644 lib/st/ClientImpl.h create mode 100644 lib/st/PulsarClientBuilder.cc create mode 100644 tests/st/StClientTest.cc diff --git a/lib/st/ClientCore.cc b/lib/st/ClientCore.cc new file mode 100644 index 00000000..4da77f3b --- /dev/null +++ b/lib/st/ClientCore.cc @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include "ClientImpl.h" + +namespace pulsar::st::detail { + +Future ClientCore::createProducerAsync(ProducerConfig config) const { + return impl_->createProducerAsync(std::move(config)); +} + +Future ClientCore::subscribeStreamAsync(StreamConsumerConfig config) const { + return impl_->subscribeStreamAsync(std::move(config)); +} + +Future ClientCore::subscribeQueueAsync(QueueConsumerConfig config) const { + return impl_->subscribeQueueAsync(std::move(config)); +} + +Future ClientCore::createCheckpointConsumerAsync( + CheckpointConsumerConfig config) const { + return impl_->createCheckpointConsumerAsync(std::move(config)); +} + +Future ClientCore::newTransactionAsync() const { return impl_->newTransactionAsync(); } + +Future ClientCore::closeAsync() const { return impl_->closeAsync(); } + +void ClientCore::shutdown() const { impl_->shutdown(); } + +} // namespace pulsar::st::detail diff --git a/lib/st/ClientImpl.cc b/lib/st/ClientImpl.cc new file mode 100644 index 00000000..c971a355 --- /dev/null +++ b/lib/st/ClientImpl.cc @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "ClientImpl.h" + +#include +#include + +namespace pulsar::st { + +namespace { + +// Placeholder for the operations whose lib/st implementation has not landed +// yet: fail the returned future instead of hanging or crashing. Each of these +// turns into the real implementation in its own phase (producer first). +template +Future notImplementedYet(const char* what) { + detail::Promise promise; + promise.setError(Error{ResultOperationNotSupported, + std::string(what) + " is not implemented yet in the scalable-topics client"}); + return promise.getFuture(); +} + +} // namespace + +ClientImpl::ClientImpl(pulsar::ClientImplPtr classicClient, TransactionPolicy transactionPolicy) + : classic_(std::move(classicClient)), transactionPolicy_(std::move(transactionPolicy)) {} + +Future ClientImpl::createProducerAsync(ProducerConfig) { + return notImplementedYet("createProducer"); +} + +Future ClientImpl::subscribeStreamAsync(StreamConsumerConfig) { + return notImplementedYet("subscribeStream"); +} + +Future ClientImpl::subscribeQueueAsync(QueueConsumerConfig) { + return notImplementedYet("subscribeQueue"); +} + +Future ClientImpl::createCheckpointConsumerAsync(CheckpointConsumerConfig) { + return notImplementedYet("createCheckpointConsumer"); +} + +Future ClientImpl::newTransactionAsync() { + return notImplementedYet("newTransaction"); +} + +Future ClientImpl::closeAsync() { + detail::Promise promise; + classic_->closeAsync([promise](pulsar::Result result) { + if (result == pulsar::ResultOk) { + promise.setSuccess(); + } else { + promise.setError(Error{result, "failed to close the client"}); + } + }); + return promise.getFuture(); +} + +void ClientImpl::shutdown() { classic_->shutdown(); } + +} // namespace pulsar::st diff --git a/lib/st/ClientImpl.h b/lib/st/ClientImpl.h new file mode 100644 index 00000000..bbcd9847 --- /dev/null +++ b/lib/st/ClientImpl.h @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "lib/ClientImpl.h" + +namespace pulsar::st { + +/** + * The hidden client behind `detail::ClientCore`. + * + * It owns a classic `pulsar::ClientImpl`, reusing its connection pool, executor + * pools, lookup service and memory limiting wholesale — a scalable topic's + * segments are ordinary persistent topics broker-side, so the underlying + * transport stack is unchanged. The scalable-topics specific machinery (DAG + * watch, segment routing, controller sessions) is layered on top in lib/st, + * phase by phase. + */ +class ClientImpl { + public: + ClientImpl(pulsar::ClientImplPtr classicClient, TransactionPolicy transactionPolicy); + + Future createProducerAsync(ProducerConfig config); + Future subscribeStreamAsync(StreamConsumerConfig config); + Future subscribeQueueAsync(QueueConsumerConfig config); + Future createCheckpointConsumerAsync(CheckpointConsumerConfig config); + Future newTransactionAsync(); + Future closeAsync(); + void shutdown(); + + /** The wrapped classic client (connection pool, executors, lookup). */ + pulsar::ClientImpl& classicClient() { return *classic_; } + + private: + pulsar::ClientImplPtr classic_; + TransactionPolicy transactionPolicy_; // applied when the transaction-coordinator session lands +}; + +} // namespace pulsar::st diff --git a/lib/st/PulsarClientBuilder.cc b/lib/st/PulsarClientBuilder.cc new file mode 100644 index 00000000..79b22e4c --- /dev/null +++ b/lib/st/PulsarClientBuilder.cc @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include + +#include "ClientImpl.h" + +namespace pulsar::st { + +namespace { + +bool hasTlsScheme(const std::string& url) { + return url.rfind("pulsar+ssl://", 0) == 0 || url.rfind("https://", 0) == 0; +} + +} // namespace + +Expected PulsarClientBuilder::build() { + if (serviceUrl_.empty()) { + return unexpected(ResultInvalidUrl, "serviceUrl is required; set it on the builder"); + } + if (tlsPolicy_.enabled && !hasTlsScheme(serviceUrl_)) { + return unexpected( + ResultInvalidConfiguration, + "tlsPolicy.enabled requires a pulsar+ssl:// or https:// service URL, got: " + serviceUrl_); + } + + // Map the scalable-topics policy structs onto the classic client + // configuration: the underlying transport stack (connection pool, executor + // pools, lookup) is reused as-is. Fields left unset keep the classic + // defaults. + pulsar::ClientConfiguration conf; + if (authentication_) { + conf.setAuth(authentication_); + } + const ConnectionPolicy& connection = connectionPolicy_; + if (connection.connectionsPerBroker) { + conf.setConnectionsPerBroker(*connection.connectionsPerBroker); + } + if (connection.connectionTimeout) { + conf.setConnectionTimeout(static_cast(connection.connectionTimeout->count())); + } + if (connection.operationTimeout) { + conf.setOperationTimeoutMs(static_cast(connection.operationTimeout->count())); + } + if (connection.keepAliveInterval) { + conf.setKeepAliveIntervalInSeconds(static_cast(connection.keepAliveInterval->count())); + } + if (connection.maxLookupRequests) { + conf.setConcurrentLookupRequest(*connection.maxLookupRequests); + } + if (connection.maxLookupRedirects) { + conf.setMaxLookupRedirects(*connection.maxLookupRedirects); + } + // TODO(scalable-topics): connection.maxConnectionIdleTime has no classic + // ClientConfiguration counterpart; wire it up when lib/st manages its own + // connection reaping. + if (connection.listenerName) { + conf.setListenerName(*connection.listenerName); + } + if (threadPolicy_.ioThreads) { + conf.setIOThreads(*threadPolicy_.ioThreads); + } + if (threadPolicy_.messageListenerThreads) { + conf.setMessageListenerThreads(*threadPolicy_.messageListenerThreads); + } + if (memoryPolicy_.limit) { + conf.setMemoryLimit(memoryPolicy_.limit->bytes); + } + if (backoffPolicy_.initialBackoff) { + conf.setInitialBackoffIntervalMs(static_cast(backoffPolicy_.initialBackoff->count())); + } + if (backoffPolicy_.maxBackoff) { + conf.setMaxBackoffIntervalMs(static_cast(backoffPolicy_.maxBackoff->count())); + } + if (tlsPolicy_.trustCertsFilePath) { + conf.setTlsTrustCertsFilePath(*tlsPolicy_.trustCertsFilePath); + } + if (tlsPolicy_.certificateFilePath) { + conf.setTlsCertificateFilePath(*tlsPolicy_.certificateFilePath); + } + if (tlsPolicy_.privateKeyFilePath) { + conf.setTlsPrivateKeyFilePath(*tlsPolicy_.privateKeyFilePath); + } + conf.setTlsAllowInsecureConnection(tlsPolicy_.allowInsecureConnection); + conf.setValidateHostName(tlsPolicy_.validateHostname); + + try { + auto classic = std::make_shared(serviceUrl_, conf); + classic->initialize(); + auto impl = std::make_shared(std::move(classic), transactionPolicy_); + return PulsarClient(detail::ClientCore(std::move(impl))); + } catch (const std::exception& e) { + return unexpected(ResultInvalidUrl, std::string("failed to initialize the client: ") + e.what()); + } +} + +} // namespace pulsar::st diff --git a/tests/st/StClientTest.cc b/tests/st/StClientTest.cc new file mode 100644 index 00000000..839b7ee1 --- /dev/null +++ b/tests/st/StClientTest.cc @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +// Broker-free tests for PulsarClientBuilder::build() and the client shell: they +// validate configuration mapping and lifecycle, never connecting anywhere. + +using namespace pulsar::st; + +TEST(StClientTest, testEmptyServiceUrlIsRejected) { + auto client = PulsarClient::builder().build(); + ASSERT_FALSE(client); + ASSERT_EQ(client.error().result, ResultInvalidUrl); +} + +TEST(StClientTest, testTlsEnabledRequiresTlsScheme) { + auto client = + PulsarClient::builder().serviceUrl("pulsar://localhost:6650").tlsPolicy({.enabled = true}).build(); + ASSERT_FALSE(client); + ASSERT_EQ(client.error().result, ResultInvalidConfiguration); +} + +TEST(StClientTest, testTlsEnabledAcceptsTlsScheme) { + auto client = PulsarClient::builder() + .serviceUrl("pulsar+ssl://localhost:6651") + .tlsPolicy({.enabled = true, .allowInsecureConnection = true}) + .build(); + ASSERT_TRUE(client); + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testBuildAndCloseWithoutBroker) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + ASSERT_TRUE(static_cast(*client)); + + auto closed = client->close(); + ASSERT_TRUE(closed); +} + +TEST(StClientTest, testBuildWithPoliciesWithoutBroker) { + using namespace std::chrono_literals; + auto client = PulsarClient::builder() + .serviceUrl("pulsar://localhost:6650") + .connectionPolicy({.connectionsPerBroker = 2, + .connectionTimeout = 5000ms, + .operationTimeout = 20000ms, + .keepAliveInterval = 30s}) + .threadPolicy({.ioThreads = 2, .messageListenerThreads = 2}) + .memoryPolicy({.limit = MemorySize::ofMiB(64)}) + .backoffPolicy({.initialBackoff = 100ms, .maxBackoff = 10s}) + .build(); + ASSERT_TRUE(client); + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testProducerCreationReportsNotSupportedYet) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + + auto producer = client->newProducer().topic("st-topic").create(); + ASSERT_FALSE(producer); + ASSERT_EQ(producer.error().result, ResultOperationNotSupported); + + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testTransactionReportsNotSupportedYet) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + + auto txn = client->newTransaction(); + ASSERT_FALSE(txn); + ASSERT_EQ(txn.error().result, ResultOperationNotSupported); + + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testShutdownWithoutBroker) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + client->shutdown(); // immediate teardown must not crash or hang +} From c86cbe0a872d27c154d1e27f6476fc868a240788 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 15:18:33 -0700 Subject: [PATCH 4/9] st: segment layout and router The client-side model of a scalable topic's segment layout and the key -> segment router, ported from the Java v5 client (ClientSegmentLayout / SegmentRouter / ScalableTopicHashing) so the two clients route identically. - ScalableTopicHashing: one raw (unmasked) 32-bit Murmur3 hash per key, split into two independent 16-bit halves - high half routes segments, low half routes entry-buckets (PIP-486). Murmur3_32Hash grows a makeRawHash() accessor (the classic makeHash clears bit 31, which would confine the high half to [0, 0x7FFF]), mirroring the same addition on the Java class. - SegmentLayout::fromProto builds an immutable per-epoch view from the broker's ScalableTopicDAG: active segments sorted by hash-range start, sealed segments, per-segment broker addresses (plain + TLS), controller address, legacy-segment markers and entry-bucket splits. Segment topics are computed as segment://tenant/ns/topic/, matching SegmentTopicName.formatDescriptor. Malformed DAGs (bad scheme, out-of-bounds ranges) surface as Errors. - SegmentRouter: keyed messages route by the 16-bit segment hash to the active segment whose inclusive range contains it; when EVERY active segment is legacy (synthetic layout for a not-yet-migrated regular topic) routing switches to signSafeMod(classicMurmur3(key), N) over segment_id, matching classic partitioned-topic producers exactly; keyless messages round-robin. No-active-segments and uncovered-hash conditions are typed Errors, not exceptions. - tests/st/SegmentLayoutTest.cc: 14 tests porting the Java test semantics - hash split/unmasked-raw properties, layout parse (sorting, URIs, brokers, controller, legacy, splits), malformed-DAG rejection, keyed/deterministic/ single-segment/uncovered/empty routing, all-legacy mod-N parity against the classic hash, mixed-layout range routing, and round-robin coverage. Verified: 64/64 pulsar-st-tests green; pulsarShared still links; gcc-13 parity with the repo warning flags; clang-format-11 clean. Signed-off-by: Matteo Merli --- lib/Murmur3_32Hash.cc | 2 + lib/Murmur3_32Hash.h | 5 + lib/st/SegmentLayout.cc | 178 ++++++++++++++++++++ lib/st/SegmentLayout.h | 190 +++++++++++++++++++++ tests/BuildTests.cmake | 1 + tests/st/SegmentLayoutTest.cc | 303 ++++++++++++++++++++++++++++++++++ 6 files changed, 679 insertions(+) create mode 100644 lib/st/SegmentLayout.cc create mode 100644 lib/st/SegmentLayout.h create mode 100644 tests/st/SegmentLayoutTest.cc diff --git a/lib/Murmur3_32Hash.cc b/lib/Murmur3_32Hash.cc index 45a49885..dff6aedc 100644 --- a/lib/Murmur3_32Hash.cc +++ b/lib/Murmur3_32Hash.cc @@ -60,6 +60,8 @@ int32_t Murmur3_32Hash::makeHash(const std::string &key) { return static_cast(makeHash(&key.front(), key.length()) & std::numeric_limits::max()); } +uint32_t Murmur3_32Hash::makeRawHash(const void *key, int64_t len) { return makeHash(key, len); } + uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) { const uint8_t *data = reinterpret_cast(key); const int nblocks = len / MACRO_CHUNK_SIZE; diff --git a/lib/Murmur3_32Hash.h b/lib/Murmur3_32Hash.h index d83635d2..d293489c 100644 --- a/lib/Murmur3_32Hash.h +++ b/lib/Murmur3_32Hash.h @@ -39,6 +39,11 @@ class PULSAR_PUBLIC Murmur3_32Hash : public Hash { int32_t makeHash(const std::string& key); + // The raw (unmasked) 32-bit hash. Unlike makeHash(), bit 31 is NOT cleared; + // the scalable-topics key hashing splits this into two independent 16-bit + // halves and needs the high half full-range. + uint32_t makeRawHash(const void* key, int64_t len); + private: uint32_t seed; diff --git a/lib/st/SegmentLayout.cc b/lib/st/SegmentLayout.cc new file mode 100644 index 00000000..349c41fc --- /dev/null +++ b/lib/st/SegmentLayout.cc @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "SegmentLayout.h" + +#include +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Murmur3_32Hash.h" + +namespace pulsar::st { + +namespace { + +constexpr std::string_view kTopicScheme = "topic://"; +constexpr std::string_view kSegmentScheme = "segment://"; + +// "--" with the range bounds as 4-digit hex, matching +// SegmentTopicName.formatDescriptor in the Java client/broker. +std::string segmentDescriptor(const HashRange& range, std::uint64_t segmentId) { + char buf[64]; + std::snprintf(buf, sizeof(buf), "%04x-%04x-%llu", range.start, range.end, + static_cast(segmentId)); + return buf; +} + +int signSafeMod(int dividend, int divisor) { + int mod = dividend % divisor; + return mod < 0 ? mod + divisor : mod; +} + +} // namespace + +std::uint32_t ScalableTopicHashing::murmur(std::string_view key) { + return pulsar::Murmur3_32Hash().makeRawHash(key.data(), static_cast(key.size())); +} + +Expected SegmentLayout::fromProto(const pulsar::proto::ScalableTopicDAG& dag, + const std::string& resolvedTopicName) { + if (resolvedTopicName.rfind(kTopicScheme, 0) != 0) { + return unexpected( + ResultInvalidTopicName, + "resolved scalable-topic name must use the topic:// scheme, got: " + resolvedTopicName); + } + // "topic://tenant/ns/topic" -> "tenant/ns/topic"; the segment topics live at + // "segment://tenant/ns/topic/". + const std::string topicPath = resolvedTopicName.substr(kTopicScheme.size()); + + SegmentLayout layout; + layout.epoch_ = dag.epoch(); + + for (int i = 0; i < dag.segment_brokers_size(); i++) { + const auto& addr = dag.segment_brokers(i); + BrokerAddress broker; + broker.url = addr.broker_url(); + if (addr.has_broker_url_tls()) { + broker.urlTls = addr.broker_url_tls(); + } + layout.segmentBrokers_[addr.segment_id()] = std::move(broker); + } + + for (int i = 0; i < dag.segments_size(); i++) { + const auto& seg = dag.segments(i); + if (seg.hash_start() > HashRange::kMaxHash || seg.hash_end() > HashRange::kMaxHash || + seg.hash_end() < seg.hash_start()) { + return unexpected(ResultUnknownError, + "malformed DAG: segment " + std::to_string(seg.segment_id()) + + " has an invalid hash range [" + std::to_string(seg.hash_start()) + ", " + + std::to_string(seg.hash_end()) + "]"); + } + + Segment segment; + segment.segmentId = seg.segment_id(); + segment.range = HashRange{seg.hash_start(), seg.hash_end()}; + segment.segmentTopicName = std::string(kSegmentScheme) + topicPath + "/" + + segmentDescriptor(segment.range, seg.segment_id()); + if (seg.has_legacy_topic_name()) { + segment.legacyTopicName = seg.legacy_topic_name(); + } + segment.entryBucketSplits.reserve(seg.entry_bucket_splits_size()); + for (int j = 0; j < seg.entry_bucket_splits_size(); j++) { + segment.entryBucketSplits.push_back(seg.entry_bucket_splits(j)); + } + + if (seg.state() == pulsar::proto::ACTIVE) { + layout.activeSegments_.push_back(std::move(segment)); + } else { + layout.sealedSegments_.push_back(std::move(segment)); + } + } + + // Active segments sort by hash-range start (the routing order); sealed order + // doesn't matter for correctness, sort by id for stable iteration. + std::sort(layout.activeSegments_.begin(), layout.activeSegments_.end(), + [](const Segment& a, const Segment& b) { return a.range.start < b.range.start; }); + std::sort(layout.sealedSegments_.begin(), layout.sealedSegments_.end(), + [](const Segment& a, const Segment& b) { return a.segmentId < b.segmentId; }); + + if (dag.has_controller_broker_url()) { + layout.controllerBrokerUrl_ = dag.controller_broker_url(); + } + if (dag.has_controller_broker_url_tls()) { + layout.controllerBrokerUrlTls_ = dag.controller_broker_url_tls(); + } + return layout; +} + +const std::string* SegmentLayout::brokerUrl(std::uint64_t segmentId) const { + auto it = segmentBrokers_.find(segmentId); + return it != segmentBrokers_.end() ? &it->second.url : nullptr; +} + +const std::string* SegmentLayout::brokerUrlTls(std::uint64_t segmentId) const { + auto it = segmentBrokers_.find(segmentId); + return (it != segmentBrokers_.end() && it->second.urlTls) ? &*it->second.urlTls : nullptr; +} + +Expected SegmentRouter::route(std::string_view key, const SegmentLayout& layout) { + const auto& active = layout.activeSegments(); + if (active.empty()) { + return unexpected(ResultServiceUnitNotReady, "no active segments in the topic layout"); + } + + // Synthetic layout for a not-yet-migrated regular topic: route + // signSafeMod(classicMurmur3(key), N) over segment_id, exactly like the + // classic partitioned-topic producers still attached to the same topic. + const bool allLegacy = + std::all_of(active.begin(), active.end(), [](const Segment& s) { return s.isLegacy(); }); + if (allLegacy) { + const int hash32 = pulsar::Murmur3_32Hash().makeHash(std::string(key)); + const int partition = signSafeMod(hash32, static_cast(active.size())); + for (const auto& segment : active) { + if (segment.segmentId == static_cast(partition)) { + return segment.segmentId; + } + } + return unexpected(ResultUnknownError, + "synthetic layout is missing segment_id=" + std::to_string(partition) + + " (N=" + std::to_string(active.size()) + ")"); + } + + const std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + for (const auto& segment : active) { + if (segment.range.contains(hash)) { + return segment.segmentId; + } + } + return unexpected(ResultUnknownError, + "no active segment covers hash " + std::to_string(hash) + " for the message key"); +} + +Expected SegmentRouter::routeRoundRobin(const SegmentLayout& layout) { + const auto& active = layout.activeSegments(); + if (active.empty()) { + return unexpected(ResultServiceUnitNotReady, "no active segments in the topic layout"); + } + const std::uint32_t index = roundRobinCounter_.fetch_add(1, std::memory_order_relaxed); + return active[index % active.size()].segmentId; +} + +} // namespace pulsar::st diff --git a/lib/st/SegmentLayout.h b/lib/st/SegmentLayout.h new file mode 100644 index 00000000..d484997c --- /dev/null +++ b/lib/st/SegmentLayout.h @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +// The client-side model of a scalable topic's segment layout, built from the +// ScalableTopicDAG the broker sends on the DAG-watch session, plus the key -> +// segment router. Ported from the Java v5 client (ClientSegmentLayout / +// SegmentRouter / ScalableTopicHashing) so the two clients route identically. + +namespace pulsar { +namespace proto { +class ScalableTopicDAG; +} +} // namespace pulsar + +namespace pulsar::st { + +/** + * Scalable-topic key hashing — the single source of truth for both segment + * routing and entry-bucketing. + * + * A single raw (unmasked) 32-bit Murmur3 hash of a key splits into two + * independent 16-bit halves: the HIGH half routes segments, the LOW half + * routes entry-buckets. The raw hash is required so the high half is + * full-range (the classic masked hash clears bit 31, which would confine the + * high half to [0, 0x7FFF]). Compute murmur() once per key and split it. + */ +struct ScalableTopicHashing { + /** The raw 32-bit Murmur3 hash of a key. */ + static std::uint32_t murmur(std::string_view key); + + /** The 16-bit segment-routing hash (high 16 bits) of a precomputed murmur(). */ + static std::uint32_t segmentHash(std::uint32_t murmur) { return (murmur >> 16) & 0xFFFFu; } + + /** The 16-bit entry-bucket hash (low 16 bits) of a precomputed murmur(). */ + static std::uint32_t entryBucketHash(std::uint32_t murmur) { return murmur & 0xFFFFu; } +}; + +/** An inclusive hash range [start, end] within the 16-bit hash space (0x0000-0xFFFF). */ +struct HashRange { + static constexpr std::uint32_t kMinHash = 0x0000; + static constexpr std::uint32_t kMaxHash = 0xFFFF; + + std::uint32_t start = kMinHash; + std::uint32_t end = kMaxHash; + + bool contains(std::uint32_t hash) const { return hash >= start && hash <= end; } + bool operator==(const HashRange&) const = default; +}; + +/** + * One segment of a scalable topic, as seen by the client: its identity, the + * hash range it owns, and the topic its per-segment producer/consumer attaches + * to. + * + * `legacyTopicName` is set for legacy segments — entries in a synthetic layout + * that wrap an existing, externally managed persistent:// topic (e.g. one + * partition of a not-yet-migrated regular topic). Those attach to the wrapped + * topic; regular controller-managed segments attach to the computed + * segment://... topic. + */ +struct Segment { + std::uint64_t segmentId = 0; + HashRange range; + /** The computed segment://tenant/ns/topic/xxxx-xxxx-id topic. */ + std::string segmentTopicName; + /** Wrapped persistent:// topic for legacy segments; unset otherwise. */ + std::optional legacyTopicName; + /** Entry-bucket split points within the segment (empty = one bucket). */ + std::vector entryBucketSplits; + + bool isLegacy() const { return legacyTopicName.has_value(); } + + /** The topic a per-segment producer/consumer attaches to. */ + const std::string& attachTopicName() const { + return legacyTopicName ? *legacyTopicName : segmentTopicName; + } +}; + +/** + * Immutable client-side view of a scalable topic's segment layout at one DAG + * epoch. Built from the ScalableTopicDAG the broker returns on the DAG-watch + * session; consumers of this class swap whole layouts as updates arrive (epoch + * monotonicity is enforced by the watch session, not here). + */ +class SegmentLayout { + public: + /** An empty layout: epoch 0, no segments. */ + SegmentLayout() = default; + + /** + * Build a layout from the broker-provided DAG. + * + * @param dag the DAG snapshot from CommandScalableTopicUpdate. + * @param resolvedTopicName the canonical "topic://tenant/ns/topic" identity + * (CommandScalableTopicUpdate.resolved_topic_name); used to compute + * the per-segment segment:// topic names. + * @return the layout, or an Error if the DAG is malformed (bad topic scheme + * or an out-of-bounds hash range). + */ + static Expected fromProto(const pulsar::proto::ScalableTopicDAG& dag, + const std::string& resolvedTopicName); + + /** The DAG generation this layout represents. */ + std::uint64_t epoch() const { return epoch_; } + + /** Active segments, sorted by hash-range start (the routing order). */ + const std::vector& activeSegments() const { return activeSegments_; } + + /** Sealed segments still present in the DAG (finite, eventually drained), sorted by id. */ + const std::vector& sealedSegments() const { return sealedSegments_; } + + /** The broker serving a segment, or nullptr if the DAG did not name one. */ + const std::string* brokerUrl(std::uint64_t segmentId) const; + /** The TLS address of the broker serving a segment, or nullptr if absent. */ + const std::string* brokerUrlTls(std::uint64_t segmentId) const; + + /** The controller leader's address, when the DAG carries one. */ + const std::optional& controllerBrokerUrl() const { return controllerBrokerUrl_; } + const std::optional& controllerBrokerUrlTls() const { return controllerBrokerUrlTls_; } + + private: + struct BrokerAddress { + std::string url; + std::optional urlTls; + }; + + std::uint64_t epoch_ = 0; + std::vector activeSegments_; + std::vector sealedSegments_; + std::unordered_map segmentBrokers_; + std::optional controllerBrokerUrl_; + std::optional controllerBrokerUrlTls_; +}; + +/** + * Routes messages to segments. + * + * Keyed messages route by the 16-bit segment hash to the active segment whose + * range contains it. If EVERY active segment is a legacy segment (synthetic + * layout for a not-yet-migrated regular topic), routing switches to + * signSafeMod(classicMurmur3(key), N) over segment_id, so producers using this + * client route exactly like the classic partitioned-topic producers still + * attached to the same topic. Keyless messages route round-robin. + */ +class SegmentRouter { + public: + /** + * Route a keyed message. + * + * @return the owning segment's id, or an Error if the layout has no active + * segments (ResultServiceUnitNotReady) or no active segment covers + * the hash — a malformed DAG (ResultUnknownError). + */ + Expected route(std::string_view key, const SegmentLayout& layout); + + /** Route a keyless message round-robin across the active segments. */ + Expected routeRoundRobin(const SegmentLayout& layout); + + private: + std::atomic roundRobinCounter_{0}; +}; + +} // namespace pulsar::st diff --git a/tests/BuildTests.cmake b/tests/BuildTests.cmake index db44f6aa..45dd84f4 100644 --- a/tests/BuildTests.cmake +++ b/tests/BuildTests.cmake @@ -65,4 +65,5 @@ target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic ${GTEST_TAR file(GLOB ST_TEST_SOURCES st/*.cc) add_executable(pulsar-st-tests ${ST_TEST_SOURCES}) set_target_properties(pulsar-st-tests PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) +target_include_directories(pulsar-st-tests PRIVATE ${AUTOGEN_DIR}/lib) target_link_libraries(pulsar-st-tests PRIVATE pulsarStatic ${GTEST_TARGETS}) diff --git a/tests/st/SegmentLayoutTest.cc b/tests/st/SegmentLayoutTest.cc new file mode 100644 index 00000000..b4192ac9 --- /dev/null +++ b/tests/st/SegmentLayoutTest.cc @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Murmur3_32Hash.h" +#include "lib/st/SegmentLayout.h" + +using namespace pulsar::st; +namespace proto = pulsar::proto; + +namespace { + +const std::string kTopic = "topic://public/default/orders"; + +void addSegment(proto::ScalableTopicDAG& dag, std::uint64_t id, std::uint32_t start, std::uint32_t end, + proto::SegmentState state, const std::string& legacyTopic = {}) { + auto* seg = dag.add_segments(); + seg->set_segment_id(id); + seg->set_hash_start(start); + seg->set_hash_end(end); + seg->set_state(state); + seg->set_created_at_epoch(1); + seg->set_created_at_ms(1000); + if (!legacyTopic.empty()) { + seg->set_legacy_topic_name(legacyTopic); + } +} + +void addBroker(proto::ScalableTopicDAG& dag, std::uint64_t segmentId, const std::string& url, + const std::string& urlTls = {}) { + auto* addr = dag.add_segment_brokers(); + addr->set_segment_id(segmentId); + addr->set_broker_url(url); + if (!urlTls.empty()) { + addr->set_broker_url_tls(urlTls); + } +} + +// A deterministic key whose 16-bit segment hash falls within [start, end]. +std::string findKeyInRange(std::uint32_t start, std::uint32_t end) { + for (int i = 0; i < 1000000; i++) { + std::string key = "key-" + std::to_string(i); + std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + if (hash >= start && hash <= end) { + return key; + } + } + ADD_FAILURE() << "no key found in range [" << start << ", " << end << "]"; + return {}; +} + +} // namespace + +TEST(ScalableTopicHashingTest, testHashSplitsIntoIndependentHalves) { + const std::string key = "some-routing-key"; + std::uint32_t murmur = ScalableTopicHashing::murmur(key); + std::uint32_t segment = ScalableTopicHashing::segmentHash(murmur); + std::uint32_t bucket = ScalableTopicHashing::entryBucketHash(murmur); + ASSERT_LE(segment, 0xFFFFu); + ASSERT_LE(bucket, 0xFFFFu); + ASSERT_EQ((segment << 16) | bucket, murmur); +} + +TEST(ScalableTopicHashingTest, testRawHashIsUnmasked) { + // The classic makeHash clears bit 31; the raw hash must not, or the high + // half of the split would be confined to [0, 0x7FFF]. Find a key where the + // two differ, proving the raw path is in use. + bool foundHighBit = false; + for (int i = 0; i < 100000 && !foundHighBit; i++) { + std::string key = "key-" + std::to_string(i); + foundHighBit = (ScalableTopicHashing::murmur(key) & 0x80000000u) != 0; + } + ASSERT_TRUE(foundHighBit); +} + +TEST(SegmentLayoutTest, testFromProtoBuildsSortedLayout) { + proto::ScalableTopicDAG dag; + dag.set_epoch(7); + // Add out of order to verify sorting by range start. + addSegment(dag, 2, 0x8000, 0xFFFF, proto::ACTIVE); + addSegment(dag, 1, 0x0000, 0x7FFF, proto::ACTIVE); + addSegment(dag, 0, 0x0000, 0xFFFF, proto::SEALED); + addBroker(dag, 1, "pulsar://broker1:6650", "pulsar+ssl://broker1:6651"); + addBroker(dag, 2, "pulsar://broker2:6650"); + dag.set_controller_broker_url("pulsar://controller:6650"); + + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + ASSERT_EQ(layout->epoch(), 7u); + + ASSERT_EQ(layout->activeSegments().size(), 2u); + ASSERT_EQ(layout->activeSegments()[0].segmentId, 1u); // sorted by range start + ASSERT_EQ(layout->activeSegments()[1].segmentId, 2u); + ASSERT_EQ(layout->activeSegments()[0].range, (HashRange{0x0000, 0x7FFF})); + ASSERT_EQ(layout->activeSegments()[0].segmentTopicName, "segment://public/default/orders/0000-7fff-1"); + ASSERT_EQ(layout->activeSegments()[1].segmentTopicName, "segment://public/default/orders/8000-ffff-2"); + ASSERT_FALSE(layout->activeSegments()[0].isLegacy()); + ASSERT_EQ(layout->activeSegments()[0].attachTopicName(), layout->activeSegments()[0].segmentTopicName); + + ASSERT_EQ(layout->sealedSegments().size(), 1u); + ASSERT_EQ(layout->sealedSegments()[0].segmentId, 0u); + + ASSERT_NE(layout->brokerUrl(1), nullptr); + ASSERT_EQ(*layout->brokerUrl(1), "pulsar://broker1:6650"); + ASSERT_NE(layout->brokerUrlTls(1), nullptr); + ASSERT_EQ(*layout->brokerUrlTls(1), "pulsar+ssl://broker1:6651"); + ASSERT_EQ(layout->brokerUrlTls(2), nullptr); + ASSERT_EQ(layout->brokerUrl(42), nullptr); + + ASSERT_TRUE(layout->controllerBrokerUrl().has_value()); + ASSERT_EQ(*layout->controllerBrokerUrl(), "pulsar://controller:6650"); + ASSERT_FALSE(layout->controllerBrokerUrlTls().has_value()); +} + +TEST(SegmentLayoutTest, testFromProtoParsesLegacyAndBucketSplits) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0xFFFF, proto::ACTIVE, "persistent://public/default/orders-partition-0"); + auto* seg = dag.mutable_segments(0); + seg->add_entry_bucket_splits(0x4000); + seg->add_entry_bucket_splits(0x8000); + + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + const Segment& segment = layout->activeSegments()[0]; + ASSERT_TRUE(segment.isLegacy()); + ASSERT_EQ(segment.attachTopicName(), "persistent://public/default/orders-partition-0"); + ASSERT_EQ(segment.entryBucketSplits, (std::vector{0x4000, 0x8000})); +} + +TEST(SegmentLayoutTest, testFromProtoRejectsBadTopicScheme) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + auto layout = SegmentLayout::fromProto(dag, "persistent://public/default/orders"); + ASSERT_FALSE(layout); + ASSERT_EQ(layout.error().result, pulsar::ResultInvalidTopicName); +} + +TEST(SegmentLayoutTest, testFromProtoRejectsMalformedHashRange) { + { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x8000, 0x7FFF, proto::ACTIVE); // end < start + ASSERT_FALSE(SegmentLayout::fromProto(dag, kTopic)); + } + { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x10000, proto::ACTIVE); // end > 16-bit space + ASSERT_FALSE(SegmentLayout::fromProto(dag, kTopic)); + } +} + +namespace { + +Expected fourSegmentLayout() { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x3FFF, proto::ACTIVE); + addSegment(dag, 1, 0x4000, 0x7FFF, proto::ACTIVE); + addSegment(dag, 2, 0x8000, 0xBFFF, proto::ACTIVE); + addSegment(dag, 3, 0xC000, 0xFFFF, proto::ACTIVE); + return SegmentLayout::fromProto(dag, kTopic); +} + +} // namespace + +TEST(SegmentRouterTest, testKeyedRoutingHitsOwningRange) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + + for (int i = 0; i < 200; i++) { + std::string key = "order-" + std::to_string(i); + auto id = router.route(key, *layout); + ASSERT_TRUE(id); + std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + const Segment& segment = layout->activeSegments()[*id]; // ids == sorted positions here + ASSERT_TRUE(segment.range.contains(hash)) << "key " << key << " hash " << hash; + } +} + +TEST(SegmentRouterTest, testKeyedRoutingIsDeterministic) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + for (int i = 0; i < 20; i++) { + std::string key = "stable-key-" + std::to_string(i); + auto first = router.route(key, *layout); + auto second = router.route(key, *layout); + ASSERT_TRUE(first); + ASSERT_TRUE(second); + ASSERT_EQ(*first, *second); + } +} + +TEST(SegmentRouterTest, testSingleFullRangeSegmentGetsEverything) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 9, 0x0000, 0xFFFF, proto::ACTIVE); + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + for (int i = 0; i < 50; i++) { + auto id = router.route("k" + std::to_string(i), *layout); + ASSERT_TRUE(id); + ASSERT_EQ(*id, 9u); + } +} + +TEST(SegmentRouterTest, testNoActiveSegmentsIsAnError) { + SegmentLayout empty; + SegmentRouter router; + auto id = router.route("key", empty); + ASSERT_FALSE(id); + ASSERT_EQ(id.error().result, pulsar::ResultServiceUnitNotReady); + auto rr = router.routeRoundRobin(empty); + ASSERT_FALSE(rr); +} + +TEST(SegmentRouterTest, testUncoveredHashIsAnError) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x7FFF, proto::ACTIVE); // gap: [0x8000, 0xFFFF] uncovered + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + std::string uncoveredKey = findKeyInRange(0x8000, 0xFFFF); + auto id = router.route(uncoveredKey, *layout); + ASSERT_FALSE(id); + ASSERT_EQ(id.error().result, pulsar::ResultUnknownError); +} + +TEST(SegmentRouterTest, testAllLegacyRoutesModNLikeClassicPartitions) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + const int n = 4; + for (int i = 0; i < n; i++) { + addSegment(dag, i, i * 0x4000, (i + 1) * 0x4000 - 1, proto::ACTIVE, + "persistent://public/default/orders-partition-" + std::to_string(i)); + } + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + + for (int i = 0; i < 100; i++) { + std::string key = "legacy-key-" + std::to_string(i); + auto id = router.route(key, *layout); + ASSERT_TRUE(id); + // Must match the classic partitioned-topic routing exactly. + int expected = pulsar::Murmur3_32Hash().makeHash(key) % n; + ASSERT_EQ(*id, static_cast(expected)) << "key " << key; + } +} + +TEST(SegmentRouterTest, testMixedLegacyAndRegularUsesRangeRouting) { + proto::ScalableTopicDAG dag; + dag.set_epoch(2); + addSegment(dag, 0, 0x0000, 0x7FFF, proto::ACTIVE, "persistent://public/default/orders-partition-0"); + addSegment(dag, 1, 0x8000, 0xFFFF, proto::ACTIVE); // one regular segment -> range routing + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + + std::string keyInUpperHalf = findKeyInRange(0x8000, 0xFFFF); + auto id = router.route(keyInUpperHalf, *layout); + ASSERT_TRUE(id); + ASSERT_EQ(*id, 1u); +} + +TEST(SegmentRouterTest, testRoundRobinCyclesAllActiveSegments) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + std::set seen; + for (int i = 0; i < 8; i++) { + auto id = router.routeRoundRobin(*layout); + ASSERT_TRUE(id); + seen.insert(*id); + } + ASSERT_EQ(seen.size(), 4u); // every active segment hit +} From 5598c9dae607f31a3bc6f5b06bd8760ebdd08d49 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 15:29:31 -0700 Subject: [PATCH 5/9] st: wire plumbing for the DAG-watch session Teach the shared transport layer the scalable-topics session protocol; the lib/st DagWatchSession builds on these hooks next. - Commands: newScalableTopicLookup(sessionId, topic, createIfMissing) and newScalableTopicClose(sessionId) factories. newConnect() now advertises supports_scalable_topics - safe on shared connections, since the broker only sends scalable commands on sessions the client explicitly opens. messageType() gains names for all twelve scalable command types; it runs on every incoming command and throws on unknown enum values, so this is required before the client can receive any scalable push. - ClientConnection: a DAG-watch session registry keyed by the client-assigned session id. registerScalableTopicSession() refuses (returns false) on an already-closed connection; SCALABLE_TOPIC_UPDATE dispatches to the registered listener by session_id (an unknown session logs and drops the push - it may race a just-closed session - rather than treating it as a protocol violation, which would kill the shared connection); close() notifies every registered session with (error, nullptr) so it can re-establish on a new connection. - tests/st/ScalableCommandsTest.cc: frame-level roundtrips of both factories (unwrap [totalSize][cmdSize][BaseCommand], reparse, verify fields) and messageType() coverage of all twelve new types. Verified: 68/68 pulsar-st-tests green; pulsarShared/pulsarStatic and the classic pulsar-tests target compile clean; clang-format-11 clean. Signed-off-by: Matteo Merli --- lib/ClientConnection.cc | 43 +++++++++++++++ lib/ClientConnection.h | 20 +++++++ lib/Commands.cc | 56 +++++++++++++++++++ lib/Commands.h | 7 +++ tests/st/ScalableCommandsTest.cc | 93 ++++++++++++++++++++++++++++++++ 5 files changed, 219 insertions(+) create mode 100644 tests/st/ScalableCommandsTest.cc diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 95597034..668245d3 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -988,6 +988,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { handleAckResponse(incomingCmd.ackresponse()); break; + case BaseCommand::SCALABLE_TOPIC_UPDATE: + handleScalableTopicUpdate(incomingCmd.scalabletopicupdate()); + break; + default: LOG_WARN(cnxString() << "Received invalid message from server"); close(Error{ResultDisconnected, cnxString() + "Received invalid message from server"}); @@ -1267,6 +1271,7 @@ const std::future& ClientConnection::close(Error&& error, bool switchClust auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_); auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_); auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_); + auto scalableTopicSessions = std::move(scalableTopicSessions_); numOfPendingLookupRequest_ = 0; @@ -1341,6 +1346,11 @@ const std::future& ClientConnection::close(Error&& error, bool switchClust for (auto& kv : pendingLookupRequests) { kv.second->fail(error); } + // Notify scalable-topic DAG-watch sessions so they can re-establish on a + // new connection. + for (auto& kv : scalableTopicSessions) { + kv.second(error.result, nullptr); + } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); kv.second.setFailed(result); @@ -1379,6 +1389,39 @@ void ClientConnection::removeConsumer(int consumerId) { consumers_.erase(consumerId); } +bool ClientConnection::registerScalableTopicSession(uint64_t sessionId, + ScalableTopicUpdateListener listener) { + Lock lock(mutex_); + if (isClosed()) { + return false; + } + scalableTopicSessions_[sessionId] = std::move(listener); + return true; +} + +void ClientConnection::removeScalableTopicSession(uint64_t sessionId) { + Lock lock(mutex_); + scalableTopicSessions_.erase(sessionId); +} + +void ClientConnection::handleScalableTopicUpdate(const proto::CommandScalableTopicUpdate& update) { + ScalableTopicUpdateListener listener; + { + Lock lock(mutex_); + auto it = scalableTopicSessions_.find(update.session_id()); + if (it != scalableTopicSessions_.end()) { + listener = it->second; + } + } + if (listener) { + listener(ResultOk, &update); + } else { + // A push may race with a just-closed session; drop it rather than + // treating it as a protocol violation. + LOG_WARN(cnxString() << "Received SCALABLE_TOPIC_UPDATE for unknown session " << update.session_id()); + } +} + const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; } int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index fd89fae5..22d5ff38 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -105,6 +105,7 @@ class CommandGetLastMessageIdResponse; class CommandLookupTopicResponse; class CommandPartitionedTopicMetadataResponse; class CommandProducerSuccess; +class CommandScalableTopicUpdate; class CommandSendReceipt; class CommandSendError; class CommandSuccess; @@ -187,6 +188,20 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ScalableTopicUpdateListener; + + /** + * Register a DAG-watch session. Returns false (without registering) if the + * connection is already closed — the caller should acquire a new connection. + */ + bool registerScalableTopicSession(uint64_t sessionId, ScalableTopicUpdateListener listener); + void removeScalableTopicSession(uint64_t sessionId); + /** * Send a request with a specific Id over the connection. The future will be * triggered when the response for this request is received @@ -354,6 +369,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ConsumersMap; ConsumersMap consumers_; + // Scalable topics: DAG-watch sessions by client-assigned session id. + typedef std::map ScalableTopicSessionsMap; + ScalableTopicSessionsMap scalableTopicSessions_; + typedef std::map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; @@ -436,6 +455,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); optional getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&); diff --git a/lib/Commands.cc b/lib/Commands.cc index 3dd22591..9b800874 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -55,6 +55,7 @@ using proto::CommandLookupTopic; using proto::CommandPartitionedTopicMetadata; using proto::CommandProducer; using proto::CommandRedeliverUnacknowledgedMessages; +using proto::CommandScalableTopicLookup; using proto::CommandSeek; using proto::CommandSend; using proto::CommandSubscribe; @@ -285,6 +286,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const FeatureFlags* flags = connect->mutable_feature_flags(); flags->set_supports_auth_refresh(true); flags->set_supports_broker_entry_metadata(true); + flags->set_supports_scalable_topics(true); if (connectingThroughProxy) { Url logicalAddressUrl; Url::parse(logicalAddress, logicalAddressUrl); @@ -552,6 +554,24 @@ SharedBuffer Commands::newCloseConsumer(uint64_t consumerId, uint64_t requestId) return writeMessageWithSize(cmd); } +SharedBuffer Commands::newScalableTopicLookup(uint64_t sessionId, const std::string& topic, + bool createIfMissing) { + BaseCommand cmd; + cmd.set_type(BaseCommand::SCALABLE_TOPIC_LOOKUP); + CommandScalableTopicLookup* lookup = cmd.mutable_scalabletopiclookup(); + lookup->set_session_id(sessionId); + lookup->set_topic(topic); + lookup->set_create_if_missing(createIfMissing); + return writeMessageWithSize(cmd); +} + +SharedBuffer Commands::newScalableTopicClose(uint64_t sessionId) { + BaseCommand cmd; + cmd.set_type(BaseCommand::SCALABLE_TOPIC_CLOSE); + cmd.mutable_scalabletopicclose()->set_session_id(sessionId); + return writeMessageWithSize(cmd); +} + SharedBuffer Commands::newPing() { BaseCommand cmd; cmd.set_type(BaseCommand::PING); @@ -808,6 +828,42 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::WATCH_TOPIC_LIST_CLOSE: return "WATCH_TOPIC_LIST_CLOSE"; break; + case BaseCommand::SCALABLE_TOPIC_LOOKUP: + return "SCALABLE_TOPIC_LOOKUP"; + break; + case BaseCommand::SCALABLE_TOPIC_UPDATE: + return "SCALABLE_TOPIC_UPDATE"; + break; + case BaseCommand::SCALABLE_TOPIC_CLOSE: + return "SCALABLE_TOPIC_CLOSE"; + break; + case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE: + return "SCALABLE_TOPIC_SUBSCRIBE"; + break; + case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE_RESPONSE: + return "SCALABLE_TOPIC_SUBSCRIBE_RESPONSE"; + break; + case BaseCommand::SCALABLE_TOPIC_ASSIGNMENT_UPDATE: + return "SCALABLE_TOPIC_ASSIGNMENT_UPDATE"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS: + return "WATCH_SCALABLE_TOPICS"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS_UPDATE: + return "WATCH_SCALABLE_TOPICS_UPDATE"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS_CLOSE: + return "WATCH_SCALABLE_TOPICS_CLOSE"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS: + return "WATCH_TC_ASSIGNMENTS"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS_UPDATE: + return "WATCH_TC_ASSIGNMENTS_UPDATE"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS_CLOSE: + return "WATCH_TC_ASSIGNMENTS_CLOSE"; + break; }; BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value")); } diff --git a/lib/Commands.h b/lib/Commands.h index 8403d6e2..622f95d7 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -147,6 +147,13 @@ class Commands { static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId, const std::set& messageIds); + // Scalable topics (pulsar::st): open/close a DAG-watch session. The broker + // answers (and later pushes) CommandScalableTopicUpdate correlated by the + // client-assigned sessionId. + static SharedBuffer newScalableTopicLookup(uint64_t sessionId, const std::string& topic, + bool createIfMissing); + static SharedBuffer newScalableTopicClose(uint64_t sessionId); + static std::string messageType(BaseCommand_Type type); static void initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata); diff --git a/tests/st/ScalableCommandsTest.cc b/tests/st/ScalableCommandsTest.cc new file mode 100644 index 00000000..ede92aa6 --- /dev/null +++ b/tests/st/ScalableCommandsTest.cc @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include "PulsarApi.pb.h" +#include "lib/Commands.h" +#include "lib/SharedBuffer.h" + +// Wire-level tests for the scalable-topics commands: the framed buffers the +// Commands factories produce must deserialize back to the intended protobuf. + +using namespace pulsar; +namespace proto = pulsar::proto; + +namespace { + +// Unwrap [totalSize][cmdSize][BaseCommand] framing produced by the factories. +proto::BaseCommand parseCommand(SharedBuffer buffer) { + const uint32_t totalSize = buffer.readUnsignedInt(); + const uint32_t cmdSize = buffer.readUnsignedInt(); + EXPECT_EQ(totalSize, cmdSize + sizeof(uint32_t)); + EXPECT_EQ(buffer.readableBytes(), cmdSize); + proto::BaseCommand cmd; + EXPECT_TRUE(cmd.ParseFromArray(buffer.data(), static_cast(cmdSize))); + return cmd; +} + +} // namespace + +TEST(ScalableCommandsTest, testScalableTopicLookupRoundtrip) { + auto cmd = parseCommand(Commands::newScalableTopicLookup(42, "persistent://public/default/orders", true)); + ASSERT_EQ(cmd.type(), proto::BaseCommand::SCALABLE_TOPIC_LOOKUP); + ASSERT_TRUE(cmd.has_scalabletopiclookup()); + ASSERT_EQ(cmd.scalabletopiclookup().session_id(), 42u); + ASSERT_EQ(cmd.scalabletopiclookup().topic(), "persistent://public/default/orders"); + ASSERT_TRUE(cmd.scalabletopiclookup().create_if_missing()); +} + +TEST(ScalableCommandsTest, testScalableTopicLookupNoCreate) { + auto cmd = parseCommand(Commands::newScalableTopicLookup(7, "orders", false)); + ASSERT_EQ(cmd.scalabletopiclookup().session_id(), 7u); + ASSERT_FALSE(cmd.scalabletopiclookup().create_if_missing()); +} + +TEST(ScalableCommandsTest, testScalableTopicCloseRoundtrip) { + auto cmd = parseCommand(Commands::newScalableTopicClose(42)); + ASSERT_EQ(cmd.type(), proto::BaseCommand::SCALABLE_TOPIC_CLOSE); + ASSERT_TRUE(cmd.has_scalabletopicclose()); + ASSERT_EQ(cmd.scalabletopicclose().session_id(), 42u); +} + +TEST(ScalableCommandsTest, testMessageTypeNamesScalableCommands) { + // messageType() runs on every incoming command (debug logging) and throws on + // unknown enum values: every scalable type the broker can send or the client + // can log must have a name. + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_LOOKUP), "SCALABLE_TOPIC_LOOKUP"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_UPDATE), "SCALABLE_TOPIC_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_CLOSE), "SCALABLE_TOPIC_CLOSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_SUBSCRIBE), + "SCALABLE_TOPIC_SUBSCRIBE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_SUBSCRIBE_RESPONSE), + "SCALABLE_TOPIC_SUBSCRIBE_RESPONSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_ASSIGNMENT_UPDATE), + "SCALABLE_TOPIC_ASSIGNMENT_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS), "WATCH_SCALABLE_TOPICS"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS_UPDATE), + "WATCH_SCALABLE_TOPICS_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS_CLOSE), + "WATCH_SCALABLE_TOPICS_CLOSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS), "WATCH_TC_ASSIGNMENTS"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS_UPDATE), + "WATCH_TC_ASSIGNMENTS_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS_CLOSE), + "WATCH_TC_ASSIGNMENTS_CLOSE"); +} From 5fd437267c993f7efc0d7bb81a7d0cd643b694fe Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 16:20:28 -0700 Subject: [PATCH 6/9] st: DagWatchSession - the per-topic DAG-watch session The lib/st session that keeps a scalable topic's SegmentLayout current, ported from the Java v5 client's DagWatchClient so the two clients behave identically. - start() acquires a connection through the classic client's pool (the topic:// spelling maps to its persistent:// twin for the connection lookup only; the wire lookup keeps the original), verifies the broker advertised scalable-topics support, registers the session on the connection and sends CommandScalableTopicLookup. The returned future completes with the initial layout or the lookup failure. - Updates (initial response and pushes alike) arrive through the connection registry: the resolved topic:// identity from the broker is adopted, the DAG becomes a new SegmentLayout, the reconnect backoff resets, and the layout-change listener fires with (new, old). A malformed first response fails start() instead of hanging it; later malformed pushes keep the last good layout. Broker errors before the first layout fail start() (TopicNotFound mapped to ResultTopicNotFound); afterwards they are treated as transient, matching the Java client. - On connection close after the initial layout the session reconnects with exponential backoff (100ms..30s) and re-issues the lookup; before the initial layout it fails start() rather than retrying behind the caller. Deliberately no epoch gating, same as the Java client: updates are ordered per connection and the old connection's registry is drained before the session re-attaches, so stale pushes cannot arrive. - close() is idempotent: cancels the reconnect timer, unregisters, and sends CommandScalableTopicClose. - ClientConnection now records the broker's CONNECTED feature flags and exposes supportsScalableTopics(), which the session gates on (previously the C++ client ignored broker feature flags entirely). - tests/st/DagWatchSessionTest.cc: the lookup-name mapping and the no-connection lifecycle (fresh state, unique ids, close before start, double close). The full protocol is exercised end-to-end by the producer integration tests against a scalable-topics broker. Verified: 70/70 pulsar-st-tests green; pulsarShared and the classic pulsar-tests target compile clean; clang-format-11 clean. Signed-off-by: Matteo Merli --- lib/ClientConnection.cc | 5 + lib/ClientConnection.h | 4 + lib/st/DagWatchSession.cc | 264 ++++++++++++++++++++++++++++++++ lib/st/DagWatchSession.h | 125 +++++++++++++++ tests/st/DagWatchSessionTest.cc | 66 ++++++++ 5 files changed, 464 insertions(+) create mode 100644 lib/st/DagWatchSession.cc create mode 100644 lib/st/DagWatchSession.h create mode 100644 tests/st/DagWatchSessionTest.cc diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 668245d3..474a96b1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -313,6 +313,11 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC LOG_DEBUG("Current max message size is: " << maxMessageSize_); } + if (cmdConnected.has_feature_flags()) { + supportsScalableTopics_.store(cmdConnected.feature_flags().supports_scalable_topics(), + std::memory_order_release); + } + Lock lock(mutex_); if (isClosed()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 22d5ff38..8591f546 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -202,6 +202,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ScalableTopicSessionsMap; ScalableTopicSessionsMap scalableTopicSessions_; + std::atomic supportsScalableTopics_{false}; typedef std::map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; diff --git a/lib/st/DagWatchSession.cc b/lib/st/DagWatchSession.cc new file mode 100644 index 00000000..fb50fdd1 --- /dev/null +++ b/lib/st/DagWatchSession.cc @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "DagWatchSession.h" + +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Commands.h" +#include "lib/ExecutorService.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar::st { + +namespace { + +std::atomic sessionIdGenerator{0}; + +Result toResult(pulsar::proto::ServerError error) { + return error == pulsar::proto::TopicNotFound ? ResultTopicNotFound : ResultLookupError; +} + +} // namespace + +DagWatchSession::DagWatchSession(pulsar::ClientImplPtr client, std::string topic, bool createIfMissing) + : client_(std::move(client)), + inputTopic_(std::move(topic)), + createIfMissing_(createIfMissing), + sessionId_(++sessionIdGenerator), + backoff_(std::chrono::milliseconds(100), std::chrono::seconds(30), std::chrono::milliseconds(0)), + reconnectTimer_(client_->getIOExecutorProvider()->get()->createDeadlineTimer()) {} + +std::string DagWatchSession::lookupCompatibleTopic(const std::string& topic) { + constexpr std::string_view kTopicScheme = "topic://"; + if (topic.rfind(kTopicScheme, 0) == 0) { + return "persistent://" + topic.substr(kTopicScheme.size()); + } + return topic; +} + +Future DagWatchSession::start() { + auto self = shared_from_this(); + client_->getConnection("", lookupCompatibleTopic(inputTopic_), static_cast(sessionId_)) + .addListener([self](pulsar::Result result, const pulsar::ClientConnectionPtr& cnx) { + if (result == pulsar::ResultOk) { + self->attach(cnx); + } else { + self->initialLayoutPromise_.setError( + Error{result, "failed to get a connection for the scalable-topic lookup"}); + } + }); + return initialLayoutPromise_.getFuture(); +} + +void DagWatchSession::attach(const pulsar::ClientConnectionPtr& cnx) { + if (closed_) { + return; + } + if (!cnx->supportsScalableTopics()) { + Error error{ResultUnsupportedVersionError, "the broker does not support scalable topics"}; + if (!initialLayoutPromise_.setError(error)) { + // Initial layout already delivered: keep retrying, another broker in + // the cluster may still support the feature. + LOG_WARN("[" << inputTopic_ << "] session " << sessionId_ + << ": reconnect target broker does not support scalable topics"); + scheduleReconnect(); + } + return; + } + + { + std::lock_guard lock(mutex_); + cnx_ = cnx; + } + + auto self = shared_from_this(); + bool registered = cnx->registerScalableTopicSession( + sessionId_, [self](pulsar::Result result, const pulsar::proto::CommandScalableTopicUpdate* update) { + self->handleSessionEvent(result, update); + }); + if (!registered) { + // The connection closed between acquisition and registration. + handleConnectionClosed(); + return; + } + cnx->sendCommand(Commands::newScalableTopicLookup(sessionId_, inputTopic_, createIfMissing_)); + // A failed write closes the connection, which notifies the session through + // the registry: no separate write-failure path is needed here. +} + +void DagWatchSession::handleSessionEvent(pulsar::Result result, + const pulsar::proto::CommandScalableTopicUpdate* update) { + if (closed_) { + return; + } + if (update == nullptr) { + (void)result; + handleConnectionClosed(); + } else if (update->has_error()) { + handleServerError(*update); + } else if (update->has_dag()) { + handleDagUpdate(*update); + } else { + LOG_WARN("[" << inputTopic_ << "] session " << sessionId_ + << ": update carries neither a DAG nor an error; ignoring"); + } +} + +void DagWatchSession::handleDagUpdate(const pulsar::proto::CommandScalableTopicUpdate& update) { + std::string resolved; + { + std::lock_guard lock(mutex_); + if (update.has_resolved_topic_name()) { + resolvedTopicName_ = update.resolved_topic_name(); + } + resolved = !resolvedTopicName_.empty() ? resolvedTopicName_ : inputTopic_; + } + + auto layout = SegmentLayout::fromProto(update.dag(), resolved); + if (!layout) { + LOG_ERROR("[" << inputTopic_ << "] session " << sessionId_ + << ": discarding malformed DAG update: " << layout.error().message); + // A malformed FIRST response must fail start() rather than hang it; after + // that, keep the last good layout. + initialLayoutPromise_.setError(layout.error()); + return; + } + + SegmentLayout oldLayout; + LayoutChangeListener listener; + { + std::lock_guard lock(mutex_); + oldLayout = std::move(currentLayout_); + currentLayout_ = *layout; + listener = listener_; + } + + // The broker confirmed the session is live and our state is fresh. + backoff_.reset(); + + LOG_INFO("[" << resolved << "] session " << sessionId_ << ": layout updated, epoch " << oldLayout.epoch() + << " -> " << layout->epoch() << ", " << layout->activeSegments().size() + << " active segments"); + + initialLayoutPromise_.complete(*layout); + if (listener) { + listener(*layout, oldLayout); + } +} + +void DagWatchSession::handleServerError(const pulsar::proto::CommandScalableTopicUpdate& update) { + const std::string message = update.has_message() ? update.message() : "scalable-topic lookup failed"; + LOG_ERROR("[" << inputTopic_ << "] session " << sessionId_ << ": broker reported " << update.error() + << ": " << message); + // Before the initial layout this is a lookup failure and start() surfaces it; + // afterwards broker-side errors are transient (a reconnect clears them) and + // the connection-closed path picks them up. + initialLayoutPromise_.setError(Error{toResult(update.error()), message}); +} + +void DagWatchSession::handleConnectionClosed() { + { + std::lock_guard lock(mutex_); + cnx_.reset(); + } + if (closed_) { + return; + } + if (initialLayoutPromise_.setError( + Error{ResultConnectError, "connection closed while waiting for the scalable-topic layout"})) { + // The initial lookup never completed: surface the failure to start()'s + // caller instead of retrying behind its back. + return; + } + scheduleReconnect(); +} + +void DagWatchSession::scheduleReconnect() { + if (closed_) { + return; + } + auto delay = backoff_.next(); + LOG_INFO("[" << inputTopic_ << "] session " << sessionId_ << ": reconnecting the DAG watch in " + << std::chrono::duration_cast(delay).count() << " ms"); + std::weak_ptr weakSelf = shared_from_this(); + reconnectTimer_->expires_from_now(delay); + reconnectTimer_->async_wait([weakSelf](const ASIO_ERROR& error) { + auto self = weakSelf.lock(); + if (self && !error) { + self->reconnect(); + } + }); +} + +void DagWatchSession::reconnect() { + if (closed_) { + return; + } + auto self = shared_from_this(); + client_->getConnection("", lookupCompatibleTopic(inputTopic_), static_cast(sessionId_)) + .addListener([self](pulsar::Result result, const pulsar::ClientConnectionPtr& cnx) { + if (result == pulsar::ResultOk) { + self->attach(cnx); + } else { + LOG_WARN("[" << self->inputTopic_ << "] session " << self->sessionId_ + << ": DAG watch reconnect failed (" << result << "); will retry"); + self->scheduleReconnect(); + } + }); +} + +SegmentLayout DagWatchSession::currentLayout() const { + std::lock_guard lock(mutex_); + return currentLayout_; +} + +std::string DagWatchSession::topicName() const { + std::lock_guard lock(mutex_); + return !resolvedTopicName_.empty() ? resolvedTopicName_ : inputTopic_; +} + +void DagWatchSession::setLayoutChangeListener(LayoutChangeListener listener) { + std::lock_guard lock(mutex_); + listener_ = std::move(listener); +} + +void DagWatchSession::close() { + if (closed_.exchange(true)) { + return; + } + ASIO_ERROR ignored; + reconnectTimer_->cancel(ignored); + + pulsar::ClientConnectionPtr cnx; + { + std::lock_guard lock(mutex_); + cnx = cnx_.lock(); + cnx_.reset(); + } + if (cnx) { + cnx->removeScalableTopicSession(sessionId_); + cnx->sendCommand(Commands::newScalableTopicClose(sessionId_)); + } +} + +} // namespace pulsar::st diff --git a/lib/st/DagWatchSession.h b/lib/st/DagWatchSession.h new file mode 100644 index 00000000..93b9ed8b --- /dev/null +++ b/lib/st/DagWatchSession.h @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +#include "SegmentLayout.h" +#include "lib/Backoff.h" +#include "lib/ClientConnection.h" +#include "lib/ClientImpl.h" + +namespace pulsar::st { + +/** + * The DAG-watch session of one scalable topic, ported from the Java v5 client's + * DagWatchClient so the two clients behave identically. + * + * start() connects to a broker (through the classic client's pool) and sends a + * CommandScalableTopicLookup; the broker answers — and afterwards pushes — a + * CommandScalableTopicUpdate carrying the full DAG, correlated by the + * client-assigned session id. Each accepted update replaces the current + * SegmentLayout and is reported to the layout-change listener. When the + * connection drops after the initial layout, the session reconnects with + * exponential backoff and re-issues the lookup; if it drops before the first + * layout, start()'s future fails instead (the caller's create() surfaces the + * error rather than silently retrying). Stale pushes cannot arrive out of + * order: updates are ordered per connection, and the old connection's registry + * is drained before the session re-attaches to a new one. + */ +class DagWatchSession : public std::enable_shared_from_this { + public: + /** Invoked for every accepted layout, including the first (oldLayout is empty then). */ + using LayoutChangeListener = + std::function; + + /** + * @param client the classic client whose connection pool and executors are reused. + * @param topic the scalable topic, in any accepted spelling ("topic://t/n/x", + * "persistent://t/n/x", or a short form); the broker resolves it and the + * session adopts the canonical identity from the first response. + * @param createIfMissing whether the broker may auto-create a missing topic on + * lookup (single-topic producers/consumers pass true; namespace watchers + * pass false so a deleted topic is not resurrected by a reconnect). + */ + DagWatchSession(pulsar::ClientImplPtr client, std::string topic, bool createIfMissing); + + /** + * Start the session. May be called once. + * @return a future completing with the initial layout, or the lookup failure. + */ + Future start(); + + /** Snapshot of the most recent layout (empty before the first update). */ + SegmentLayout currentLayout() const; + + /** The canonical topic identity, falling back to the input before the first response. */ + std::string topicName() const; + + /** Register the listener notified on every accepted layout update. */ + void setLayoutChangeListener(LayoutChangeListener listener); + + /** Close the session: stop reconnecting and tell the broker. Idempotent. */ + void close(); + + std::uint64_t sessionId() const { return sessionId_; } + + /** + * The topic spelling used for the classic connection lookup. The classic + * TopicName does not know the topic:// scheme, so that spelling maps to its + * persistent://... twin for the purpose of finding a broker; the wire lookup + * still carries the caller's original spelling. + */ + static std::string lookupCompatibleTopic(const std::string& topic); + + private: + void attach(const pulsar::ClientConnectionPtr& cnx); + void handleSessionEvent(pulsar::Result result, const pulsar::proto::CommandScalableTopicUpdate* update); + void handleDagUpdate(const pulsar::proto::CommandScalableTopicUpdate& update); + void handleServerError(const pulsar::proto::CommandScalableTopicUpdate& update); + void handleConnectionClosed(); + void scheduleReconnect(); + void reconnect(); + + pulsar::ClientImplPtr client_; + const std::string inputTopic_; + const bool createIfMissing_; + const std::uint64_t sessionId_; + pulsar::Backoff backoff_; + DeadlineTimerPtr reconnectTimer_; // global-scope alias from AsioTimer.h + detail::Promise initialLayoutPromise_; + std::atomic closed_{false}; + + mutable std::mutex mutex_; + SegmentLayout currentLayout_; // guarded by mutex_ + std::string resolvedTopicName_; // guarded by mutex_; empty until the first response + LayoutChangeListener listener_; // guarded by mutex_ + pulsar::ClientConnectionWeakPtr cnx_; // guarded by mutex_ +}; + +using DagWatchSessionPtr = std::shared_ptr; + +} // namespace pulsar::st diff --git a/tests/st/DagWatchSessionTest.cc b/tests/st/DagWatchSessionTest.cc new file mode 100644 index 00000000..bed4a8a3 --- /dev/null +++ b/tests/st/DagWatchSessionTest.cc @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +#include "lib/ClientImpl.h" +#include "lib/st/DagWatchSession.h" + +// Broker-free tests for the DAG-watch session: the pure lookup-name mapping and +// the lifecycle paths that must be safe without any connection. The full +// session protocol (lookup, updates, reconnect) is exercised end-to-end against +// a real broker by the producer integration tests. + +using namespace pulsar::st; + +TEST(DagWatchSessionTest, testLookupCompatibleTopicMapsTopicScheme) { + // The classic TopicName does not know the topic:// scheme; the connection + // lookup uses the persistent:// twin while the wire lookup keeps the + // original spelling. + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("topic://public/default/orders"), + "persistent://public/default/orders"); + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("persistent://public/default/orders"), + "persistent://public/default/orders"); + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("orders"), "orders"); +} + +TEST(DagWatchSessionTest, testSessionLifecycleWithoutConnection) { + auto classic = + std::make_shared("pulsar://localhost:6650", pulsar::ClientConfiguration{}); + classic->initialize(); + + auto session = std::make_shared(classic, "topic://public/default/orders", true); + + // Fresh session: empty layout, input identity, unique session id. + ASSERT_TRUE(session->currentLayout().activeSegments().empty()); + ASSERT_EQ(session->currentLayout().epoch(), 0u); + ASSERT_EQ(session->topicName(), "topic://public/default/orders"); + + auto other = std::make_shared(classic, "topic://public/default/other", true); + ASSERT_NE(session->sessionId(), other->sessionId()); + + // Closing before start (and closing twice) must be safe. + session->close(); + session->close(); + other->close(); + + classic->shutdown(); +} From 7bc64b93dd71082ed47ae735b08e81af8ceb6904 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 Jul 2026 17:21:54 -0700 Subject: [PATCH 7/9] st: enable the reflect-cpp Avro backend (avroSchema()) Review feedback on the vcpkg manifest: also include Avro support for reflectcpp. The registry reflectcpp port ships the rfl/avro.hpp headers but has no "avro" feature, so libreflectcpp.a was built without the backend and the headers could not even compile (they include , which was not installed). Add a vcpkg overlay port (vcpkg-overlay/reflectcpp, wired via vcpkg-configuration.json) that adds the feature - REFLECTCPP_AVRO plus an avro-c (1.12.1) dependency - and turn it on in the manifest. Drop the overlay once the upstream port grows the feature. Building against the real backend (instead of the earlier structural stand-in) exposed two API mismatches in AvroSchema.h, now fixed: rfl::avro::to_schema() returns a Schema (use .json_str() for the SchemaInfo definition, a proper Avro record schema), and rfl::avro::write() returns std::vector, not std::string. decode() now uses the pointer+size read overload instead of copying into a string. Verified with a runtime roundtrip through avroSchema() against the rebuilt reflectcpp[avro] + avro-c: schema derivation, binary encode/decode, and malformed-input-as-Error all pass; 70/70 pulsar-st-tests and the StExamples/pulsarShared builds stay green; clang-format-11 clean. Signed-off-by: Matteo Merli --- examples/CMakeLists.txt | 10 +- include/pulsar/st/AvroSchema.h | 9 +- vcpkg-configuration.json | 5 + vcpkg-overlay/reflectcpp/portfile.cmake | 54 +++++++++ vcpkg-overlay/reflectcpp/usage | 4 + vcpkg-overlay/reflectcpp/vcpkg.json | 142 ++++++++++++++++++++++++ vcpkg.json | 3 + 7 files changed, 218 insertions(+), 9 deletions(-) create mode 100644 vcpkg-configuration.json create mode 100644 vcpkg-overlay/reflectcpp/portfile.cmake create mode 100644 vcpkg-overlay/reflectcpp/usage create mode 100644 vcpkg-overlay/reflectcpp/vcpkg.json diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f0c0db2f..07534775 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -124,11 +124,11 @@ set(SAMPLE_ST_SOURCES st/SampleStQueueConsumer.cc st/SampleStCheckpointConsumer.cc ) -# reflect-cpp powers jsonSchema() (reflection-based JSON SerDe + schema). It is -# optional for this API-only PR: when the package is present the JSON sample is -# added and linked against it; when absent, only that one sample is skipped. (The -# reflectcpp vcpkg port does not yet ship an Avro backend, so it is not yet wired -# into the manifest; it will be added with the lib/st implementation.) +# reflect-cpp powers jsonSchema() and avroSchema() (reflection-based SerDe + +# derived schema). The vcpkg manifest carries reflectcpp plus avro-c (the C library +# behind reflect-cpp's header-only Avro backend); the find_package guard only +# matters for non-manifest builds (e.g. the CodeQL job), where the JSON sample is +# skipped. find_package(reflectcpp CONFIG QUIET) if (reflectcpp_FOUND) list(APPEND SAMPLE_ST_SOURCES st/SampleStJsonSchema.cc) diff --git a/include/pulsar/st/AvroSchema.h b/include/pulsar/st/AvroSchema.h index f8bc58f7..bd3d6c13 100644 --- a/include/pulsar/st/AvroSchema.h +++ b/include/pulsar/st/AvroSchema.h @@ -45,10 +45,12 @@ namespace pulsar::st { namespace detail { template struct AvroSerDe { - SchemaInfo info() const { return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema()); } + SchemaInfo info() const { + return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema().json_str()); + } Expected encode(const T& value, std::vector& out) const { try { - const std::string s = rfl::avro::write(value); + const std::vector s = rfl::avro::write(value); const auto* p = reinterpret_cast(s.data()); out.assign(p, p + s.size()); return {}; @@ -58,8 +60,7 @@ struct AvroSerDe { } Expected decode(std::span data) const { try { - return rfl::avro::read(std::string(reinterpret_cast(data.data()), data.size())) - .value(); + return rfl::avro::read(reinterpret_cast(data.data()), data.size()).value(); } catch (const std::exception& e) { return unexpected(pulsar::ResultInvalidMessage, e.what()); } diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json new file mode 100644 index 00000000..edf5c695 --- /dev/null +++ b/vcpkg-configuration.json @@ -0,0 +1,5 @@ +{ + "overlay-ports": [ + "./vcpkg-overlay" + ] +} diff --git a/vcpkg-overlay/reflectcpp/portfile.cmake b/vcpkg-overlay/reflectcpp/portfile.cmake new file mode 100644 index 00000000..5d75e6f2 --- /dev/null +++ b/vcpkg-overlay/reflectcpp/portfile.cmake @@ -0,0 +1,54 @@ +vcpkg_from_github( + OUT_SOURCE_PATH SOURCE_PATH + REPO getml/reflect-cpp + REF "v${VERSION}" + SHA512 4be84fc69efd6f4ce766d38cedc8b1d0fd0fa8170e69293383f7dbd59c6bce45797f0e7cf653ef9c839b15fd7da702c9daf30efd34c779555fe4e5bd5eb29481 + HEAD_REF main +) + +if(VCPKG_TARGET_IS_WINDOWS) + vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +endif() +string(COMPARE EQUAL "${VCPKG_LIBRARY_LINKAGE}" "dynamic" REFLECTCPP_BUILD_SHARED) + +# Overlay of the upstream vcpkg port: adds the "avro" feature (REFLECTCPP_AVRO + +# avro-c), which the registry port does not expose yet. Drop this overlay once the +# upstream port grows the feature. +vcpkg_check_features(OUT_FEATURE_OPTIONS FEATURE_OPTIONS + FEATURES + avro REFLECTCPP_AVRO + bson REFLECTCPP_BSON + capnproto REFLECTCPP_CAPNPROTO + cbor REFLECTCPP_CBOR + csv REFLECTCPP_CSV + flexbuffers REFLECTCPP_FLEXBUFFERS + msgpack REFLECTCPP_MSGPACK + parquet REFLECTCPP_PARQUET + toml REFLECTCPP_TOML + ubjson REFLECTCPP_UBJSON + xml REFLECTCPP_XML + yaml REFLECTCPP_YAML +) + +vcpkg_cmake_configure( + SOURCE_PATH ${SOURCE_PATH} + OPTIONS + ${FEATURE_OPTIONS} + -DREFLECTCPP_BUILD_TESTS=OFF + -DREFLECTCPP_BUILD_SHARED=${REFLECTCPP_BUILD_SHARED} + -DREFLECTCPP_USE_BUNDLED_DEPENDENCIES=OFF +) + +vcpkg_cmake_install() + +vcpkg_cmake_config_fixup( + CONFIG_PATH "lib/cmake/${PORT}" +) + +file(REMOVE_RECURSE + "${CURRENT_PACKAGES_DIR}/debug/include" + "${CURRENT_PACKAGES_DIR}/debug/share" +) + +vcpkg_install_copyright(FILE_LIST "${SOURCE_PATH}/LICENSE") +configure_file("${CMAKE_CURRENT_LIST_DIR}/usage" "${CURRENT_PACKAGES_DIR}/share/${PORT}/usage" COPYONLY) diff --git a/vcpkg-overlay/reflectcpp/usage b/vcpkg-overlay/reflectcpp/usage new file mode 100644 index 00000000..ebc57e43 --- /dev/null +++ b/vcpkg-overlay/reflectcpp/usage @@ -0,0 +1,4 @@ +reflect-cpp provides CMake targets: + + find_package(reflectcpp CONFIG REQUIRED) + target_link_libraries(main PRIVATE reflectcpp::reflectcpp) diff --git a/vcpkg-overlay/reflectcpp/vcpkg.json b/vcpkg-overlay/reflectcpp/vcpkg.json new file mode 100644 index 00000000..b24665b9 --- /dev/null +++ b/vcpkg-overlay/reflectcpp/vcpkg.json @@ -0,0 +1,142 @@ +{ + "name": "reflectcpp", + "version": "0.24.0", + "port-version": 1, + "description": "A C++ library for serialization and deserialization using reflection. Supports JSON, Avro, BSON, Cap'n Proto, CBOR, CSV, flexbuffers, msgpack, parquet, TOML, UBJSON, XML, YAML.", + "homepage": "https://github.com/getml/reflect-cpp/", + "license": "MIT", + "dependencies": [ + { + "name": "ctre", + "version>=": "3.10.0" + }, + { + "name": "vcpkg-cmake", + "host": true + }, + { + "name": "vcpkg-cmake-config", + "host": true + }, + { + "name": "yyjson", + "version>=": "0.10.0" + } + ], + "features": { + "avro": { + "description": "Support for the Avro format", + "dependencies": [ + { + "name": "avro-c", + "version>=": "1.12.1" + } + ] + }, + "bson": { + "description": "Support for the BSON format", + "dependencies": [ + { + "name": "libbson", + "version>=": "1.25.1" + } + ] + }, + "capnproto": { + "description": "Support for the Cap'n Proto format", + "dependencies": [ + { + "name": "capnproto", + "version>=": "1.0.2#1" + } + ] + }, + "cbor": { + "description": "Support for the CBOR format", + "dependencies": [ + { + "name": "jsoncons", + "version>=": "1.4.0" + } + ] + }, + "csv": { + "description": "Enable CSV support", + "dependencies": [ + { + "name": "arrow", + "features": [ + "csv" + ], + "version>=": "21.0.0" + } + ] + }, + "flexbuffers": { + "description": "Support for the flexbuffers format (part of flatbuffers)", + "dependencies": [ + { + "name": "flatbuffers", + "version>=": "23.5.26#1" + } + ] + }, + "msgpack": { + "description": "Support for the msgpack format", + "dependencies": [ + { + "name": "msgpack-c", + "version>=": "6.0.0" + } + ] + }, + "parquet": { + "description": "Enable parquet support", + "dependencies": [ + { + "name": "arrow", + "features": [ + "parquet" + ], + "version>=": "21.0.0" + } + ] + }, + "toml": { + "description": "Support for the TOML format", + "dependencies": [ + { + "name": "tomlplusplus", + "version>=": "3.4.0#1" + } + ] + }, + "ubjson": { + "description": "Support for the UBJSON format", + "dependencies": [ + { + "name": "jsoncons", + "version>=": "1.4.0" + } + ] + }, + "xml": { + "description": "Support for the XML format", + "dependencies": [ + { + "name": "pugixml", + "version>=": "1.15" + } + ] + }, + "yaml": { + "description": "Support for the YAML format", + "dependencies": [ + { + "name": "yaml-cpp", + "version>=": "0.8.0#1" + } + ] + } + } +} diff --git a/vcpkg.json b/vcpkg.json index 721371da..c10d8666 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -45,6 +45,9 @@ }, { "name": "reflectcpp", + "features": [ + "avro" + ], "version>=": "0.24.0" }, { From fbb1a6892d5acd3c060ec6aa5b3fe3a26a39c41e Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 2 Jul 2026 06:45:13 -0700 Subject: [PATCH 8/9] st: fix reflectcpp consumer configure - find jansson for the Avro backend CI (Lint and unit-test jobs) failed at CMake generate time with: The link interface of target "reflectcpp::reflectcpp" contains: jansson::jansson but the target was not found. With the avro feature on, reflect-cpp links the static avro-c archive, whose link interface carries jansson::jansson - but the upstream reflectcpp-config.cmake has no Avro find_dependency block, so the target is never defined in the consumer. The local macOS build didn't catch it because the only local consumer (StExamples) is an OBJECT library, which never links and so never triggers CMake's imported-target validation; a minimal linking-consumer reproduction fails identically to CI. Patch the installed config from the overlay portfile (gated on the avro feature) to find_dependency(jansson CONFIG) before including the exports, and bump the overlay to port-version 2 since the previously pushed content changed. Verified: the linking-consumer repro configures and links; the full local build and 83/83 pulsar-st-tests stay green. Signed-off-by: Matteo Merli --- vcpkg-overlay/reflectcpp/portfile.cmake | 10 ++++++++++ vcpkg-overlay/reflectcpp/vcpkg.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/vcpkg-overlay/reflectcpp/portfile.cmake b/vcpkg-overlay/reflectcpp/portfile.cmake index 5d75e6f2..e81fb51e 100644 --- a/vcpkg-overlay/reflectcpp/portfile.cmake +++ b/vcpkg-overlay/reflectcpp/portfile.cmake @@ -45,6 +45,16 @@ vcpkg_cmake_config_fixup( CONFIG_PATH "lib/cmake/${PORT}" ) +if("avro" IN_LIST FEATURES) + # The Avro backend links the static avro-c archive, whose link interface + # carries jansson::jansson — but the upstream config has no Avro + # find_dependency block, so consumers fail at generate time with + # "the target was not found". Define it before the exports are included. + vcpkg_replace_string("${CURRENT_PACKAGES_DIR}/share/${PORT}/${PORT}-config.cmake" + "include(\${CMAKE_CURRENT_LIST_DIR}/reflectcpp-exports.cmake)" + "include(CMakeFindDependencyMacro)\nfind_dependency(jansson CONFIG)\ninclude(\${CMAKE_CURRENT_LIST_DIR}/reflectcpp-exports.cmake)") +endif() + file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include" "${CURRENT_PACKAGES_DIR}/debug/share" diff --git a/vcpkg-overlay/reflectcpp/vcpkg.json b/vcpkg-overlay/reflectcpp/vcpkg.json index b24665b9..b27b750c 100644 --- a/vcpkg-overlay/reflectcpp/vcpkg.json +++ b/vcpkg-overlay/reflectcpp/vcpkg.json @@ -1,7 +1,7 @@ { "name": "reflectcpp", "version": "0.24.0", - "port-version": 1, + "port-version": 2, "description": "A C++ library for serialization and deserialization using reflection. Supports JSON, Avro, BSON, Cap'n Proto, CBOR, CSV, flexbuffers, msgpack, parquet, TOML, UBJSON, XML, YAML.", "homepage": "https://github.com/getml/reflect-cpp/", "license": "MIT", From 52d72ef11e928866cc91650263a9b049c4e30570 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 2 Jul 2026 13:06:13 -0700 Subject: [PATCH 9/9] st: fix CI - clang-tidy performance checks and MSBuild proto ordering Two PR #600 failures the reflectcpp fix exposed once the build got further: Lint (clang-tidy, performance-* treated as errors) on lib/st/ClientImpl.cc: - the std::move of the trivially-copyable TransactionPolicy constructor argument had no effect; take it by const reference and copy the member. - the four not-yet-implemented create/subscribe stubs take their config by value (the sink the real implementation will move from) but do not consume it yet, so performance-unnecessary-value-param fired. Suppress it with a NOLINTNEXTLINE on each stub rather than churning the by-value signature the follow-up phases need (which would cascade const-ref up the builder chain). Windows (Visual Studio generator) failed compiling ST_OBJECT_LIB with "Cannot open include file: PulsarApi.pb.h": the st sources include the generated proto header, and while linking PROTO_OBJECTS propagates its include directory, the OBJECT->OBJECT link does not reliably order the build under MSBuild, so the st sources compiled before the proto existed. Add an explicit add_dependencies(ST_OBJECT_LIB PROTO_OBJECTS) (and the legacy PULSAR_OBJECT_LIB equivalent) and the generated-dir include. Verified: full local build + 83/83 pulsar-st-tests green; clang-format clean. Signed-off-by: Matteo Merli --- lib/CMakeLists.txt | 10 ++++++++++ lib/st/ClientImpl.cc | 11 +++++++++-- lib/st/ClientImpl.h | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 33628d7d..de2ca968 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -89,7 +89,17 @@ set_property(TARGET ST_OBJECT_LIB PROPERTY POSITION_INDEPENDENT_CODE 1) set_target_properties(ST_OBJECT_LIB PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) if (INTEGRATE_VCPKG) target_link_libraries(ST_OBJECT_LIB PROTO_OBJECTS) + # The st sources #include the generated PulsarApi.pb.h. Linking PROTO_OBJECTS + # propagates its include dir, but the OBJECT->OBJECT link does not reliably + # order the build under the Visual Studio generator, so the st sources can + # compile before the proto is generated. Force the ordering explicitly. + add_dependencies(ST_OBJECT_LIB PROTO_OBJECTS) +else () + # Legacy (non-vcpkg) build generates the proto as part of PULSAR_OBJECT_LIB; + # ensure it exists before the st sources that include it compile. + add_dependencies(ST_OBJECT_LIB PULSAR_OBJECT_LIB) endif () +target_include_directories(ST_OBJECT_LIB PRIVATE ${LIB_AUTOGEN_DIR}) target_include_directories(ST_OBJECT_LIB PUBLIC "${CMAKE_SOURCE_DIR}" "${CMAKE_SOURCE_DIR}/include" diff --git a/lib/st/ClientImpl.cc b/lib/st/ClientImpl.cc index c971a355..74c9e62c 100644 --- a/lib/st/ClientImpl.cc +++ b/lib/st/ClientImpl.cc @@ -38,21 +38,28 @@ Future notImplementedYet(const char* what) { } // namespace -ClientImpl::ClientImpl(pulsar::ClientImplPtr classicClient, TransactionPolicy transactionPolicy) - : classic_(std::move(classicClient)), transactionPolicy_(std::move(transactionPolicy)) {} +ClientImpl::ClientImpl(pulsar::ClientImplPtr classicClient, const TransactionPolicy& transactionPolicy) + : classic_(std::move(classicClient)), transactionPolicy_(transactionPolicy) {} +// The producer/consumer/transaction paths land in follow-up phases. Each takes +// its config by value (the sink the real implementation will move from), but as +// a stub it does not consume the config yet — hence the value-param suppressions. +// NOLINTNEXTLINE(performance-unnecessary-value-param) Future ClientImpl::createProducerAsync(ProducerConfig) { return notImplementedYet("createProducer"); } +// NOLINTNEXTLINE(performance-unnecessary-value-param) Future ClientImpl::subscribeStreamAsync(StreamConsumerConfig) { return notImplementedYet("subscribeStream"); } +// NOLINTNEXTLINE(performance-unnecessary-value-param) Future ClientImpl::subscribeQueueAsync(QueueConsumerConfig) { return notImplementedYet("subscribeQueue"); } +// NOLINTNEXTLINE(performance-unnecessary-value-param) Future ClientImpl::createCheckpointConsumerAsync(CheckpointConsumerConfig) { return notImplementedYet("createCheckpointConsumer"); } diff --git a/lib/st/ClientImpl.h b/lib/st/ClientImpl.h index bbcd9847..6a17f331 100644 --- a/lib/st/ClientImpl.h +++ b/lib/st/ClientImpl.h @@ -44,7 +44,7 @@ namespace pulsar::st { */ class ClientImpl { public: - ClientImpl(pulsar::ClientImplPtr classicClient, TransactionPolicy transactionPolicy); + ClientImpl(pulsar::ClientImplPtr classicClient, const TransactionPolicy& transactionPolicy); Future createProducerAsync(ProducerConfig config); Future subscribeStreamAsync(StreamConsumerConfig config);