diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f0c0db2f..07534775 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -124,11 +124,11 @@ set(SAMPLE_ST_SOURCES st/SampleStQueueConsumer.cc st/SampleStCheckpointConsumer.cc ) -# reflect-cpp powers jsonSchema() (reflection-based JSON SerDe + schema). It is -# optional for this API-only PR: when the package is present the JSON sample is -# added and linked against it; when absent, only that one sample is skipped. (The -# reflectcpp vcpkg port does not yet ship an Avro backend, so it is not yet wired -# into the manifest; it will be added with the lib/st implementation.) +# reflect-cpp powers jsonSchema() and avroSchema() (reflection-based SerDe + +# derived schema). The vcpkg manifest carries reflectcpp plus avro-c (the C library +# behind reflect-cpp's header-only Avro backend); the find_package guard only +# matters for non-manifest builds (e.g. the CodeQL job), where the JSON sample is +# skipped. find_package(reflectcpp CONFIG QUIET) if (reflectcpp_FOUND) list(APPEND SAMPLE_ST_SOURCES st/SampleStJsonSchema.cc) diff --git a/include/pulsar/st/AvroSchema.h b/include/pulsar/st/AvroSchema.h index f8bc58f7..bd3d6c13 100644 --- a/include/pulsar/st/AvroSchema.h +++ b/include/pulsar/st/AvroSchema.h @@ -45,10 +45,12 @@ namespace pulsar::st { namespace detail { template struct AvroSerDe { - SchemaInfo info() const { return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema()); } + SchemaInfo info() const { + return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema().json_str()); + } Expected encode(const T& value, std::vector& out) const { try { - const std::string s = rfl::avro::write(value); + const std::vector s = rfl::avro::write(value); const auto* p = reinterpret_cast(s.data()); out.assign(p, p + s.size()); return {}; @@ -58,8 +60,7 @@ struct AvroSerDe { } Expected decode(std::span data) const { try { - return rfl::avro::read(std::string(reinterpret_cast(data.data()), data.size())) - .value(); + return rfl::avro::read(reinterpret_cast(data.data()), data.size()).value(); } catch (const std::exception& e) { return unexpected(pulsar::ResultInvalidMessage, e.what()); } diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 3478f10e..de2ca968 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -77,9 +77,37 @@ target_include_directories(PULSAR_OBJECT_LIB PUBLIC "${CMAKE_SOURCE_DIR}/include" "${CMAKE_BINARY_DIR}/include") +# --- Scalable topics (pulsar::st) implementation ---------------------------- +# The st API (include/pulsar/st) targets C++20 while the classic client above +# stays C++17, so its implementation compiles in a separate object library with +# a per-target standard. The objects are merged into the same libpulsar +# shared/static artifacts below; a C++20-compiled object archive links fine +# into C++17 consumers of the classic API. +file(GLOB PULSAR_ST_SOURCES st/*.cc st/*.h) +add_library(ST_OBJECT_LIB OBJECT ${PULSAR_ST_SOURCES}) +set_property(TARGET ST_OBJECT_LIB PROPERTY POSITION_INDEPENDENT_CODE 1) +set_target_properties(ST_OBJECT_LIB PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) +if (INTEGRATE_VCPKG) + target_link_libraries(ST_OBJECT_LIB PROTO_OBJECTS) + # The st sources #include the generated PulsarApi.pb.h. Linking PROTO_OBJECTS + # propagates its include dir, but the OBJECT->OBJECT link does not reliably + # order the build under the Visual Studio generator, so the st sources can + # compile before the proto is generated. Force the ordering explicitly. + add_dependencies(ST_OBJECT_LIB PROTO_OBJECTS) +else () + # Legacy (non-vcpkg) build generates the proto as part of PULSAR_OBJECT_LIB; + # ensure it exists before the st sources that include it compile. + add_dependencies(ST_OBJECT_LIB PULSAR_OBJECT_LIB) +endif () +target_include_directories(ST_OBJECT_LIB PRIVATE ${LIB_AUTOGEN_DIR}) +target_include_directories(ST_OBJECT_LIB PUBLIC + "${CMAKE_SOURCE_DIR}" + "${CMAKE_SOURCE_DIR}/include" + "${CMAKE_BINARY_DIR}/include") + include(CheckCXXSymbolExists) if (BUILD_DYNAMIC_LIB) - add_library(pulsarShared SHARED $) + add_library(pulsarShared SHARED $ $) set_property(TARGET pulsarShared PROPERTY OUTPUT_NAME ${LIB_NAME_SHARED}) set_property(TARGET pulsarShared PROPERTY VERSION ${LIBRARY_VERSION}) target_link_libraries(pulsarShared PRIVATE ${COMMON_LIBS} ${CMAKE_DL_LIBS}) @@ -102,7 +130,7 @@ if(HAVE_AUXV_GETAUXVAL) endif() if (BUILD_STATIC_LIB) - add_library(pulsarStatic STATIC $) + add_library(pulsarStatic STATIC $ $) target_include_directories(pulsarStatic PUBLIC ${CMAKE_BINARY_DIR}/include ${CMAKE_SOURCE_DIR} @@ -139,7 +167,7 @@ if (LINK_STATIC AND BUILD_STATIC_LIB) set(${OUTLIST} ${TEMP_OUT} PARENT_SCOPE) endfunction(remove_libtype) - add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES}) + add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES} $) target_include_directories(pulsarStaticWithDeps PRIVATE ${dlfcn-win32_INCLUDE_DIRS}) if (VCPKG_TRIPLET) # Collect ALL vcpkg-installed static archives so every transitive dependency diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 95597034..474a96b1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -313,6 +313,11 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC LOG_DEBUG("Current max message size is: " << maxMessageSize_); } + if (cmdConnected.has_feature_flags()) { + supportsScalableTopics_.store(cmdConnected.feature_flags().supports_scalable_topics(), + std::memory_order_release); + } + Lock lock(mutex_); if (isClosed()) { @@ -988,6 +993,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { handleAckResponse(incomingCmd.ackresponse()); break; + case BaseCommand::SCALABLE_TOPIC_UPDATE: + handleScalableTopicUpdate(incomingCmd.scalabletopicupdate()); + break; + default: LOG_WARN(cnxString() << "Received invalid message from server"); close(Error{ResultDisconnected, cnxString() + "Received invalid message from server"}); @@ -1267,6 +1276,7 @@ const std::future& ClientConnection::close(Error&& error, bool switchClust auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_); auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_); auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_); + auto scalableTopicSessions = std::move(scalableTopicSessions_); numOfPendingLookupRequest_ = 0; @@ -1341,6 +1351,11 @@ const std::future& ClientConnection::close(Error&& error, bool switchClust for (auto& kv : pendingLookupRequests) { kv.second->fail(error); } + // Notify scalable-topic DAG-watch sessions so they can re-establish on a + // new connection. + for (auto& kv : scalableTopicSessions) { + kv.second(error.result, nullptr); + } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); kv.second.setFailed(result); @@ -1379,6 +1394,39 @@ void ClientConnection::removeConsumer(int consumerId) { consumers_.erase(consumerId); } +bool ClientConnection::registerScalableTopicSession(uint64_t sessionId, + ScalableTopicUpdateListener listener) { + Lock lock(mutex_); + if (isClosed()) { + return false; + } + scalableTopicSessions_[sessionId] = std::move(listener); + return true; +} + +void ClientConnection::removeScalableTopicSession(uint64_t sessionId) { + Lock lock(mutex_); + scalableTopicSessions_.erase(sessionId); +} + +void ClientConnection::handleScalableTopicUpdate(const proto::CommandScalableTopicUpdate& update) { + ScalableTopicUpdateListener listener; + { + Lock lock(mutex_); + auto it = scalableTopicSessions_.find(update.session_id()); + if (it != scalableTopicSessions_.end()) { + listener = it->second; + } + } + if (listener) { + listener(ResultOk, &update); + } else { + // A push may race with a just-closed session; drop it rather than + // treating it as a protocol violation. + LOG_WARN(cnxString() << "Received SCALABLE_TOPIC_UPDATE for unknown session " << update.session_id()); + } +} + const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; } int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index fd89fae5..8591f546 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -105,6 +105,7 @@ class CommandGetLastMessageIdResponse; class CommandLookupTopicResponse; class CommandPartitionedTopicMetadataResponse; class CommandProducerSuccess; +class CommandScalableTopicUpdate; class CommandSendReceipt; class CommandSendError; class CommandSuccess; @@ -187,6 +188,23 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ScalableTopicUpdateListener; + + /** + * Register a DAG-watch session. Returns false (without registering) if the + * connection is already closed — the caller should acquire a new connection. + */ + bool registerScalableTopicSession(uint64_t sessionId, ScalableTopicUpdateListener listener); + void removeScalableTopicSession(uint64_t sessionId); + + /** Whether the broker advertised scalable-topics support on CONNECTED. */ + bool supportsScalableTopics() const { return supportsScalableTopics_.load(std::memory_order_acquire); } + /** * Send a request with a specific Id over the connection. The future will be * triggered when the response for this request is received @@ -354,6 +372,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ConsumersMap; ConsumersMap consumers_; + // Scalable topics: DAG-watch sessions by client-assigned session id. + typedef std::map ScalableTopicSessionsMap; + ScalableTopicSessionsMap scalableTopicSessions_; + std::atomic supportsScalableTopics_{false}; + typedef std::map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; @@ -436,6 +459,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); optional getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&); diff --git a/lib/Commands.cc b/lib/Commands.cc index 3dd22591..9b800874 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -55,6 +55,7 @@ using proto::CommandLookupTopic; using proto::CommandPartitionedTopicMetadata; using proto::CommandProducer; using proto::CommandRedeliverUnacknowledgedMessages; +using proto::CommandScalableTopicLookup; using proto::CommandSeek; using proto::CommandSend; using proto::CommandSubscribe; @@ -285,6 +286,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const FeatureFlags* flags = connect->mutable_feature_flags(); flags->set_supports_auth_refresh(true); flags->set_supports_broker_entry_metadata(true); + flags->set_supports_scalable_topics(true); if (connectingThroughProxy) { Url logicalAddressUrl; Url::parse(logicalAddress, logicalAddressUrl); @@ -552,6 +554,24 @@ SharedBuffer Commands::newCloseConsumer(uint64_t consumerId, uint64_t requestId) return writeMessageWithSize(cmd); } +SharedBuffer Commands::newScalableTopicLookup(uint64_t sessionId, const std::string& topic, + bool createIfMissing) { + BaseCommand cmd; + cmd.set_type(BaseCommand::SCALABLE_TOPIC_LOOKUP); + CommandScalableTopicLookup* lookup = cmd.mutable_scalabletopiclookup(); + lookup->set_session_id(sessionId); + lookup->set_topic(topic); + lookup->set_create_if_missing(createIfMissing); + return writeMessageWithSize(cmd); +} + +SharedBuffer Commands::newScalableTopicClose(uint64_t sessionId) { + BaseCommand cmd; + cmd.set_type(BaseCommand::SCALABLE_TOPIC_CLOSE); + cmd.mutable_scalabletopicclose()->set_session_id(sessionId); + return writeMessageWithSize(cmd); +} + SharedBuffer Commands::newPing() { BaseCommand cmd; cmd.set_type(BaseCommand::PING); @@ -808,6 +828,42 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::WATCH_TOPIC_LIST_CLOSE: return "WATCH_TOPIC_LIST_CLOSE"; break; + case BaseCommand::SCALABLE_TOPIC_LOOKUP: + return "SCALABLE_TOPIC_LOOKUP"; + break; + case BaseCommand::SCALABLE_TOPIC_UPDATE: + return "SCALABLE_TOPIC_UPDATE"; + break; + case BaseCommand::SCALABLE_TOPIC_CLOSE: + return "SCALABLE_TOPIC_CLOSE"; + break; + case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE: + return "SCALABLE_TOPIC_SUBSCRIBE"; + break; + case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE_RESPONSE: + return "SCALABLE_TOPIC_SUBSCRIBE_RESPONSE"; + break; + case BaseCommand::SCALABLE_TOPIC_ASSIGNMENT_UPDATE: + return "SCALABLE_TOPIC_ASSIGNMENT_UPDATE"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS: + return "WATCH_SCALABLE_TOPICS"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS_UPDATE: + return "WATCH_SCALABLE_TOPICS_UPDATE"; + break; + case BaseCommand::WATCH_SCALABLE_TOPICS_CLOSE: + return "WATCH_SCALABLE_TOPICS_CLOSE"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS: + return "WATCH_TC_ASSIGNMENTS"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS_UPDATE: + return "WATCH_TC_ASSIGNMENTS_UPDATE"; + break; + case BaseCommand::WATCH_TC_ASSIGNMENTS_CLOSE: + return "WATCH_TC_ASSIGNMENTS_CLOSE"; + break; }; BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value")); } diff --git a/lib/Commands.h b/lib/Commands.h index 8403d6e2..622f95d7 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -147,6 +147,13 @@ class Commands { static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId, const std::set& messageIds); + // Scalable topics (pulsar::st): open/close a DAG-watch session. The broker + // answers (and later pushes) CommandScalableTopicUpdate correlated by the + // client-assigned sessionId. + static SharedBuffer newScalableTopicLookup(uint64_t sessionId, const std::string& topic, + bool createIfMissing); + static SharedBuffer newScalableTopicClose(uint64_t sessionId); + static std::string messageType(BaseCommand_Type type); static void initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata); diff --git a/lib/Murmur3_32Hash.cc b/lib/Murmur3_32Hash.cc index 45a49885..dff6aedc 100644 --- a/lib/Murmur3_32Hash.cc +++ b/lib/Murmur3_32Hash.cc @@ -60,6 +60,8 @@ int32_t Murmur3_32Hash::makeHash(const std::string &key) { return static_cast(makeHash(&key.front(), key.length()) & std::numeric_limits::max()); } +uint32_t Murmur3_32Hash::makeRawHash(const void *key, int64_t len) { return makeHash(key, len); } + uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) { const uint8_t *data = reinterpret_cast(key); const int nblocks = len / MACRO_CHUNK_SIZE; diff --git a/lib/Murmur3_32Hash.h b/lib/Murmur3_32Hash.h index d83635d2..d293489c 100644 --- a/lib/Murmur3_32Hash.h +++ b/lib/Murmur3_32Hash.h @@ -39,6 +39,11 @@ class PULSAR_PUBLIC Murmur3_32Hash : public Hash { int32_t makeHash(const std::string& key); + // The raw (unmasked) 32-bit hash. Unlike makeHash(), bit 31 is NOT cleared; + // the scalable-topics key hashing splits this into two independent 16-bit + // halves and needs the high half full-range. + uint32_t makeRawHash(const void* key, int64_t len); + private: uint32_t seed; diff --git a/lib/st/Checkpoint.cc b/lib/st/Checkpoint.cc new file mode 100644 index 00000000..58125047 --- /dev/null +++ b/lib/st/Checkpoint.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +namespace pulsar::st { + +// An empty checkpoint: impl_ stays null, so the handle is falsy under +// operator bool and must not be used as a start position. +Checkpoint::Checkpoint() = default; + +} // namespace pulsar::st diff --git a/lib/st/ClientCore.cc b/lib/st/ClientCore.cc new file mode 100644 index 00000000..4da77f3b --- /dev/null +++ b/lib/st/ClientCore.cc @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include "ClientImpl.h" + +namespace pulsar::st::detail { + +Future ClientCore::createProducerAsync(ProducerConfig config) const { + return impl_->createProducerAsync(std::move(config)); +} + +Future ClientCore::subscribeStreamAsync(StreamConsumerConfig config) const { + return impl_->subscribeStreamAsync(std::move(config)); +} + +Future ClientCore::subscribeQueueAsync(QueueConsumerConfig config) const { + return impl_->subscribeQueueAsync(std::move(config)); +} + +Future ClientCore::createCheckpointConsumerAsync( + CheckpointConsumerConfig config) const { + return impl_->createCheckpointConsumerAsync(std::move(config)); +} + +Future ClientCore::newTransactionAsync() const { return impl_->newTransactionAsync(); } + +Future ClientCore::closeAsync() const { return impl_->closeAsync(); } + +void ClientCore::shutdown() const { impl_->shutdown(); } + +} // namespace pulsar::st::detail diff --git a/lib/st/ClientImpl.cc b/lib/st/ClientImpl.cc new file mode 100644 index 00000000..74c9e62c --- /dev/null +++ b/lib/st/ClientImpl.cc @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "ClientImpl.h" + +#include +#include + +namespace pulsar::st { + +namespace { + +// Placeholder for the operations whose lib/st implementation has not landed +// yet: fail the returned future instead of hanging or crashing. Each of these +// turns into the real implementation in its own phase (producer first). +template +Future notImplementedYet(const char* what) { + detail::Promise promise; + promise.setError(Error{ResultOperationNotSupported, + std::string(what) + " is not implemented yet in the scalable-topics client"}); + return promise.getFuture(); +} + +} // namespace + +ClientImpl::ClientImpl(pulsar::ClientImplPtr classicClient, const TransactionPolicy& transactionPolicy) + : classic_(std::move(classicClient)), transactionPolicy_(transactionPolicy) {} + +// The producer/consumer/transaction paths land in follow-up phases. Each takes +// its config by value (the sink the real implementation will move from), but as +// a stub it does not consume the config yet — hence the value-param suppressions. +// NOLINTNEXTLINE(performance-unnecessary-value-param) +Future ClientImpl::createProducerAsync(ProducerConfig) { + return notImplementedYet("createProducer"); +} + +// NOLINTNEXTLINE(performance-unnecessary-value-param) +Future ClientImpl::subscribeStreamAsync(StreamConsumerConfig) { + return notImplementedYet("subscribeStream"); +} + +// NOLINTNEXTLINE(performance-unnecessary-value-param) +Future ClientImpl::subscribeQueueAsync(QueueConsumerConfig) { + return notImplementedYet("subscribeQueue"); +} + +// NOLINTNEXTLINE(performance-unnecessary-value-param) +Future ClientImpl::createCheckpointConsumerAsync(CheckpointConsumerConfig) { + return notImplementedYet("createCheckpointConsumer"); +} + +Future ClientImpl::newTransactionAsync() { + return notImplementedYet("newTransaction"); +} + +Future ClientImpl::closeAsync() { + detail::Promise promise; + classic_->closeAsync([promise](pulsar::Result result) { + if (result == pulsar::ResultOk) { + promise.setSuccess(); + } else { + promise.setError(Error{result, "failed to close the client"}); + } + }); + return promise.getFuture(); +} + +void ClientImpl::shutdown() { classic_->shutdown(); } + +} // namespace pulsar::st diff --git a/lib/st/ClientImpl.h b/lib/st/ClientImpl.h new file mode 100644 index 00000000..6a17f331 --- /dev/null +++ b/lib/st/ClientImpl.h @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "lib/ClientImpl.h" + +namespace pulsar::st { + +/** + * The hidden client behind `detail::ClientCore`. + * + * It owns a classic `pulsar::ClientImpl`, reusing its connection pool, executor + * pools, lookup service and memory limiting wholesale — a scalable topic's + * segments are ordinary persistent topics broker-side, so the underlying + * transport stack is unchanged. The scalable-topics specific machinery (DAG + * watch, segment routing, controller sessions) is layered on top in lib/st, + * phase by phase. + */ +class ClientImpl { + public: + ClientImpl(pulsar::ClientImplPtr classicClient, const TransactionPolicy& transactionPolicy); + + Future createProducerAsync(ProducerConfig config); + Future subscribeStreamAsync(StreamConsumerConfig config); + Future subscribeQueueAsync(QueueConsumerConfig config); + Future createCheckpointConsumerAsync(CheckpointConsumerConfig config); + Future newTransactionAsync(); + Future closeAsync(); + void shutdown(); + + /** The wrapped classic client (connection pool, executors, lookup). */ + pulsar::ClientImpl& classicClient() { return *classic_; } + + private: + pulsar::ClientImplPtr classic_; + TransactionPolicy transactionPolicy_; // applied when the transaction-coordinator session lands +}; + +} // namespace pulsar::st diff --git a/lib/st/DagWatchSession.cc b/lib/st/DagWatchSession.cc new file mode 100644 index 00000000..fb50fdd1 --- /dev/null +++ b/lib/st/DagWatchSession.cc @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "DagWatchSession.h" + +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Commands.h" +#include "lib/ExecutorService.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar::st { + +namespace { + +std::atomic sessionIdGenerator{0}; + +Result toResult(pulsar::proto::ServerError error) { + return error == pulsar::proto::TopicNotFound ? ResultTopicNotFound : ResultLookupError; +} + +} // namespace + +DagWatchSession::DagWatchSession(pulsar::ClientImplPtr client, std::string topic, bool createIfMissing) + : client_(std::move(client)), + inputTopic_(std::move(topic)), + createIfMissing_(createIfMissing), + sessionId_(++sessionIdGenerator), + backoff_(std::chrono::milliseconds(100), std::chrono::seconds(30), std::chrono::milliseconds(0)), + reconnectTimer_(client_->getIOExecutorProvider()->get()->createDeadlineTimer()) {} + +std::string DagWatchSession::lookupCompatibleTopic(const std::string& topic) { + constexpr std::string_view kTopicScheme = "topic://"; + if (topic.rfind(kTopicScheme, 0) == 0) { + return "persistent://" + topic.substr(kTopicScheme.size()); + } + return topic; +} + +Future DagWatchSession::start() { + auto self = shared_from_this(); + client_->getConnection("", lookupCompatibleTopic(inputTopic_), static_cast(sessionId_)) + .addListener([self](pulsar::Result result, const pulsar::ClientConnectionPtr& cnx) { + if (result == pulsar::ResultOk) { + self->attach(cnx); + } else { + self->initialLayoutPromise_.setError( + Error{result, "failed to get a connection for the scalable-topic lookup"}); + } + }); + return initialLayoutPromise_.getFuture(); +} + +void DagWatchSession::attach(const pulsar::ClientConnectionPtr& cnx) { + if (closed_) { + return; + } + if (!cnx->supportsScalableTopics()) { + Error error{ResultUnsupportedVersionError, "the broker does not support scalable topics"}; + if (!initialLayoutPromise_.setError(error)) { + // Initial layout already delivered: keep retrying, another broker in + // the cluster may still support the feature. + LOG_WARN("[" << inputTopic_ << "] session " << sessionId_ + << ": reconnect target broker does not support scalable topics"); + scheduleReconnect(); + } + return; + } + + { + std::lock_guard lock(mutex_); + cnx_ = cnx; + } + + auto self = shared_from_this(); + bool registered = cnx->registerScalableTopicSession( + sessionId_, [self](pulsar::Result result, const pulsar::proto::CommandScalableTopicUpdate* update) { + self->handleSessionEvent(result, update); + }); + if (!registered) { + // The connection closed between acquisition and registration. + handleConnectionClosed(); + return; + } + cnx->sendCommand(Commands::newScalableTopicLookup(sessionId_, inputTopic_, createIfMissing_)); + // A failed write closes the connection, which notifies the session through + // the registry: no separate write-failure path is needed here. +} + +void DagWatchSession::handleSessionEvent(pulsar::Result result, + const pulsar::proto::CommandScalableTopicUpdate* update) { + if (closed_) { + return; + } + if (update == nullptr) { + (void)result; + handleConnectionClosed(); + } else if (update->has_error()) { + handleServerError(*update); + } else if (update->has_dag()) { + handleDagUpdate(*update); + } else { + LOG_WARN("[" << inputTopic_ << "] session " << sessionId_ + << ": update carries neither a DAG nor an error; ignoring"); + } +} + +void DagWatchSession::handleDagUpdate(const pulsar::proto::CommandScalableTopicUpdate& update) { + std::string resolved; + { + std::lock_guard lock(mutex_); + if (update.has_resolved_topic_name()) { + resolvedTopicName_ = update.resolved_topic_name(); + } + resolved = !resolvedTopicName_.empty() ? resolvedTopicName_ : inputTopic_; + } + + auto layout = SegmentLayout::fromProto(update.dag(), resolved); + if (!layout) { + LOG_ERROR("[" << inputTopic_ << "] session " << sessionId_ + << ": discarding malformed DAG update: " << layout.error().message); + // A malformed FIRST response must fail start() rather than hang it; after + // that, keep the last good layout. + initialLayoutPromise_.setError(layout.error()); + return; + } + + SegmentLayout oldLayout; + LayoutChangeListener listener; + { + std::lock_guard lock(mutex_); + oldLayout = std::move(currentLayout_); + currentLayout_ = *layout; + listener = listener_; + } + + // The broker confirmed the session is live and our state is fresh. + backoff_.reset(); + + LOG_INFO("[" << resolved << "] session " << sessionId_ << ": layout updated, epoch " << oldLayout.epoch() + << " -> " << layout->epoch() << ", " << layout->activeSegments().size() + << " active segments"); + + initialLayoutPromise_.complete(*layout); + if (listener) { + listener(*layout, oldLayout); + } +} + +void DagWatchSession::handleServerError(const pulsar::proto::CommandScalableTopicUpdate& update) { + const std::string message = update.has_message() ? update.message() : "scalable-topic lookup failed"; + LOG_ERROR("[" << inputTopic_ << "] session " << sessionId_ << ": broker reported " << update.error() + << ": " << message); + // Before the initial layout this is a lookup failure and start() surfaces it; + // afterwards broker-side errors are transient (a reconnect clears them) and + // the connection-closed path picks them up. + initialLayoutPromise_.setError(Error{toResult(update.error()), message}); +} + +void DagWatchSession::handleConnectionClosed() { + { + std::lock_guard lock(mutex_); + cnx_.reset(); + } + if (closed_) { + return; + } + if (initialLayoutPromise_.setError( + Error{ResultConnectError, "connection closed while waiting for the scalable-topic layout"})) { + // The initial lookup never completed: surface the failure to start()'s + // caller instead of retrying behind its back. + return; + } + scheduleReconnect(); +} + +void DagWatchSession::scheduleReconnect() { + if (closed_) { + return; + } + auto delay = backoff_.next(); + LOG_INFO("[" << inputTopic_ << "] session " << sessionId_ << ": reconnecting the DAG watch in " + << std::chrono::duration_cast(delay).count() << " ms"); + std::weak_ptr weakSelf = shared_from_this(); + reconnectTimer_->expires_from_now(delay); + reconnectTimer_->async_wait([weakSelf](const ASIO_ERROR& error) { + auto self = weakSelf.lock(); + if (self && !error) { + self->reconnect(); + } + }); +} + +void DagWatchSession::reconnect() { + if (closed_) { + return; + } + auto self = shared_from_this(); + client_->getConnection("", lookupCompatibleTopic(inputTopic_), static_cast(sessionId_)) + .addListener([self](pulsar::Result result, const pulsar::ClientConnectionPtr& cnx) { + if (result == pulsar::ResultOk) { + self->attach(cnx); + } else { + LOG_WARN("[" << self->inputTopic_ << "] session " << self->sessionId_ + << ": DAG watch reconnect failed (" << result << "); will retry"); + self->scheduleReconnect(); + } + }); +} + +SegmentLayout DagWatchSession::currentLayout() const { + std::lock_guard lock(mutex_); + return currentLayout_; +} + +std::string DagWatchSession::topicName() const { + std::lock_guard lock(mutex_); + return !resolvedTopicName_.empty() ? resolvedTopicName_ : inputTopic_; +} + +void DagWatchSession::setLayoutChangeListener(LayoutChangeListener listener) { + std::lock_guard lock(mutex_); + listener_ = std::move(listener); +} + +void DagWatchSession::close() { + if (closed_.exchange(true)) { + return; + } + ASIO_ERROR ignored; + reconnectTimer_->cancel(ignored); + + pulsar::ClientConnectionPtr cnx; + { + std::lock_guard lock(mutex_); + cnx = cnx_.lock(); + cnx_.reset(); + } + if (cnx) { + cnx->removeScalableTopicSession(sessionId_); + cnx->sendCommand(Commands::newScalableTopicClose(sessionId_)); + } +} + +} // namespace pulsar::st diff --git a/lib/st/DagWatchSession.h b/lib/st/DagWatchSession.h new file mode 100644 index 00000000..93b9ed8b --- /dev/null +++ b/lib/st/DagWatchSession.h @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +#include "SegmentLayout.h" +#include "lib/Backoff.h" +#include "lib/ClientConnection.h" +#include "lib/ClientImpl.h" + +namespace pulsar::st { + +/** + * The DAG-watch session of one scalable topic, ported from the Java v5 client's + * DagWatchClient so the two clients behave identically. + * + * start() connects to a broker (through the classic client's pool) and sends a + * CommandScalableTopicLookup; the broker answers — and afterwards pushes — a + * CommandScalableTopicUpdate carrying the full DAG, correlated by the + * client-assigned session id. Each accepted update replaces the current + * SegmentLayout and is reported to the layout-change listener. When the + * connection drops after the initial layout, the session reconnects with + * exponential backoff and re-issues the lookup; if it drops before the first + * layout, start()'s future fails instead (the caller's create() surfaces the + * error rather than silently retrying). Stale pushes cannot arrive out of + * order: updates are ordered per connection, and the old connection's registry + * is drained before the session re-attaches to a new one. + */ +class DagWatchSession : public std::enable_shared_from_this { + public: + /** Invoked for every accepted layout, including the first (oldLayout is empty then). */ + using LayoutChangeListener = + std::function; + + /** + * @param client the classic client whose connection pool and executors are reused. + * @param topic the scalable topic, in any accepted spelling ("topic://t/n/x", + * "persistent://t/n/x", or a short form); the broker resolves it and the + * session adopts the canonical identity from the first response. + * @param createIfMissing whether the broker may auto-create a missing topic on + * lookup (single-topic producers/consumers pass true; namespace watchers + * pass false so a deleted topic is not resurrected by a reconnect). + */ + DagWatchSession(pulsar::ClientImplPtr client, std::string topic, bool createIfMissing); + + /** + * Start the session. May be called once. + * @return a future completing with the initial layout, or the lookup failure. + */ + Future start(); + + /** Snapshot of the most recent layout (empty before the first update). */ + SegmentLayout currentLayout() const; + + /** The canonical topic identity, falling back to the input before the first response. */ + std::string topicName() const; + + /** Register the listener notified on every accepted layout update. */ + void setLayoutChangeListener(LayoutChangeListener listener); + + /** Close the session: stop reconnecting and tell the broker. Idempotent. */ + void close(); + + std::uint64_t sessionId() const { return sessionId_; } + + /** + * The topic spelling used for the classic connection lookup. The classic + * TopicName does not know the topic:// scheme, so that spelling maps to its + * persistent://... twin for the purpose of finding a broker; the wire lookup + * still carries the caller's original spelling. + */ + static std::string lookupCompatibleTopic(const std::string& topic); + + private: + void attach(const pulsar::ClientConnectionPtr& cnx); + void handleSessionEvent(pulsar::Result result, const pulsar::proto::CommandScalableTopicUpdate* update); + void handleDagUpdate(const pulsar::proto::CommandScalableTopicUpdate& update); + void handleServerError(const pulsar::proto::CommandScalableTopicUpdate& update); + void handleConnectionClosed(); + void scheduleReconnect(); + void reconnect(); + + pulsar::ClientImplPtr client_; + const std::string inputTopic_; + const bool createIfMissing_; + const std::uint64_t sessionId_; + pulsar::Backoff backoff_; + DeadlineTimerPtr reconnectTimer_; // global-scope alias from AsioTimer.h + detail::Promise initialLayoutPromise_; + std::atomic closed_{false}; + + mutable std::mutex mutex_; + SegmentLayout currentLayout_; // guarded by mutex_ + std::string resolvedTopicName_; // guarded by mutex_; empty until the first response + LayoutChangeListener listener_; // guarded by mutex_ + pulsar::ClientConnectionWeakPtr cnx_; // guarded by mutex_ +}; + +using DagWatchSessionPtr = std::shared_ptr; + +} // namespace pulsar::st diff --git a/lib/st/MessageId.cc b/lib/st/MessageId.cc new file mode 100644 index 00000000..4bfa037a --- /dev/null +++ b/lib/st/MessageId.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +namespace pulsar::st { + +// An empty id: impl_ stays null, so the handle is falsy under operator bool and +// compares equal only to other empty ids. +MessageId::MessageId() = default; + +} // namespace pulsar::st diff --git a/lib/st/PulsarClientBuilder.cc b/lib/st/PulsarClientBuilder.cc new file mode 100644 index 00000000..79b22e4c --- /dev/null +++ b/lib/st/PulsarClientBuilder.cc @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include + +#include "ClientImpl.h" + +namespace pulsar::st { + +namespace { + +bool hasTlsScheme(const std::string& url) { + return url.rfind("pulsar+ssl://", 0) == 0 || url.rfind("https://", 0) == 0; +} + +} // namespace + +Expected PulsarClientBuilder::build() { + if (serviceUrl_.empty()) { + return unexpected(ResultInvalidUrl, "serviceUrl is required; set it on the builder"); + } + if (tlsPolicy_.enabled && !hasTlsScheme(serviceUrl_)) { + return unexpected( + ResultInvalidConfiguration, + "tlsPolicy.enabled requires a pulsar+ssl:// or https:// service URL, got: " + serviceUrl_); + } + + // Map the scalable-topics policy structs onto the classic client + // configuration: the underlying transport stack (connection pool, executor + // pools, lookup) is reused as-is. Fields left unset keep the classic + // defaults. + pulsar::ClientConfiguration conf; + if (authentication_) { + conf.setAuth(authentication_); + } + const ConnectionPolicy& connection = connectionPolicy_; + if (connection.connectionsPerBroker) { + conf.setConnectionsPerBroker(*connection.connectionsPerBroker); + } + if (connection.connectionTimeout) { + conf.setConnectionTimeout(static_cast(connection.connectionTimeout->count())); + } + if (connection.operationTimeout) { + conf.setOperationTimeoutMs(static_cast(connection.operationTimeout->count())); + } + if (connection.keepAliveInterval) { + conf.setKeepAliveIntervalInSeconds(static_cast(connection.keepAliveInterval->count())); + } + if (connection.maxLookupRequests) { + conf.setConcurrentLookupRequest(*connection.maxLookupRequests); + } + if (connection.maxLookupRedirects) { + conf.setMaxLookupRedirects(*connection.maxLookupRedirects); + } + // TODO(scalable-topics): connection.maxConnectionIdleTime has no classic + // ClientConfiguration counterpart; wire it up when lib/st manages its own + // connection reaping. + if (connection.listenerName) { + conf.setListenerName(*connection.listenerName); + } + if (threadPolicy_.ioThreads) { + conf.setIOThreads(*threadPolicy_.ioThreads); + } + if (threadPolicy_.messageListenerThreads) { + conf.setMessageListenerThreads(*threadPolicy_.messageListenerThreads); + } + if (memoryPolicy_.limit) { + conf.setMemoryLimit(memoryPolicy_.limit->bytes); + } + if (backoffPolicy_.initialBackoff) { + conf.setInitialBackoffIntervalMs(static_cast(backoffPolicy_.initialBackoff->count())); + } + if (backoffPolicy_.maxBackoff) { + conf.setMaxBackoffIntervalMs(static_cast(backoffPolicy_.maxBackoff->count())); + } + if (tlsPolicy_.trustCertsFilePath) { + conf.setTlsTrustCertsFilePath(*tlsPolicy_.trustCertsFilePath); + } + if (tlsPolicy_.certificateFilePath) { + conf.setTlsCertificateFilePath(*tlsPolicy_.certificateFilePath); + } + if (tlsPolicy_.privateKeyFilePath) { + conf.setTlsPrivateKeyFilePath(*tlsPolicy_.privateKeyFilePath); + } + conf.setTlsAllowInsecureConnection(tlsPolicy_.allowInsecureConnection); + conf.setValidateHostName(tlsPolicy_.validateHostname); + + try { + auto classic = std::make_shared(serviceUrl_, conf); + classic->initialize(); + auto impl = std::make_shared(std::move(classic), transactionPolicy_); + return PulsarClient(detail::ClientCore(std::move(impl))); + } catch (const std::exception& e) { + return unexpected(ResultInvalidUrl, std::string("failed to initialize the client: ") + e.what()); + } +} + +} // namespace pulsar::st diff --git a/lib/st/SegmentLayout.cc b/lib/st/SegmentLayout.cc new file mode 100644 index 00000000..349c41fc --- /dev/null +++ b/lib/st/SegmentLayout.cc @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "SegmentLayout.h" + +#include +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Murmur3_32Hash.h" + +namespace pulsar::st { + +namespace { + +constexpr std::string_view kTopicScheme = "topic://"; +constexpr std::string_view kSegmentScheme = "segment://"; + +// "--" with the range bounds as 4-digit hex, matching +// SegmentTopicName.formatDescriptor in the Java client/broker. +std::string segmentDescriptor(const HashRange& range, std::uint64_t segmentId) { + char buf[64]; + std::snprintf(buf, sizeof(buf), "%04x-%04x-%llu", range.start, range.end, + static_cast(segmentId)); + return buf; +} + +int signSafeMod(int dividend, int divisor) { + int mod = dividend % divisor; + return mod < 0 ? mod + divisor : mod; +} + +} // namespace + +std::uint32_t ScalableTopicHashing::murmur(std::string_view key) { + return pulsar::Murmur3_32Hash().makeRawHash(key.data(), static_cast(key.size())); +} + +Expected SegmentLayout::fromProto(const pulsar::proto::ScalableTopicDAG& dag, + const std::string& resolvedTopicName) { + if (resolvedTopicName.rfind(kTopicScheme, 0) != 0) { + return unexpected( + ResultInvalidTopicName, + "resolved scalable-topic name must use the topic:// scheme, got: " + resolvedTopicName); + } + // "topic://tenant/ns/topic" -> "tenant/ns/topic"; the segment topics live at + // "segment://tenant/ns/topic/". + const std::string topicPath = resolvedTopicName.substr(kTopicScheme.size()); + + SegmentLayout layout; + layout.epoch_ = dag.epoch(); + + for (int i = 0; i < dag.segment_brokers_size(); i++) { + const auto& addr = dag.segment_brokers(i); + BrokerAddress broker; + broker.url = addr.broker_url(); + if (addr.has_broker_url_tls()) { + broker.urlTls = addr.broker_url_tls(); + } + layout.segmentBrokers_[addr.segment_id()] = std::move(broker); + } + + for (int i = 0; i < dag.segments_size(); i++) { + const auto& seg = dag.segments(i); + if (seg.hash_start() > HashRange::kMaxHash || seg.hash_end() > HashRange::kMaxHash || + seg.hash_end() < seg.hash_start()) { + return unexpected(ResultUnknownError, + "malformed DAG: segment " + std::to_string(seg.segment_id()) + + " has an invalid hash range [" + std::to_string(seg.hash_start()) + ", " + + std::to_string(seg.hash_end()) + "]"); + } + + Segment segment; + segment.segmentId = seg.segment_id(); + segment.range = HashRange{seg.hash_start(), seg.hash_end()}; + segment.segmentTopicName = std::string(kSegmentScheme) + topicPath + "/" + + segmentDescriptor(segment.range, seg.segment_id()); + if (seg.has_legacy_topic_name()) { + segment.legacyTopicName = seg.legacy_topic_name(); + } + segment.entryBucketSplits.reserve(seg.entry_bucket_splits_size()); + for (int j = 0; j < seg.entry_bucket_splits_size(); j++) { + segment.entryBucketSplits.push_back(seg.entry_bucket_splits(j)); + } + + if (seg.state() == pulsar::proto::ACTIVE) { + layout.activeSegments_.push_back(std::move(segment)); + } else { + layout.sealedSegments_.push_back(std::move(segment)); + } + } + + // Active segments sort by hash-range start (the routing order); sealed order + // doesn't matter for correctness, sort by id for stable iteration. + std::sort(layout.activeSegments_.begin(), layout.activeSegments_.end(), + [](const Segment& a, const Segment& b) { return a.range.start < b.range.start; }); + std::sort(layout.sealedSegments_.begin(), layout.sealedSegments_.end(), + [](const Segment& a, const Segment& b) { return a.segmentId < b.segmentId; }); + + if (dag.has_controller_broker_url()) { + layout.controllerBrokerUrl_ = dag.controller_broker_url(); + } + if (dag.has_controller_broker_url_tls()) { + layout.controllerBrokerUrlTls_ = dag.controller_broker_url_tls(); + } + return layout; +} + +const std::string* SegmentLayout::brokerUrl(std::uint64_t segmentId) const { + auto it = segmentBrokers_.find(segmentId); + return it != segmentBrokers_.end() ? &it->second.url : nullptr; +} + +const std::string* SegmentLayout::brokerUrlTls(std::uint64_t segmentId) const { + auto it = segmentBrokers_.find(segmentId); + return (it != segmentBrokers_.end() && it->second.urlTls) ? &*it->second.urlTls : nullptr; +} + +Expected SegmentRouter::route(std::string_view key, const SegmentLayout& layout) { + const auto& active = layout.activeSegments(); + if (active.empty()) { + return unexpected(ResultServiceUnitNotReady, "no active segments in the topic layout"); + } + + // Synthetic layout for a not-yet-migrated regular topic: route + // signSafeMod(classicMurmur3(key), N) over segment_id, exactly like the + // classic partitioned-topic producers still attached to the same topic. + const bool allLegacy = + std::all_of(active.begin(), active.end(), [](const Segment& s) { return s.isLegacy(); }); + if (allLegacy) { + const int hash32 = pulsar::Murmur3_32Hash().makeHash(std::string(key)); + const int partition = signSafeMod(hash32, static_cast(active.size())); + for (const auto& segment : active) { + if (segment.segmentId == static_cast(partition)) { + return segment.segmentId; + } + } + return unexpected(ResultUnknownError, + "synthetic layout is missing segment_id=" + std::to_string(partition) + + " (N=" + std::to_string(active.size()) + ")"); + } + + const std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + for (const auto& segment : active) { + if (segment.range.contains(hash)) { + return segment.segmentId; + } + } + return unexpected(ResultUnknownError, + "no active segment covers hash " + std::to_string(hash) + " for the message key"); +} + +Expected SegmentRouter::routeRoundRobin(const SegmentLayout& layout) { + const auto& active = layout.activeSegments(); + if (active.empty()) { + return unexpected(ResultServiceUnitNotReady, "no active segments in the topic layout"); + } + const std::uint32_t index = roundRobinCounter_.fetch_add(1, std::memory_order_relaxed); + return active[index % active.size()].segmentId; +} + +} // namespace pulsar::st diff --git a/lib/st/SegmentLayout.h b/lib/st/SegmentLayout.h new file mode 100644 index 00000000..d484997c --- /dev/null +++ b/lib/st/SegmentLayout.h @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +// The client-side model of a scalable topic's segment layout, built from the +// ScalableTopicDAG the broker sends on the DAG-watch session, plus the key -> +// segment router. Ported from the Java v5 client (ClientSegmentLayout / +// SegmentRouter / ScalableTopicHashing) so the two clients route identically. + +namespace pulsar { +namespace proto { +class ScalableTopicDAG; +} +} // namespace pulsar + +namespace pulsar::st { + +/** + * Scalable-topic key hashing — the single source of truth for both segment + * routing and entry-bucketing. + * + * A single raw (unmasked) 32-bit Murmur3 hash of a key splits into two + * independent 16-bit halves: the HIGH half routes segments, the LOW half + * routes entry-buckets. The raw hash is required so the high half is + * full-range (the classic masked hash clears bit 31, which would confine the + * high half to [0, 0x7FFF]). Compute murmur() once per key and split it. + */ +struct ScalableTopicHashing { + /** The raw 32-bit Murmur3 hash of a key. */ + static std::uint32_t murmur(std::string_view key); + + /** The 16-bit segment-routing hash (high 16 bits) of a precomputed murmur(). */ + static std::uint32_t segmentHash(std::uint32_t murmur) { return (murmur >> 16) & 0xFFFFu; } + + /** The 16-bit entry-bucket hash (low 16 bits) of a precomputed murmur(). */ + static std::uint32_t entryBucketHash(std::uint32_t murmur) { return murmur & 0xFFFFu; } +}; + +/** An inclusive hash range [start, end] within the 16-bit hash space (0x0000-0xFFFF). */ +struct HashRange { + static constexpr std::uint32_t kMinHash = 0x0000; + static constexpr std::uint32_t kMaxHash = 0xFFFF; + + std::uint32_t start = kMinHash; + std::uint32_t end = kMaxHash; + + bool contains(std::uint32_t hash) const { return hash >= start && hash <= end; } + bool operator==(const HashRange&) const = default; +}; + +/** + * One segment of a scalable topic, as seen by the client: its identity, the + * hash range it owns, and the topic its per-segment producer/consumer attaches + * to. + * + * `legacyTopicName` is set for legacy segments — entries in a synthetic layout + * that wrap an existing, externally managed persistent:// topic (e.g. one + * partition of a not-yet-migrated regular topic). Those attach to the wrapped + * topic; regular controller-managed segments attach to the computed + * segment://... topic. + */ +struct Segment { + std::uint64_t segmentId = 0; + HashRange range; + /** The computed segment://tenant/ns/topic/xxxx-xxxx-id topic. */ + std::string segmentTopicName; + /** Wrapped persistent:// topic for legacy segments; unset otherwise. */ + std::optional legacyTopicName; + /** Entry-bucket split points within the segment (empty = one bucket). */ + std::vector entryBucketSplits; + + bool isLegacy() const { return legacyTopicName.has_value(); } + + /** The topic a per-segment producer/consumer attaches to. */ + const std::string& attachTopicName() const { + return legacyTopicName ? *legacyTopicName : segmentTopicName; + } +}; + +/** + * Immutable client-side view of a scalable topic's segment layout at one DAG + * epoch. Built from the ScalableTopicDAG the broker returns on the DAG-watch + * session; consumers of this class swap whole layouts as updates arrive (epoch + * monotonicity is enforced by the watch session, not here). + */ +class SegmentLayout { + public: + /** An empty layout: epoch 0, no segments. */ + SegmentLayout() = default; + + /** + * Build a layout from the broker-provided DAG. + * + * @param dag the DAG snapshot from CommandScalableTopicUpdate. + * @param resolvedTopicName the canonical "topic://tenant/ns/topic" identity + * (CommandScalableTopicUpdate.resolved_topic_name); used to compute + * the per-segment segment:// topic names. + * @return the layout, or an Error if the DAG is malformed (bad topic scheme + * or an out-of-bounds hash range). + */ + static Expected fromProto(const pulsar::proto::ScalableTopicDAG& dag, + const std::string& resolvedTopicName); + + /** The DAG generation this layout represents. */ + std::uint64_t epoch() const { return epoch_; } + + /** Active segments, sorted by hash-range start (the routing order). */ + const std::vector& activeSegments() const { return activeSegments_; } + + /** Sealed segments still present in the DAG (finite, eventually drained), sorted by id. */ + const std::vector& sealedSegments() const { return sealedSegments_; } + + /** The broker serving a segment, or nullptr if the DAG did not name one. */ + const std::string* brokerUrl(std::uint64_t segmentId) const; + /** The TLS address of the broker serving a segment, or nullptr if absent. */ + const std::string* brokerUrlTls(std::uint64_t segmentId) const; + + /** The controller leader's address, when the DAG carries one. */ + const std::optional& controllerBrokerUrl() const { return controllerBrokerUrl_; } + const std::optional& controllerBrokerUrlTls() const { return controllerBrokerUrlTls_; } + + private: + struct BrokerAddress { + std::string url; + std::optional urlTls; + }; + + std::uint64_t epoch_ = 0; + std::vector activeSegments_; + std::vector sealedSegments_; + std::unordered_map segmentBrokers_; + std::optional controllerBrokerUrl_; + std::optional controllerBrokerUrlTls_; +}; + +/** + * Routes messages to segments. + * + * Keyed messages route by the 16-bit segment hash to the active segment whose + * range contains it. If EVERY active segment is a legacy segment (synthetic + * layout for a not-yet-migrated regular topic), routing switches to + * signSafeMod(classicMurmur3(key), N) over segment_id, so producers using this + * client route exactly like the classic partitioned-topic producers still + * attached to the same topic. Keyless messages route round-robin. + */ +class SegmentRouter { + public: + /** + * Route a keyed message. + * + * @return the owning segment's id, or an Error if the layout has no active + * segments (ResultServiceUnitNotReady) or no active segment covers + * the hash — a malformed DAG (ResultUnknownError). + */ + Expected route(std::string_view key, const SegmentLayout& layout); + + /** Route a keyless message round-robin across the active segments. */ + Expected routeRoundRobin(const SegmentLayout& layout); + + private: + std::atomic roundRobinCounter_{0}; +}; + +} // namespace pulsar::st diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto index 4e207913..529c806d 100644 --- a/proto/PulsarApi.proto +++ b/proto/PulsarApi.proto @@ -45,6 +45,8 @@ message Schema { LocalTime = 18; LocalDateTime = 19; ProtobufNative = 20; + AutoConsume = 21; + External = 22; } required string name = 1; @@ -162,6 +164,24 @@ message MessageMetadata { // Indicate if the message partition key is set optional bool null_partition_key = 30 [default = false]; + + // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted, + // some messages may be removed (compacted out). For example, if the original batch contains: + // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`. + // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages. + // + // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches, + // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted + // messages (e.g., `k1 => v1` and `k1 => null` in the example above). + repeated int32 compacted_batch_indexes = 31; + optional bytes schema_id = 32; + + // PIP-486 scalable-topic entry-bucketing. The effective entry-bucket hash range covered by this + // entry: the smallest and largest entry-bucket hash (the low 16 bits of the key's Murmur3_32 hash) + // among the batched messages, both bounds inclusive. Set only by scalable-topic (segment://) + // producers; the broker routes the whole entry to the consumer owning that bucket on the segment. + optional uint32 entry_hash_min = 33; + optional uint32 entry_hash_max = 34; } message SingleMessageMetadata { @@ -263,10 +283,11 @@ enum ProtocolVersion { v18 = 18; // Add client support for broker entry metadata v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated + v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version } message CommandConnect { - required string client_version = 1; + required string client_version = 1; // The version of the client. Proxy should forward client's client_version. optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" instead. optional string auth_method_name = 5; optional bytes auth_data = 3; @@ -289,13 +310,21 @@ message CommandConnect { // Feature flags optional FeatureFlags feature_flags = 10; + + optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy. } +// Please also add a new enum for the class "PulsarClientException.FailedFeatureCheck" when adding a new feature flag. message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false]; optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; + optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; + optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false]; + optional bool supports_topic_watcher_reconcile = 7 [default = false]; + optional bool supports_scalable_topics = 8 [default = false]; + optional bool supports_tc_metadata_discovery = 9 [default = false]; } message CommandConnected { @@ -306,7 +335,7 @@ message CommandConnected { } message CommandAuthResponse { - optional string client_version = 1; + optional string client_version = 1; // The version of the client. Proxy should forward client's client_version. optional AuthData response = 2; optional int32 protocol_version = 3 [default = 0]; } @@ -409,6 +438,7 @@ message CommandPartitionedTopicMetadata { // to the proxy. optional string original_auth_data = 4; optional string original_auth_method = 5; + optional bool metadata_auto_creation_enabled = 6 [default = true]; } message CommandPartitionedTopicMetadataResponse { @@ -439,6 +469,8 @@ message CommandLookupTopic { optional string original_auth_method = 6; // optional string advertised_listener_name = 7; + // The properties used for topic lookup + repeated KeyValue properties = 8; } message CommandLookupTopicResponse { @@ -603,6 +635,7 @@ message CommandFlow { message CommandUnsubscribe { required uint64 consumer_id = 1; required uint64 request_id = 2; + optional bool force = 3 [default = false]; } // Reset an existing consumer to a particular message id @@ -622,7 +655,7 @@ message CommandReachedEndOfTopic { } message CommandTopicMigrated { - enum ResourceType { + enum ResourceType { Producer = 0; Consumer = 1; } @@ -633,6 +666,7 @@ message CommandTopicMigrated { } + message CommandCloseProducer { required uint64 producer_id = 1; required uint64 request_id = 2; @@ -766,6 +800,8 @@ message CommandGetTopicsOfNamespace { optional Mode mode = 3 [default = PERSISTENT]; optional string topics_pattern = 4; optional string topics_hash = 5; + // Context properties from the client + repeated KeyValue properties = 6; } message CommandGetTopicsOfNamespaceResponse { @@ -807,6 +843,244 @@ message CommandWatchTopicListClose { required uint64 watcher_id = 2; } +/// --- Scalable topic commands --- + +enum SegmentState { + ACTIVE = 0; + SEALED = 1; +} + +message SegmentInfoProto { + required uint64 segment_id = 1; + required uint32 hash_start = 2; + required uint32 hash_end = 3; + required SegmentState state = 4; + repeated uint64 parent_ids = 5; + repeated uint64 child_ids = 6; + required uint64 created_at_epoch = 7; + optional uint64 sealed_at_epoch = 8; + // Wall-clock millis at create / seal time. Used for retention-based segment GC + // and timestamp-based seek; epoch above is a DAG generation number, not a clock. + required uint64 created_at_ms = 9; + optional uint64 sealed_at_ms = 10; + + // Legacy-segment marker. When set, this segment is not managed by the scalable-topic + // controller and has no segment://... topic of its own — it wraps the named, externally + // managed persistent://... topic instead. Used by the synthetic-layout response the + // broker returns for a regular (partitioned or non-partitioned) topic that has not yet + // been migrated to a scalable topic. When absent, the segment URI is computed normally + // from the scalable topic name and segment_id (segment:///). + optional string legacy_topic_name = 11; + + // PIP-486: entry-bucket split points — the ascending, inclusive start hashes of buckets 1..N-1 + // within the 16-bit entry-bucket hash ring (bucket 0 implicitly starts at 0x0000). The segment is + // divided into entry_bucket_splits_count + 1 buckets; an empty list means a single bucket spanning + // the whole ring. Splits may be uneven (e.g. to balance buckets by traffic). Producers bucket their + // batches accordingly and the broker routes by bucket. + repeated uint32 entry_bucket_splits = 12; +} + +message SegmentBrokerAddress { + required uint64 segment_id = 1; + required string broker_url = 2; + optional string broker_url_tls = 3; +} + +message ScalableTopicDAG { + required uint64 epoch = 1; + repeated SegmentInfoProto segments = 2; + repeated SegmentBrokerAddress segment_brokers = 3; + optional string controller_broker_url = 4; + optional string controller_broker_url_tls = 5; +} + +// Client -> Broker: Request scalable topic metadata and initiate watch session +message CommandScalableTopicLookup { + required uint64 session_id = 1; // Client-assigned session ID + // Any of "topic://t/n/x", "persistent://t/n/x", or a short form like + // "my-topic" (normalized by the broker to persistent://public/default/my-topic). + // The broker resolves the input to the canonical topic://... identity and + // returns it in CommandScalableTopicUpdate.resolved_topic_name. + required string topic = 2; + // What to do when the scalable topic is missing on lookup: if true, create it (subject to + // broker/namespace auto-creation policy); if false, fail not-found instead of creating it. + // Namespace (multi-topic) consumers set this false so a deleted topic is never resurrected by + // a reconnecting per-topic watch. Defaults to true to preserve the create-on-lookup behavior + // for explicit single-topic producers/consumers (and for older clients that don't set it). + optional bool create_if_missing = 3 [default = true]; +} + +// Broker -> Client: Used for BOTH initial response and subsequent pushed updates +message CommandScalableTopicUpdate { + required uint64 session_id = 1; + optional ScalableTopicDAG dag = 2; + + optional ServerError error = 3; + optional string message = 4; + + // Canonical scalable-topic identity (always "topic://t/n/x") that the client + // should use for downstream operations. Set on every success response, + // including for inputs that were given as persistent://... or short-form. + // Absent on error responses. + optional string resolved_topic_name = 5; +} + +// Client -> Broker: Close the DAG watch session +message CommandScalableTopicClose { + required uint64 session_id = 1; +} + +// Kind of scalable consumer registering with the controller leader. +// QueueConsumer never registers — it attaches directly to all active and sealed +// segment topics — so it does not appear here. +enum ScalableConsumerType { + STREAM = 0; + CHECKPOINT = 1; +} + +// A single segment assigned to a scalable consumer. +message ScalableAssignedSegment { + required uint64 segment_id = 1; + required uint32 hash_start = 2; + required uint32 hash_end = 3; + // Fully-qualified segment:// topic name the consumer should attach to. + required string segment_topic = 4; + // PIP-486: the entry-bucket hash ranges (16-bit, inclusive) this consumer owns within the + // segment. Empty means the consumer owns the whole segment (single bucket) and subscribes + // Shared; non-empty means the segment is shared by bucket, and the consumer subscribes + // Key_Shared STICKY declaring exactly these ranges. + repeated IntRange bucket_ranges = 5; +} + +// An assignment of active segments to a single consumer. Carries the layout epoch +// it was computed from so the client can reject stale updates. +message ScalableConsumerAssignment { + required uint64 layout_epoch = 1; + repeated ScalableAssignedSegment segments = 2; +} + +// Client -> Broker: register as an ordered (Stream) or external (Checkpoint) consumer +// on a scalable topic and request the initial segment assignment. The broker leader +// persists the consumer registration and returns the current assignment. +message CommandScalableTopicSubscribe { + required uint64 request_id = 1; + required string topic = 2; // e.g. "topic://tenant/ns/my-topic" + required string subscription = 3; + required string consumer_name = 4; + required uint64 consumer_id = 5; + required ScalableConsumerType consumer_type = 6; +} + +// Broker -> Client: response to CommandScalableTopicSubscribe. On success, carries +// the initial ScalableConsumerAssignment. On failure, error + message are populated +// and the assignment is absent. +message CommandScalableTopicSubscribeResponse { + required uint64 request_id = 1; + optional ServerError error = 2; + optional string message = 3; + optional ScalableConsumerAssignment assignment = 4; +} + +// Broker -> Client: push a new assignment to a subscribed consumer after a rebalance +// (triggered by a peer joining/leaving the subscription or by a segment split/merge). +message CommandScalableTopicAssignmentUpdate { + required uint64 consumer_id = 1; + required ScalableConsumerAssignment assignment = 2; +} + +// Multi-topic consumer watcher: subscribes to the union of scalable topics in a +// namespace that match a (possibly empty) set of property filters. The broker keeps +// pushing updates as topics enter or leave the matching set. See +// `multi-topic-consumer-design.md` for the full design. + +// Client -> Broker: open a watch session. +message CommandWatchScalableTopics { + required uint64 watch_id = 1; // Client-assigned watch ID + required string namespace = 2; // tenant/namespace + // Optional AND filters; empty list means "match all topics in the namespace". + repeated KeyValue property_filters = 3; + // Hash of the topics the client believes are currently in its set. Sent on + // reconnect; absent on first subscribe. If it matches the broker's freshly + // computed hash, the broker skips emitting the initial Snapshot — the client's + // local state is already correct and future Diffs will flow as usual. Same + // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics). + optional string current_hash = 4; +} + +// Snapshot of the full matching set. Sent on initial subscribe and on every +// reconnect-resync. The client replaces its local set with this list. +message ScalableTopicsSnapshot { + repeated string topics = 1; +} + +// Incremental membership change. Apply removed before added when both are present. +message ScalableTopicsDiff { + repeated string added = 1; + repeated string removed = 2; +} + +// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). When the +// initial subscribe fails, neither variant is set and `error`/`message` carry the +// failure reason. +message CommandWatchScalableTopicsUpdate { + required uint64 watch_id = 1; + + oneof event { + ScalableTopicsSnapshot snapshot = 2; + ScalableTopicsDiff diff = 3; + } + + optional ServerError error = 4; + optional string message = 5; +} + +// Client -> Broker: close the watch session. +message CommandWatchScalableTopicsClose { + required uint64 watch_id = 1; +} + +/// --- Transaction-coordinator assignment watch --- +// The scalable-topics transaction coordinator's discovery surface. The client opens one watch +// and the broker replies with the full (partition -> leader) map, then pushes a fresh full +// snapshot whenever any partition's leader changes. There is no point lookup and no diff: the +// map is bounded (parallelism, ~16) and changes rarely, so always sending the whole snapshot is +// simpler and removes a class of apply-ordering / drift bugs. Gated by the +// supports_tc_metadata_discovery feature flag. + +// Client -> Broker: open the assignment watch. +message CommandWatchTcAssignments { + required uint64 watch_id = 1; // client-assigned +} + +// One (partition -> leader) entry. A partition currently mid-election is simply absent from the +// snapshot; the client parks any transaction routed there until a later snapshot fills it in. +message TcAssignment { + required uint64 tc_id = 1; // TC partition = TxnID.mostSigBits + optional string broker_service_url = 2; + optional string broker_service_url_tls = 3; +} + +// Full map. Sent on initial watch and again, in full, on every change. The client replaces its +// local map wholesale — no merge, no ordering rules. parallelism lets the client size its handler +// array without a separate metadata read. +message TcAssignmentsSnapshot { + required uint32 parallelism = 1; + repeated TcAssignment assignments = 2; +} + +// Broker -> Client: the current full snapshot, or (on initial-watch failure) an error. +message CommandWatchTcAssignmentsUpdate { + required uint64 watch_id = 1; + optional TcAssignmentsSnapshot snapshot = 2; + optional ServerError error = 3; + optional string message = 4; +} + +// Client -> Broker: close the watch. +message CommandWatchTcAssignmentsClose { + required uint64 watch_id = 1; +} + message CommandGetSchema { required uint64 request_id = 1; required string topic = 2; @@ -824,9 +1098,10 @@ message CommandGetSchemaResponse { } message CommandGetOrCreateSchema { - required uint64 request_id = 1; - required string topic = 2; - required Schema schema = 3; + required uint64 request_id = 1; + required string topic = 2; + required Schema schema = 3; + optional string producerName = 4; } message CommandGetOrCreateSchemaResponse { @@ -847,6 +1122,8 @@ enum TxnAction { message CommandTcClientConnectRequest { required uint64 request_id = 1; required uint64 tc_id = 2 [default = 0]; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 3 [default = false]; } message CommandTcClientConnectResponse { @@ -857,8 +1134,15 @@ message CommandTcClientConnectResponse { message CommandNewTxn { required uint64 request_id = 1; - optional uint64 txn_ttl_seconds = 2 [default = 0]; + // Transaction timeout in milliseconds. Despite the field number's legacy name history, the value + // has always been carried in milliseconds (the client sends unit.toMillis(...) and both + // coordinators consume it as ms); the field is named accordingly to avoid confusion. + optional uint64 txn_ttl_millis = 2 [default = 0]; optional uint64 tc_id = 3 [default = 0]; + // When true, route to the metadata-driven (scalable-topics, PIP-473) transaction coordinator + // instead of the legacy one. Set by v5 clients; absent for v4 clients. Lets both coordinators + // serve their own clients on the same cluster. + optional bool scalable = 4 [default = false]; } message CommandNewTxnResponse { @@ -874,6 +1158,8 @@ message CommandAddPartitionToTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; repeated string partitions = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandAddPartitionToTxnResponse { @@ -893,6 +1179,8 @@ message CommandAddSubscriptionToTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; repeated Subscription subscription = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandAddSubscriptionToTxnResponse { @@ -908,6 +1196,8 @@ message CommandEndTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; optional TxnAction txn_action = 4; + // Route to the scalable-topics (PIP-473) coordinator. See CommandNewTxn.scalable. + optional bool scalable = 5 [default = false]; } message CommandEndTxnResponse { @@ -1043,6 +1333,22 @@ message BaseCommand { WATCH_TOPIC_LIST_CLOSE = 67; TOPIC_MIGRATED = 68; + + SCALABLE_TOPIC_LOOKUP = 70; + SCALABLE_TOPIC_UPDATE = 71; + SCALABLE_TOPIC_CLOSE = 72; + + SCALABLE_TOPIC_SUBSCRIBE = 73; + SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74; + SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75; + + WATCH_SCALABLE_TOPICS = 76; + WATCH_SCALABLE_TOPICS_UPDATE = 77; + WATCH_SCALABLE_TOPICS_CLOSE = 78; + + WATCH_TC_ASSIGNMENTS = 79; + WATCH_TC_ASSIGNMENTS_UPDATE = 80; + WATCH_TC_ASSIGNMENTS_CLOSE = 81; } @@ -1126,4 +1432,20 @@ message BaseCommand { optional CommandWatchTopicListClose watchTopicListClose = 67; optional CommandTopicMigrated topicMigrated = 68; + + optional CommandScalableTopicLookup scalableTopicLookup = 70; + optional CommandScalableTopicUpdate scalableTopicUpdate = 71; + optional CommandScalableTopicClose scalableTopicClose = 72; + + optional CommandScalableTopicSubscribe scalableTopicSubscribe = 73; + optional CommandScalableTopicSubscribeResponse scalableTopicSubscribeResponse = 74; + optional CommandScalableTopicAssignmentUpdate scalableTopicAssignmentUpdate = 75; + + optional CommandWatchScalableTopics watchScalableTopics = 76; + optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate = 77; + optional CommandWatchScalableTopicsClose watchScalableTopicsClose = 78; + + optional CommandWatchTcAssignments watchTcAssignments = 79; + optional CommandWatchTcAssignmentsUpdate watchTcAssignmentsUpdate = 80; + optional CommandWatchTcAssignmentsClose watchTcAssignmentsClose = 81; } diff --git a/tests/BuildTests.cmake b/tests/BuildTests.cmake index 0fe74300..45dd84f4 100644 --- a/tests/BuildTests.cmake +++ b/tests/BuildTests.cmake @@ -58,3 +58,12 @@ target_link_libraries(ChunkDedupTest pulsarStatic ${GTEST_TARGETS}) add_executable(ExtensibleLoadManagerTest extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc) target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic ${GTEST_TARGETS}) + +# --- Scalable topics (pulsar::st) unit tests -------------------------------- +# Pure client-side tests for the st API and its lib/st implementation; they do +# not require a running broker. C++20 per-target, like the st API itself. +file(GLOB ST_TEST_SOURCES st/*.cc) +add_executable(pulsar-st-tests ${ST_TEST_SOURCES}) +set_target_properties(pulsar-st-tests PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON) +target_include_directories(pulsar-st-tests PRIVATE ${AUTOGEN_DIR}/lib) +target_link_libraries(pulsar-st-tests PRIVATE pulsarStatic ${GTEST_TARGETS}) diff --git a/tests/st/DagWatchSessionTest.cc b/tests/st/DagWatchSessionTest.cc new file mode 100644 index 00000000..bed4a8a3 --- /dev/null +++ b/tests/st/DagWatchSessionTest.cc @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +#include "lib/ClientImpl.h" +#include "lib/st/DagWatchSession.h" + +// Broker-free tests for the DAG-watch session: the pure lookup-name mapping and +// the lifecycle paths that must be safe without any connection. The full +// session protocol (lookup, updates, reconnect) is exercised end-to-end against +// a real broker by the producer integration tests. + +using namespace pulsar::st; + +TEST(DagWatchSessionTest, testLookupCompatibleTopicMapsTopicScheme) { + // The classic TopicName does not know the topic:// scheme; the connection + // lookup uses the persistent:// twin while the wire lookup keeps the + // original spelling. + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("topic://public/default/orders"), + "persistent://public/default/orders"); + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("persistent://public/default/orders"), + "persistent://public/default/orders"); + ASSERT_EQ(DagWatchSession::lookupCompatibleTopic("orders"), "orders"); +} + +TEST(DagWatchSessionTest, testSessionLifecycleWithoutConnection) { + auto classic = + std::make_shared("pulsar://localhost:6650", pulsar::ClientConfiguration{}); + classic->initialize(); + + auto session = std::make_shared(classic, "topic://public/default/orders", true); + + // Fresh session: empty layout, input identity, unique session id. + ASSERT_TRUE(session->currentLayout().activeSegments().empty()); + ASSERT_EQ(session->currentLayout().epoch(), 0u); + ASSERT_EQ(session->topicName(), "topic://public/default/orders"); + + auto other = std::make_shared(classic, "topic://public/default/other", true); + ASSERT_NE(session->sessionId(), other->sessionId()); + + // Closing before start (and closing twice) must be safe. + session->close(); + session->close(); + other->close(); + + classic->shutdown(); +} diff --git a/tests/st/ExpectedTest.cc b/tests/st/ExpectedTest.cc new file mode 100644 index 00000000..08ab950d --- /dev/null +++ b/tests/st/ExpectedTest.cc @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include + +using namespace pulsar::st; +using UniqueInt = std::unique_ptr; + +TEST(ExpectedTest, testValueState) { + Expected e(42); + ASSERT_TRUE(e.has_value()); + ASSERT_TRUE(static_cast(e)); + ASSERT_EQ(*e, 42); + ASSERT_EQ(e.value(), 42); +} + +TEST(ExpectedTest, testErrorState) { + Expected e(Error{ResultTimeout, "timed out"}); + ASSERT_FALSE(e.has_value()); + ASSERT_EQ(e.error().result, ResultTimeout); + ASSERT_EQ(e.error().message, "timed out"); +} + +TEST(ExpectedTest, testUnexpectedFactory) { + Expected e = unexpected(ResultInvalidMessage, "bad payload"); + ASSERT_FALSE(e); + ASSERT_EQ(e.error().result, ResultInvalidMessage); +} + +TEST(ExpectedTest, testValueThrowsClientExceptionOnError) { + Expected e(Error{ResultUnknownError, "boom"}); + ASSERT_THROW(e.value(), ClientException); +} + +TEST(ExpectedTest, testVoidSpecialization) { + Expected ok; + ASSERT_TRUE(ok); + ASSERT_NO_THROW(ok.value()); + + Expected failed(Error{ResultTimeout, "t"}); + ASSERT_FALSE(failed); + ASSERT_EQ(failed.error().result, ResultTimeout); + ASSERT_THROW(failed.value(), ClientException); +} + +TEST(ExpectedTest, testValueOr) { + Expected ok(5); + ASSERT_EQ(ok.value_or(0), 5); + + Expected failed(Error{ResultUnknownError, ""}); + ASSERT_EQ(failed.value_or(7), 7); +} + +TEST(ExpectedTest, testMonadicOpsOnLvalue) { + Expected e(5); + auto doubled = e.transform([](int x) { return x * 2; }); + ASSERT_TRUE(doubled); + ASSERT_EQ(*doubled, 10); + + auto chained = e.and_then([](int x) { return Expected(x + 1L); }); + ASSERT_TRUE(chained); + ASSERT_EQ(*chained, 6L); + + auto recovered = e.or_else([](const Error&) { return Expected(-1); }); + ASSERT_TRUE(recovered); + ASSERT_EQ(*recovered, 5); + + Expected failed(Error{ResultTimeout, "t"}); + auto mapped = failed.transform([](int x) { return x * 2; }); + ASSERT_FALSE(mapped); + ASSERT_EQ(mapped.error().result, ResultTimeout); + auto rescued = failed.or_else([](const Error&) { return Expected(42); }); + ASSERT_TRUE(rescued); + ASSERT_EQ(*rescued, 42); +} + +// Rvalue overloads: a move-only T flows through the chain without a copy. + +TEST(ExpectedTest, testRvalueValueOrMovesOut) { + Expected ok(std::make_unique(7)); + UniqueInt got = std::move(ok).value_or(nullptr); + ASSERT_TRUE(got); + ASSERT_EQ(*got, 7); + + Expected failed(Error{ResultUnknownError, ""}); + UniqueInt fallback = std::move(failed).value_or(std::make_unique(99)); + ASSERT_TRUE(fallback); + ASSERT_EQ(*fallback, 99); +} + +TEST(ExpectedTest, testRvalueAndThenConsumesValue) { + Expected ok(std::make_unique(5)); + auto r = std::move(ok).and_then([](UniqueInt p) { return Expected(*p + 1); }); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 6); + + Expected failed(Error{ResultTimeout, "t"}); + auto propagated = std::move(failed).and_then([](UniqueInt) { return Expected(0); }); + ASSERT_FALSE(propagated); + ASSERT_EQ(propagated.error().result, ResultTimeout); +} + +TEST(ExpectedTest, testRvalueTransformMapsMoveOnly) { + Expected ok(std::make_unique(3)); + auto r = std::move(ok).transform([](UniqueInt p) { return std::make_unique(*p * 10L); }); + ASSERT_TRUE(r); + ASSERT_EQ(**r, 30L); +} + +TEST(ExpectedTest, testRvalueOrElse) { + Expected failed(Error{ResultUnknownError, ""}); + auto recovered = std::move(failed).or_else([](const Error&) { return Expected(42); }); + ASSERT_TRUE(recovered); + ASSERT_EQ(*recovered, 42); + + Expected ok(10); + auto passthrough = std::move(ok).or_else([](const Error&) { return Expected(-1); }); + ASSERT_TRUE(passthrough); + ASSERT_EQ(*passthrough, 10); +} + +TEST(ExpectedTest, testRvalueValueMovesOut) { + Expected e(std::string("hello")); + std::string s = std::move(e).value(); + ASSERT_EQ(s, "hello"); +} diff --git a/tests/st/FutureTest.cc b/tests/st/FutureTest.cc new file mode 100644 index 00000000..43acbc06 --- /dev/null +++ b/tests/st/FutureTest.cc @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace pulsar::st; +using pulsar::st::detail::Promise; + +TEST(FutureTest, testGetReturnsCompletedValue) { + Promise promise; + Future future = promise.getFuture(); + ASSERT_FALSE(future.isReady()); + promise.setValue(42); + ASSERT_TRUE(future.isReady()); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 42); +} + +TEST(FutureTest, testGetBlocksUntilCompletedFromAnotherThread) { + Promise promise; + Future future = promise.getFuture(); + std::thread completer([promise]() { promise.setValue(7); }); + auto r = future.get(); + completer.join(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 7); +} + +TEST(FutureTest, testTimedGetTimesOutWhilePending) { + Promise promise; + Future future = promise.getFuture(); + auto r = future.get(std::chrono::milliseconds(10)); + ASSERT_FALSE(r.has_value()); // timed out, still pending + promise.setValue(1); + auto r2 = future.get(std::chrono::milliseconds(10)); + ASSERT_TRUE(r2.has_value()); + ASSERT_TRUE(*r2); + ASSERT_EQ(**r2, 1); +} + +TEST(FutureTest, testListenerRunsOnCompletion) { + Promise promise; + Future future = promise.getFuture(); + int seen = -1; + future.addListener([&seen](const Expected& r) { seen = r ? *r : -2; }); + ASSERT_EQ(seen, -1); + promise.setValue(5); + ASSERT_EQ(seen, 5); +} + +TEST(FutureTest, testListenerAfterCompletionRunsSynchronously) { + Promise promise; + promise.setValue(9); + int seen = -1; + promise.getFuture().addListener([&seen](const Expected& r) { seen = r ? *r : -2; }); + ASSERT_EQ(seen, 9); +} + +TEST(FutureTest, testCompleteIsFirstWriterWins) { + Promise promise; + ASSERT_TRUE(promise.setValue(1)); + ASSERT_FALSE(promise.setValue(2)); + ASSERT_FALSE(promise.setError(Error{ResultUnknownError, ""})); + auto r = promise.getFuture().get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 1); +} + +// --- thenApply --------------------------------------------------------------- + +TEST(FutureTest, testThenApplyMapsValue) { + Promise promise; + Future mapped = + promise.getFuture().thenApply([](const int& x) { return std::to_string(x + 1); }); + promise.setValue(41); + auto r = mapped.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, "42"); +} + +TEST(FutureTest, testThenApplyPropagatesError) { + Promise promise; + bool called = false; + Future mapped = promise.getFuture().thenApply([&called](const int& x) { + called = true; + return x; + }); + promise.setError(Error{ResultTimeout, "t"}); + auto r = mapped.get(); + ASSERT_FALSE(r); + ASSERT_EQ(r.error().result, ResultTimeout); + ASSERT_FALSE(called); +} + +TEST(FutureTest, testThenApplyVoidMapper) { + Promise promise; + int seen = -1; + Future done = promise.getFuture().thenApply([&seen](const int& x) { seen = x; }); + promise.setValue(7); + auto r = done.get(); + ASSERT_TRUE(r); + ASSERT_EQ(seen, 7); +} + +TEST(FutureTest, testThenApplyMoveOnlyMapper) { + Promise promise; + auto bonus = std::make_unique(100); + Future mapped = + promise.getFuture().thenApply([b = std::move(bonus)](const int& x) { return x + *b; }); + promise.setValue(5); + auto r = mapped.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 105); +} + +// --- broken promise ---------------------------------------------------------- + +TEST(FutureTest, testAbandonedPromiseFailsTheFuture) { + Future future = [] { + Promise abandoned; + return abandoned.getFuture(); + }(); + auto r = future.get(); // must not hang + ASSERT_FALSE(r); + ASSERT_EQ(r.error().result, ResultUnknownError); +} + +TEST(FutureTest, testAbandonedPromiseCopiesFailOnlyAfterLastCopyDies) { + Promise outer; + Future future = outer.getFuture(); + { + Promise copy = outer; // a copy dying must not break the promise + } + ASSERT_FALSE(future.isReady()); + outer.setValue(9); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 9); +} + +TEST(FutureTest, testAbandonedVoidPromise) { + Future future = [] { + Promise abandoned; + return abandoned.getFuture(); + }(); + auto r = future.get(); + ASSERT_FALSE(r); +} + +TEST(FutureTest, testCompletedPromiseGuardIsNoOp) { + Future future = [] { + Promise promise; + Future f = promise.getFuture(); + promise.setValue(123); + return f; // promise dies after completing: value must be preserved + }(); + auto r = future.get(); + ASSERT_TRUE(r); + ASSERT_EQ(*r, 123); +} + +// --- coroutine awaiter -------------------------------------------------------- + +namespace { + +struct TestTask { + struct promise_type { + TestTask get_return_object() { return {}; } + std::suspend_never initial_suspend() { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() {} + void unhandled_exception() { std::terminate(); } + }; +}; + +TestTask awaitInto(Future future, Expected& out, std::atomic& done) { + Expected r = co_await future; + out = r; + done = true; +} + +} // namespace + +TEST(FutureTest, testCoAwaitReadyFuture) { + Promise promise; + promise.setValue(11); + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + ASSERT_TRUE(done.load()); + ASSERT_TRUE(out); + ASSERT_EQ(*out, 11); +} + +TEST(FutureTest, testCoAwaitSuspendsUntilCompleted) { + Promise promise; + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + ASSERT_FALSE(done.load()); // suspended, not resumed inside await_suspend + promise.setValue(21); // completes -> resumes the coroutine + ASSERT_TRUE(done.load()); + ASSERT_TRUE(out); + ASSERT_EQ(*out, 21); +} + +TEST(FutureTest, testCoAwaitPropagatesError) { + Promise promise; + Expected out(0); + std::atomic done{false}; + awaitInto(promise.getFuture(), out, done); + promise.setError(Error{ResultTimeout, "t"}); + ASSERT_TRUE(done.load()); + ASSERT_FALSE(out); + ASSERT_EQ(out.error().result, ResultTimeout); +} diff --git a/tests/st/ScalableCommandsTest.cc b/tests/st/ScalableCommandsTest.cc new file mode 100644 index 00000000..ede92aa6 --- /dev/null +++ b/tests/st/ScalableCommandsTest.cc @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include + +#include "PulsarApi.pb.h" +#include "lib/Commands.h" +#include "lib/SharedBuffer.h" + +// Wire-level tests for the scalable-topics commands: the framed buffers the +// Commands factories produce must deserialize back to the intended protobuf. + +using namespace pulsar; +namespace proto = pulsar::proto; + +namespace { + +// Unwrap [totalSize][cmdSize][BaseCommand] framing produced by the factories. +proto::BaseCommand parseCommand(SharedBuffer buffer) { + const uint32_t totalSize = buffer.readUnsignedInt(); + const uint32_t cmdSize = buffer.readUnsignedInt(); + EXPECT_EQ(totalSize, cmdSize + sizeof(uint32_t)); + EXPECT_EQ(buffer.readableBytes(), cmdSize); + proto::BaseCommand cmd; + EXPECT_TRUE(cmd.ParseFromArray(buffer.data(), static_cast(cmdSize))); + return cmd; +} + +} // namespace + +TEST(ScalableCommandsTest, testScalableTopicLookupRoundtrip) { + auto cmd = parseCommand(Commands::newScalableTopicLookup(42, "persistent://public/default/orders", true)); + ASSERT_EQ(cmd.type(), proto::BaseCommand::SCALABLE_TOPIC_LOOKUP); + ASSERT_TRUE(cmd.has_scalabletopiclookup()); + ASSERT_EQ(cmd.scalabletopiclookup().session_id(), 42u); + ASSERT_EQ(cmd.scalabletopiclookup().topic(), "persistent://public/default/orders"); + ASSERT_TRUE(cmd.scalabletopiclookup().create_if_missing()); +} + +TEST(ScalableCommandsTest, testScalableTopicLookupNoCreate) { + auto cmd = parseCommand(Commands::newScalableTopicLookup(7, "orders", false)); + ASSERT_EQ(cmd.scalabletopiclookup().session_id(), 7u); + ASSERT_FALSE(cmd.scalabletopiclookup().create_if_missing()); +} + +TEST(ScalableCommandsTest, testScalableTopicCloseRoundtrip) { + auto cmd = parseCommand(Commands::newScalableTopicClose(42)); + ASSERT_EQ(cmd.type(), proto::BaseCommand::SCALABLE_TOPIC_CLOSE); + ASSERT_TRUE(cmd.has_scalabletopicclose()); + ASSERT_EQ(cmd.scalabletopicclose().session_id(), 42u); +} + +TEST(ScalableCommandsTest, testMessageTypeNamesScalableCommands) { + // messageType() runs on every incoming command (debug logging) and throws on + // unknown enum values: every scalable type the broker can send or the client + // can log must have a name. + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_LOOKUP), "SCALABLE_TOPIC_LOOKUP"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_UPDATE), "SCALABLE_TOPIC_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_CLOSE), "SCALABLE_TOPIC_CLOSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_SUBSCRIBE), + "SCALABLE_TOPIC_SUBSCRIBE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_SUBSCRIBE_RESPONSE), + "SCALABLE_TOPIC_SUBSCRIBE_RESPONSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::SCALABLE_TOPIC_ASSIGNMENT_UPDATE), + "SCALABLE_TOPIC_ASSIGNMENT_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS), "WATCH_SCALABLE_TOPICS"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS_UPDATE), + "WATCH_SCALABLE_TOPICS_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_SCALABLE_TOPICS_CLOSE), + "WATCH_SCALABLE_TOPICS_CLOSE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS), "WATCH_TC_ASSIGNMENTS"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS_UPDATE), + "WATCH_TC_ASSIGNMENTS_UPDATE"); + ASSERT_EQ(Commands::messageType(proto::BaseCommand::WATCH_TC_ASSIGNMENTS_CLOSE), + "WATCH_TC_ASSIGNMENTS_CLOSE"); +} diff --git a/tests/st/SchemaCodecTest.cc b/tests/st/SchemaCodecTest.cc new file mode 100644 index 00000000..a600c44b --- /dev/null +++ b/tests/st/SchemaCodecTest.cc @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace pulsar::st; + +namespace { + +// Encode + decode through the public Schema seam and require the value to +// survive the roundtrip. +template +void expectRoundtrip(const T& value) { + Schema schema; + std::vector encoded; + auto ok = schema.encode(value, encoded); + ASSERT_TRUE(ok); + auto decoded = schema.decode(std::span(encoded)); + ASSERT_TRUE(decoded); + ASSERT_EQ(*decoded, value); +} + +} // namespace + +TEST(SchemaCodecTest, testIntegerRoundtrips) { + expectRoundtrip(42); + expectRoundtrip(-1); + expectRoundtrip(0x1234); + expectRoundtrip(std::numeric_limits::min()); + expectRoundtrip(0x01020304); + expectRoundtrip(-1); + expectRoundtrip(std::numeric_limits::max()); + expectRoundtrip(0x0102030405060708LL); + expectRoundtrip(std::numeric_limits::min()); +} + +TEST(SchemaCodecTest, testFloatingPointRoundtrips) { + expectRoundtrip(3.5f); + expectRoundtrip(-0.0f); + expectRoundtrip(2.718281828459045); + expectRoundtrip(std::numeric_limits::max()); +} + +TEST(SchemaCodecTest, testStringRoundtrip) { + expectRoundtrip("hello scalable topics"); + expectRoundtrip(""); +} + +TEST(SchemaCodecTest, testBytesRoundtrip) { + Bytes payload = {std::byte{0x00}, std::byte{0xFF}, std::byte{0x7F}}; + expectRoundtrip(payload); +} + +TEST(SchemaCodecTest, testInt32IsBigEndianOnTheWire) { + Schema schema; + std::vector encoded; + ASSERT_TRUE(schema.encode(0x01020304, encoded)); + ASSERT_EQ(encoded.size(), 4u); + ASSERT_EQ(encoded[0], std::byte{0x01}); + ASSERT_EQ(encoded[1], std::byte{0x02}); + ASSERT_EQ(encoded[2], std::byte{0x03}); + ASSERT_EQ(encoded[3], std::byte{0x04}); +} + +TEST(SchemaCodecTest, testShortPayloadIsAnErrorNotUb) { + std::vector tooShort(2); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + ASSERT_FALSE(Schema{}.decode(std::span(tooShort))); + std::vector empty; + ASSERT_FALSE(Schema{}.decode(std::span(empty))); +} + +TEST(SchemaCodecTest, testBytesViewDecodeIsZeroCopy) { + std::vector buffer = {std::byte{1}, std::byte{2}, std::byte{3}}; + Schema schema; + auto view = schema.decode(std::span(buffer)); + ASSERT_TRUE(view); + ASSERT_EQ(view->data(), buffer.data()); // a view into the same buffer, no copy + ASSERT_EQ(view->size(), buffer.size()); +} + +TEST(SchemaCodecTest, testPrimitiveSchemaInfoNames) { + ASSERT_EQ(Schema{}.info().getName(), "STRING"); + ASSERT_EQ(Schema{}.info().getName(), "INT32"); + ASSERT_EQ(Schema{}.info().getName(), "INT64"); + ASSERT_EQ(Schema{}.info().getName(), "FLOAT"); + ASSERT_EQ(Schema{}.info().getName(), "DOUBLE"); + ASSERT_EQ(Schema{}.info().getName(), "BYTES"); +} + +TEST(SchemaCodecTest, testUnsetSchemaReportsErrors) { + struct NotAPrimitive { + int x = 0; + }; + Schema schema; // no SerDe supplied -> unset codec + std::vector out; + ASSERT_FALSE(schema.encode(NotAPrimitive{}, out)); + ASSERT_FALSE(schema.decode(std::span(out))); +} + +TEST(SchemaCodecTest, testCustomSerDeThroughTypeErasure) { + // A toy SerDe proving the pluggable seam: encodes an int as a decimal string. + struct DecimalSerDe { + SchemaInfo info() const { return SchemaInfo(SchemaType::STRING, "DECIMAL", ""); } + Expected encode(const int& v, std::vector& out) const { + const std::string s = std::to_string(v); + const auto* p = reinterpret_cast(s.data()); + out.assign(p, p + s.size()); + return {}; + } + Expected decode(std::span d) const { + return std::stoi(std::string(reinterpret_cast(d.data()), d.size())); + } + }; + Schema schema{DecimalSerDe{}}; + std::vector encoded; + ASSERT_TRUE(schema.encode(12345, encoded)); + ASSERT_EQ(encoded.size(), 5u); // "12345" + auto decoded = schema.decode(std::span(encoded)); + ASSERT_TRUE(decoded); + ASSERT_EQ(*decoded, 12345); +} diff --git a/tests/st/SegmentLayoutTest.cc b/tests/st/SegmentLayoutTest.cc new file mode 100644 index 00000000..b4192ac9 --- /dev/null +++ b/tests/st/SegmentLayoutTest.cc @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include + +#include "PulsarApi.pb.h" +#include "lib/Murmur3_32Hash.h" +#include "lib/st/SegmentLayout.h" + +using namespace pulsar::st; +namespace proto = pulsar::proto; + +namespace { + +const std::string kTopic = "topic://public/default/orders"; + +void addSegment(proto::ScalableTopicDAG& dag, std::uint64_t id, std::uint32_t start, std::uint32_t end, + proto::SegmentState state, const std::string& legacyTopic = {}) { + auto* seg = dag.add_segments(); + seg->set_segment_id(id); + seg->set_hash_start(start); + seg->set_hash_end(end); + seg->set_state(state); + seg->set_created_at_epoch(1); + seg->set_created_at_ms(1000); + if (!legacyTopic.empty()) { + seg->set_legacy_topic_name(legacyTopic); + } +} + +void addBroker(proto::ScalableTopicDAG& dag, std::uint64_t segmentId, const std::string& url, + const std::string& urlTls = {}) { + auto* addr = dag.add_segment_brokers(); + addr->set_segment_id(segmentId); + addr->set_broker_url(url); + if (!urlTls.empty()) { + addr->set_broker_url_tls(urlTls); + } +} + +// A deterministic key whose 16-bit segment hash falls within [start, end]. +std::string findKeyInRange(std::uint32_t start, std::uint32_t end) { + for (int i = 0; i < 1000000; i++) { + std::string key = "key-" + std::to_string(i); + std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + if (hash >= start && hash <= end) { + return key; + } + } + ADD_FAILURE() << "no key found in range [" << start << ", " << end << "]"; + return {}; +} + +} // namespace + +TEST(ScalableTopicHashingTest, testHashSplitsIntoIndependentHalves) { + const std::string key = "some-routing-key"; + std::uint32_t murmur = ScalableTopicHashing::murmur(key); + std::uint32_t segment = ScalableTopicHashing::segmentHash(murmur); + std::uint32_t bucket = ScalableTopicHashing::entryBucketHash(murmur); + ASSERT_LE(segment, 0xFFFFu); + ASSERT_LE(bucket, 0xFFFFu); + ASSERT_EQ((segment << 16) | bucket, murmur); +} + +TEST(ScalableTopicHashingTest, testRawHashIsUnmasked) { + // The classic makeHash clears bit 31; the raw hash must not, or the high + // half of the split would be confined to [0, 0x7FFF]. Find a key where the + // two differ, proving the raw path is in use. + bool foundHighBit = false; + for (int i = 0; i < 100000 && !foundHighBit; i++) { + std::string key = "key-" + std::to_string(i); + foundHighBit = (ScalableTopicHashing::murmur(key) & 0x80000000u) != 0; + } + ASSERT_TRUE(foundHighBit); +} + +TEST(SegmentLayoutTest, testFromProtoBuildsSortedLayout) { + proto::ScalableTopicDAG dag; + dag.set_epoch(7); + // Add out of order to verify sorting by range start. + addSegment(dag, 2, 0x8000, 0xFFFF, proto::ACTIVE); + addSegment(dag, 1, 0x0000, 0x7FFF, proto::ACTIVE); + addSegment(dag, 0, 0x0000, 0xFFFF, proto::SEALED); + addBroker(dag, 1, "pulsar://broker1:6650", "pulsar+ssl://broker1:6651"); + addBroker(dag, 2, "pulsar://broker2:6650"); + dag.set_controller_broker_url("pulsar://controller:6650"); + + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + ASSERT_EQ(layout->epoch(), 7u); + + ASSERT_EQ(layout->activeSegments().size(), 2u); + ASSERT_EQ(layout->activeSegments()[0].segmentId, 1u); // sorted by range start + ASSERT_EQ(layout->activeSegments()[1].segmentId, 2u); + ASSERT_EQ(layout->activeSegments()[0].range, (HashRange{0x0000, 0x7FFF})); + ASSERT_EQ(layout->activeSegments()[0].segmentTopicName, "segment://public/default/orders/0000-7fff-1"); + ASSERT_EQ(layout->activeSegments()[1].segmentTopicName, "segment://public/default/orders/8000-ffff-2"); + ASSERT_FALSE(layout->activeSegments()[0].isLegacy()); + ASSERT_EQ(layout->activeSegments()[0].attachTopicName(), layout->activeSegments()[0].segmentTopicName); + + ASSERT_EQ(layout->sealedSegments().size(), 1u); + ASSERT_EQ(layout->sealedSegments()[0].segmentId, 0u); + + ASSERT_NE(layout->brokerUrl(1), nullptr); + ASSERT_EQ(*layout->brokerUrl(1), "pulsar://broker1:6650"); + ASSERT_NE(layout->brokerUrlTls(1), nullptr); + ASSERT_EQ(*layout->brokerUrlTls(1), "pulsar+ssl://broker1:6651"); + ASSERT_EQ(layout->brokerUrlTls(2), nullptr); + ASSERT_EQ(layout->brokerUrl(42), nullptr); + + ASSERT_TRUE(layout->controllerBrokerUrl().has_value()); + ASSERT_EQ(*layout->controllerBrokerUrl(), "pulsar://controller:6650"); + ASSERT_FALSE(layout->controllerBrokerUrlTls().has_value()); +} + +TEST(SegmentLayoutTest, testFromProtoParsesLegacyAndBucketSplits) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0xFFFF, proto::ACTIVE, "persistent://public/default/orders-partition-0"); + auto* seg = dag.mutable_segments(0); + seg->add_entry_bucket_splits(0x4000); + seg->add_entry_bucket_splits(0x8000); + + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + const Segment& segment = layout->activeSegments()[0]; + ASSERT_TRUE(segment.isLegacy()); + ASSERT_EQ(segment.attachTopicName(), "persistent://public/default/orders-partition-0"); + ASSERT_EQ(segment.entryBucketSplits, (std::vector{0x4000, 0x8000})); +} + +TEST(SegmentLayoutTest, testFromProtoRejectsBadTopicScheme) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + auto layout = SegmentLayout::fromProto(dag, "persistent://public/default/orders"); + ASSERT_FALSE(layout); + ASSERT_EQ(layout.error().result, pulsar::ResultInvalidTopicName); +} + +TEST(SegmentLayoutTest, testFromProtoRejectsMalformedHashRange) { + { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x8000, 0x7FFF, proto::ACTIVE); // end < start + ASSERT_FALSE(SegmentLayout::fromProto(dag, kTopic)); + } + { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x10000, proto::ACTIVE); // end > 16-bit space + ASSERT_FALSE(SegmentLayout::fromProto(dag, kTopic)); + } +} + +namespace { + +Expected fourSegmentLayout() { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x3FFF, proto::ACTIVE); + addSegment(dag, 1, 0x4000, 0x7FFF, proto::ACTIVE); + addSegment(dag, 2, 0x8000, 0xBFFF, proto::ACTIVE); + addSegment(dag, 3, 0xC000, 0xFFFF, proto::ACTIVE); + return SegmentLayout::fromProto(dag, kTopic); +} + +} // namespace + +TEST(SegmentRouterTest, testKeyedRoutingHitsOwningRange) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + + for (int i = 0; i < 200; i++) { + std::string key = "order-" + std::to_string(i); + auto id = router.route(key, *layout); + ASSERT_TRUE(id); + std::uint32_t hash = ScalableTopicHashing::segmentHash(ScalableTopicHashing::murmur(key)); + const Segment& segment = layout->activeSegments()[*id]; // ids == sorted positions here + ASSERT_TRUE(segment.range.contains(hash)) << "key " << key << " hash " << hash; + } +} + +TEST(SegmentRouterTest, testKeyedRoutingIsDeterministic) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + for (int i = 0; i < 20; i++) { + std::string key = "stable-key-" + std::to_string(i); + auto first = router.route(key, *layout); + auto second = router.route(key, *layout); + ASSERT_TRUE(first); + ASSERT_TRUE(second); + ASSERT_EQ(*first, *second); + } +} + +TEST(SegmentRouterTest, testSingleFullRangeSegmentGetsEverything) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 9, 0x0000, 0xFFFF, proto::ACTIVE); + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + for (int i = 0; i < 50; i++) { + auto id = router.route("k" + std::to_string(i), *layout); + ASSERT_TRUE(id); + ASSERT_EQ(*id, 9u); + } +} + +TEST(SegmentRouterTest, testNoActiveSegmentsIsAnError) { + SegmentLayout empty; + SegmentRouter router; + auto id = router.route("key", empty); + ASSERT_FALSE(id); + ASSERT_EQ(id.error().result, pulsar::ResultServiceUnitNotReady); + auto rr = router.routeRoundRobin(empty); + ASSERT_FALSE(rr); +} + +TEST(SegmentRouterTest, testUncoveredHashIsAnError) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + addSegment(dag, 0, 0x0000, 0x7FFF, proto::ACTIVE); // gap: [0x8000, 0xFFFF] uncovered + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + std::string uncoveredKey = findKeyInRange(0x8000, 0xFFFF); + auto id = router.route(uncoveredKey, *layout); + ASSERT_FALSE(id); + ASSERT_EQ(id.error().result, pulsar::ResultUnknownError); +} + +TEST(SegmentRouterTest, testAllLegacyRoutesModNLikeClassicPartitions) { + proto::ScalableTopicDAG dag; + dag.set_epoch(1); + const int n = 4; + for (int i = 0; i < n; i++) { + addSegment(dag, i, i * 0x4000, (i + 1) * 0x4000 - 1, proto::ACTIVE, + "persistent://public/default/orders-partition-" + std::to_string(i)); + } + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + + for (int i = 0; i < 100; i++) { + std::string key = "legacy-key-" + std::to_string(i); + auto id = router.route(key, *layout); + ASSERT_TRUE(id); + // Must match the classic partitioned-topic routing exactly. + int expected = pulsar::Murmur3_32Hash().makeHash(key) % n; + ASSERT_EQ(*id, static_cast(expected)) << "key " << key; + } +} + +TEST(SegmentRouterTest, testMixedLegacyAndRegularUsesRangeRouting) { + proto::ScalableTopicDAG dag; + dag.set_epoch(2); + addSegment(dag, 0, 0x0000, 0x7FFF, proto::ACTIVE, "persistent://public/default/orders-partition-0"); + addSegment(dag, 1, 0x8000, 0xFFFF, proto::ACTIVE); // one regular segment -> range routing + auto layout = SegmentLayout::fromProto(dag, kTopic); + ASSERT_TRUE(layout); + SegmentRouter router; + + std::string keyInUpperHalf = findKeyInRange(0x8000, 0xFFFF); + auto id = router.route(keyInUpperHalf, *layout); + ASSERT_TRUE(id); + ASSERT_EQ(*id, 1u); +} + +TEST(SegmentRouterTest, testRoundRobinCyclesAllActiveSegments) { + auto layout = fourSegmentLayout(); + ASSERT_TRUE(layout); + SegmentRouter router; + std::set seen; + for (int i = 0; i < 8; i++) { + auto id = router.routeRoundRobin(*layout); + ASSERT_TRUE(id); + seen.insert(*id); + } + ASSERT_EQ(seen.size(), 4u); // every active segment hit +} diff --git a/tests/st/StClientTest.cc b/tests/st/StClientTest.cc new file mode 100644 index 00000000..839b7ee1 --- /dev/null +++ b/tests/st/StClientTest.cc @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +// Broker-free tests for PulsarClientBuilder::build() and the client shell: they +// validate configuration mapping and lifecycle, never connecting anywhere. + +using namespace pulsar::st; + +TEST(StClientTest, testEmptyServiceUrlIsRejected) { + auto client = PulsarClient::builder().build(); + ASSERT_FALSE(client); + ASSERT_EQ(client.error().result, ResultInvalidUrl); +} + +TEST(StClientTest, testTlsEnabledRequiresTlsScheme) { + auto client = + PulsarClient::builder().serviceUrl("pulsar://localhost:6650").tlsPolicy({.enabled = true}).build(); + ASSERT_FALSE(client); + ASSERT_EQ(client.error().result, ResultInvalidConfiguration); +} + +TEST(StClientTest, testTlsEnabledAcceptsTlsScheme) { + auto client = PulsarClient::builder() + .serviceUrl("pulsar+ssl://localhost:6651") + .tlsPolicy({.enabled = true, .allowInsecureConnection = true}) + .build(); + ASSERT_TRUE(client); + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testBuildAndCloseWithoutBroker) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + ASSERT_TRUE(static_cast(*client)); + + auto closed = client->close(); + ASSERT_TRUE(closed); +} + +TEST(StClientTest, testBuildWithPoliciesWithoutBroker) { + using namespace std::chrono_literals; + auto client = PulsarClient::builder() + .serviceUrl("pulsar://localhost:6650") + .connectionPolicy({.connectionsPerBroker = 2, + .connectionTimeout = 5000ms, + .operationTimeout = 20000ms, + .keepAliveInterval = 30s}) + .threadPolicy({.ioThreads = 2, .messageListenerThreads = 2}) + .memoryPolicy({.limit = MemorySize::ofMiB(64)}) + .backoffPolicy({.initialBackoff = 100ms, .maxBackoff = 10s}) + .build(); + ASSERT_TRUE(client); + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testProducerCreationReportsNotSupportedYet) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + + auto producer = client->newProducer().topic("st-topic").create(); + ASSERT_FALSE(producer); + ASSERT_EQ(producer.error().result, ResultOperationNotSupported); + + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testTransactionReportsNotSupportedYet) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + + auto txn = client->newTransaction(); + ASSERT_FALSE(txn); + ASSERT_EQ(txn.error().result, ResultOperationNotSupported); + + ASSERT_TRUE(client->close()); +} + +TEST(StClientTest, testShutdownWithoutBroker) { + auto client = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build(); + ASSERT_TRUE(client); + client->shutdown(); // immediate teardown must not crash or hang +} diff --git a/tests/st/StHandleTest.cc b/tests/st/StHandleTest.cc new file mode 100644 index 00000000..339bdb0b --- /dev/null +++ b/tests/st/StHandleTest.cc @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include + +// These tests exercise symbols DEFINED in lib/st (not header-inline), proving +// the ST_OBJECT_LIB objects are merged into the libpulsar artifact this test +// binary links against. + +using namespace pulsar::st; + +TEST(StHandleTest, testDefaultMessageIdIsEmpty) { + MessageId id; + ASSERT_FALSE(static_cast(id)); +} + +TEST(StHandleTest, testDefaultCheckpointIsEmpty) { + Checkpoint checkpoint; + ASSERT_FALSE(static_cast(checkpoint)); +} + +TEST(StHandleTest, testEmptyHandlesAreCopyable) { + MessageId id; + MessageId copy = id; + ASSERT_FALSE(static_cast(copy)); + + Checkpoint checkpoint; + Checkpoint checkpointCopy = checkpoint; + ASSERT_FALSE(static_cast(checkpointCopy)); +} diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json new file mode 100644 index 00000000..edf5c695 --- /dev/null +++ b/vcpkg-configuration.json @@ -0,0 +1,5 @@ +{ + "overlay-ports": [ + "./vcpkg-overlay" + ] +} diff --git a/vcpkg-overlay/reflectcpp/portfile.cmake b/vcpkg-overlay/reflectcpp/portfile.cmake new file mode 100644 index 00000000..e81fb51e --- /dev/null +++ b/vcpkg-overlay/reflectcpp/portfile.cmake @@ -0,0 +1,64 @@ +vcpkg_from_github( + OUT_SOURCE_PATH SOURCE_PATH + REPO getml/reflect-cpp + REF "v${VERSION}" + SHA512 4be84fc69efd6f4ce766d38cedc8b1d0fd0fa8170e69293383f7dbd59c6bce45797f0e7cf653ef9c839b15fd7da702c9daf30efd34c779555fe4e5bd5eb29481 + HEAD_REF main +) + +if(VCPKG_TARGET_IS_WINDOWS) + vcpkg_check_linkage(ONLY_STATIC_LIBRARY) +endif() +string(COMPARE EQUAL "${VCPKG_LIBRARY_LINKAGE}" "dynamic" REFLECTCPP_BUILD_SHARED) + +# Overlay of the upstream vcpkg port: adds the "avro" feature (REFLECTCPP_AVRO + +# avro-c), which the registry port does not expose yet. Drop this overlay once the +# upstream port grows the feature. +vcpkg_check_features(OUT_FEATURE_OPTIONS FEATURE_OPTIONS + FEATURES + avro REFLECTCPP_AVRO + bson REFLECTCPP_BSON + capnproto REFLECTCPP_CAPNPROTO + cbor REFLECTCPP_CBOR + csv REFLECTCPP_CSV + flexbuffers REFLECTCPP_FLEXBUFFERS + msgpack REFLECTCPP_MSGPACK + parquet REFLECTCPP_PARQUET + toml REFLECTCPP_TOML + ubjson REFLECTCPP_UBJSON + xml REFLECTCPP_XML + yaml REFLECTCPP_YAML +) + +vcpkg_cmake_configure( + SOURCE_PATH ${SOURCE_PATH} + OPTIONS + ${FEATURE_OPTIONS} + -DREFLECTCPP_BUILD_TESTS=OFF + -DREFLECTCPP_BUILD_SHARED=${REFLECTCPP_BUILD_SHARED} + -DREFLECTCPP_USE_BUNDLED_DEPENDENCIES=OFF +) + +vcpkg_cmake_install() + +vcpkg_cmake_config_fixup( + CONFIG_PATH "lib/cmake/${PORT}" +) + +if("avro" IN_LIST FEATURES) + # The Avro backend links the static avro-c archive, whose link interface + # carries jansson::jansson — but the upstream config has no Avro + # find_dependency block, so consumers fail at generate time with + # "the target was not found". Define it before the exports are included. + vcpkg_replace_string("${CURRENT_PACKAGES_DIR}/share/${PORT}/${PORT}-config.cmake" + "include(\${CMAKE_CURRENT_LIST_DIR}/reflectcpp-exports.cmake)" + "include(CMakeFindDependencyMacro)\nfind_dependency(jansson CONFIG)\ninclude(\${CMAKE_CURRENT_LIST_DIR}/reflectcpp-exports.cmake)") +endif() + +file(REMOVE_RECURSE + "${CURRENT_PACKAGES_DIR}/debug/include" + "${CURRENT_PACKAGES_DIR}/debug/share" +) + +vcpkg_install_copyright(FILE_LIST "${SOURCE_PATH}/LICENSE") +configure_file("${CMAKE_CURRENT_LIST_DIR}/usage" "${CURRENT_PACKAGES_DIR}/share/${PORT}/usage" COPYONLY) diff --git a/vcpkg-overlay/reflectcpp/usage b/vcpkg-overlay/reflectcpp/usage new file mode 100644 index 00000000..ebc57e43 --- /dev/null +++ b/vcpkg-overlay/reflectcpp/usage @@ -0,0 +1,4 @@ +reflect-cpp provides CMake targets: + + find_package(reflectcpp CONFIG REQUIRED) + target_link_libraries(main PRIVATE reflectcpp::reflectcpp) diff --git a/vcpkg-overlay/reflectcpp/vcpkg.json b/vcpkg-overlay/reflectcpp/vcpkg.json new file mode 100644 index 00000000..b27b750c --- /dev/null +++ b/vcpkg-overlay/reflectcpp/vcpkg.json @@ -0,0 +1,142 @@ +{ + "name": "reflectcpp", + "version": "0.24.0", + "port-version": 2, + "description": "A C++ library for serialization and deserialization using reflection. Supports JSON, Avro, BSON, Cap'n Proto, CBOR, CSV, flexbuffers, msgpack, parquet, TOML, UBJSON, XML, YAML.", + "homepage": "https://github.com/getml/reflect-cpp/", + "license": "MIT", + "dependencies": [ + { + "name": "ctre", + "version>=": "3.10.0" + }, + { + "name": "vcpkg-cmake", + "host": true + }, + { + "name": "vcpkg-cmake-config", + "host": true + }, + { + "name": "yyjson", + "version>=": "0.10.0" + } + ], + "features": { + "avro": { + "description": "Support for the Avro format", + "dependencies": [ + { + "name": "avro-c", + "version>=": "1.12.1" + } + ] + }, + "bson": { + "description": "Support for the BSON format", + "dependencies": [ + { + "name": "libbson", + "version>=": "1.25.1" + } + ] + }, + "capnproto": { + "description": "Support for the Cap'n Proto format", + "dependencies": [ + { + "name": "capnproto", + "version>=": "1.0.2#1" + } + ] + }, + "cbor": { + "description": "Support for the CBOR format", + "dependencies": [ + { + "name": "jsoncons", + "version>=": "1.4.0" + } + ] + }, + "csv": { + "description": "Enable CSV support", + "dependencies": [ + { + "name": "arrow", + "features": [ + "csv" + ], + "version>=": "21.0.0" + } + ] + }, + "flexbuffers": { + "description": "Support for the flexbuffers format (part of flatbuffers)", + "dependencies": [ + { + "name": "flatbuffers", + "version>=": "23.5.26#1" + } + ] + }, + "msgpack": { + "description": "Support for the msgpack format", + "dependencies": [ + { + "name": "msgpack-c", + "version>=": "6.0.0" + } + ] + }, + "parquet": { + "description": "Enable parquet support", + "dependencies": [ + { + "name": "arrow", + "features": [ + "parquet" + ], + "version>=": "21.0.0" + } + ] + }, + "toml": { + "description": "Support for the TOML format", + "dependencies": [ + { + "name": "tomlplusplus", + "version>=": "3.4.0#1" + } + ] + }, + "ubjson": { + "description": "Support for the UBJSON format", + "dependencies": [ + { + "name": "jsoncons", + "version>=": "1.4.0" + } + ] + }, + "xml": { + "description": "Support for the XML format", + "dependencies": [ + { + "name": "pugixml", + "version>=": "1.15" + } + ] + }, + "yaml": { + "description": "Support for the YAML format", + "dependencies": [ + { + "name": "yaml-cpp", + "version>=": "0.8.0#1" + } + ] + } + } +} diff --git a/vcpkg.json b/vcpkg.json index 3452492d..c10d8666 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -43,6 +43,13 @@ "name": "protobuf", "version>=": "6.33.4#1" }, + { + "name": "reflectcpp", + "features": [ + "avro" + ], + "version>=": "0.24.0" + }, { "name": "snappy", "version>=": "1.2.2"