Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ set(SAMPLE_ST_SOURCES
st/SampleStQueueConsumer.cc
st/SampleStCheckpointConsumer.cc
)
# reflect-cpp powers jsonSchema<T>() (reflection-based JSON SerDe + schema). It is
# optional for this API-only PR: when the package is present the JSON sample is
# added and linked against it; when absent, only that one sample is skipped. (The
# reflectcpp vcpkg port does not yet ship an Avro backend, so it is not yet wired
# into the manifest; it will be added with the lib/st implementation.)
# reflect-cpp powers jsonSchema<T>() and avroSchema<T>() (reflection-based SerDe +
# derived schema). The vcpkg manifest carries reflectcpp plus avro-c (the C library
# behind reflect-cpp's header-only Avro backend); the find_package guard only
# matters for non-manifest builds (e.g. the CodeQL job), where the JSON sample is
# skipped.
find_package(reflectcpp CONFIG QUIET)
if (reflectcpp_FOUND)
list(APPEND SAMPLE_ST_SOURCES st/SampleStJsonSchema.cc)
Expand Down
9 changes: 5 additions & 4 deletions include/pulsar/st/AvroSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ namespace pulsar::st {
namespace detail {
template <typename T>
struct AvroSerDe {
SchemaInfo info() const { return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema<T>()); }
SchemaInfo info() const {
return SchemaInfo(SchemaType::AVRO, "AVRO", rfl::avro::to_schema<T>().json_str());
}
Expected<void> encode(const T& value, std::vector<std::byte>& out) const {
try {
const std::string s = rfl::avro::write(value);
const std::vector<char> s = rfl::avro::write(value);
const auto* p = reinterpret_cast<const std::byte*>(s.data());
out.assign(p, p + s.size());
return {};
Expand All @@ -58,8 +60,7 @@ struct AvroSerDe {
}
Expected<T> decode(std::span<const std::byte> data) const {
try {
return rfl::avro::read<T>(std::string(reinterpret_cast<const char*>(data.data()), data.size()))
.value();
return rfl::avro::read<T>(reinterpret_cast<const char*>(data.data()), data.size()).value();
} catch (const std::exception& e) {
return unexpected(pulsar::ResultInvalidMessage, e.what());
}
Expand Down
34 changes: 31 additions & 3 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,37 @@ target_include_directories(PULSAR_OBJECT_LIB PUBLIC
"${CMAKE_SOURCE_DIR}/include"
"${CMAKE_BINARY_DIR}/include")

# --- Scalable topics (pulsar::st) implementation ----------------------------
# The st API (include/pulsar/st) targets C++20 while the classic client above
# stays C++17, so its implementation compiles in a separate object library with
# a per-target standard. The objects are merged into the same libpulsar
# shared/static artifacts below; a C++20-compiled object archive links fine
# into C++17 consumers of the classic API.
file(GLOB PULSAR_ST_SOURCES st/*.cc st/*.h)
add_library(ST_OBJECT_LIB OBJECT ${PULSAR_ST_SOURCES})
set_property(TARGET ST_OBJECT_LIB PROPERTY POSITION_INDEPENDENT_CODE 1)
set_target_properties(ST_OBJECT_LIB PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON)
if (INTEGRATE_VCPKG)
target_link_libraries(ST_OBJECT_LIB PROTO_OBJECTS)
# The st sources #include the generated PulsarApi.pb.h. Linking PROTO_OBJECTS
# propagates its include dir, but the OBJECT->OBJECT link does not reliably
# order the build under the Visual Studio generator, so the st sources can
# compile before the proto is generated. Force the ordering explicitly.
add_dependencies(ST_OBJECT_LIB PROTO_OBJECTS)
else ()
# Legacy (non-vcpkg) build generates the proto as part of PULSAR_OBJECT_LIB;
# ensure it exists before the st sources that include it compile.
add_dependencies(ST_OBJECT_LIB PULSAR_OBJECT_LIB)
endif ()
target_include_directories(ST_OBJECT_LIB PRIVATE ${LIB_AUTOGEN_DIR})
target_include_directories(ST_OBJECT_LIB PUBLIC
"${CMAKE_SOURCE_DIR}"
"${CMAKE_SOURCE_DIR}/include"
"${CMAKE_BINARY_DIR}/include")

include(CheckCXXSymbolExists)
if (BUILD_DYNAMIC_LIB)
add_library(pulsarShared SHARED $<TARGET_OBJECTS:PULSAR_OBJECT_LIB>)
add_library(pulsarShared SHARED $<TARGET_OBJECTS:PULSAR_OBJECT_LIB> $<TARGET_OBJECTS:ST_OBJECT_LIB>)
set_property(TARGET pulsarShared PROPERTY OUTPUT_NAME ${LIB_NAME_SHARED})
set_property(TARGET pulsarShared PROPERTY VERSION ${LIBRARY_VERSION})
target_link_libraries(pulsarShared PRIVATE ${COMMON_LIBS} ${CMAKE_DL_LIBS})
Expand All @@ -102,7 +130,7 @@ if(HAVE_AUXV_GETAUXVAL)
endif()

if (BUILD_STATIC_LIB)
add_library(pulsarStatic STATIC $<TARGET_OBJECTS:PULSAR_OBJECT_LIB>)
add_library(pulsarStatic STATIC $<TARGET_OBJECTS:PULSAR_OBJECT_LIB> $<TARGET_OBJECTS:ST_OBJECT_LIB>)
target_include_directories(pulsarStatic PUBLIC
${CMAKE_BINARY_DIR}/include
${CMAKE_SOURCE_DIR}
Expand Down Expand Up @@ -139,7 +167,7 @@ if (LINK_STATIC AND BUILD_STATIC_LIB)
set(${OUTLIST} ${TEMP_OUT} PARENT_SCOPE)
endfunction(remove_libtype)

add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES})
add_library(pulsarStaticWithDeps STATIC ${PULSAR_SOURCES} $<TARGET_OBJECTS:ST_OBJECT_LIB>)
target_include_directories(pulsarStaticWithDeps PRIVATE ${dlfcn-win32_INCLUDE_DIRS})
if (VCPKG_TRIPLET)
# Collect ALL vcpkg-installed static archives so every transitive dependency
Expand Down
48 changes: 48 additions & 0 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
LOG_DEBUG("Current max message size is: " << maxMessageSize_);
}

if (cmdConnected.has_feature_flags()) {
supportsScalableTopics_.store(cmdConnected.feature_flags().supports_scalable_topics(),
std::memory_order_release);
}

Lock lock(mutex_);

if (isClosed()) {
Expand Down Expand Up @@ -988,6 +993,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
handleAckResponse(incomingCmd.ackresponse());
break;

case BaseCommand::SCALABLE_TOPIC_UPDATE:
handleScalableTopicUpdate(incomingCmd.scalabletopicupdate());
break;

default:
LOG_WARN(cnxString() << "Received invalid message from server");
close(Error{ResultDisconnected, cnxString() + "Received invalid message from server"});
Expand Down Expand Up @@ -1267,6 +1276,7 @@ const std::future<void>& ClientConnection::close(Error&& error, bool switchClust
auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
auto scalableTopicSessions = std::move(scalableTopicSessions_);

numOfPendingLookupRequest_ = 0;

Expand Down Expand Up @@ -1341,6 +1351,11 @@ const std::future<void>& ClientConnection::close(Error&& error, bool switchClust
for (auto& kv : pendingLookupRequests) {
kv.second->fail(error);
}
// Notify scalable-topic DAG-watch sessions so they can re-establish on a
// new connection.
for (auto& kv : scalableTopicSessions) {
kv.second(error.result, nullptr);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString() << " Closing Client Connection, please try again later");
kv.second.setFailed(result);
Expand Down Expand Up @@ -1379,6 +1394,39 @@ void ClientConnection::removeConsumer(int consumerId) {
consumers_.erase(consumerId);
}

bool ClientConnection::registerScalableTopicSession(uint64_t sessionId,
ScalableTopicUpdateListener listener) {
Lock lock(mutex_);
if (isClosed()) {
return false;
}
scalableTopicSessions_[sessionId] = std::move(listener);
return true;
}

void ClientConnection::removeScalableTopicSession(uint64_t sessionId) {
Lock lock(mutex_);
scalableTopicSessions_.erase(sessionId);
}

void ClientConnection::handleScalableTopicUpdate(const proto::CommandScalableTopicUpdate& update) {
ScalableTopicUpdateListener listener;
{
Lock lock(mutex_);
auto it = scalableTopicSessions_.find(update.session_id());
if (it != scalableTopicSessions_.end()) {
listener = it->second;
}
}
if (listener) {
listener(ResultOk, &update);
} else {
// A push may race with a just-closed session; drop it rather than
// treating it as a protocol violation.
LOG_WARN(cnxString() << "Received SCALABLE_TOPIC_UPDATE for unknown session " << update.session_id());
}
}

const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }

int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }
Expand Down
24 changes: 24 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class CommandGetLastMessageIdResponse;
class CommandLookupTopicResponse;
class CommandPartitionedTopicMetadataResponse;
class CommandProducerSuccess;
class CommandScalableTopicUpdate;
class CommandSendReceipt;
class CommandSendError;
class CommandSuccess;
Expand Down Expand Up @@ -187,6 +188,23 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void removeProducer(int producerId);
void removeConsumer(int consumerId);

// Scalable topics (pulsar::st): a DAG-watch session registered on this
// connection. The listener is invoked with (ResultOk, &update) for every
// CommandScalableTopicUpdate whose session_id matches (the initial lookup
// response and later pushes alike), and with (error, nullptr) once when the
// connection closes, after which the registration is gone.
typedef std::function<void(Result, const proto::CommandScalableTopicUpdate*)> ScalableTopicUpdateListener;

/**
* Register a DAG-watch session. Returns false (without registering) if the
* connection is already closed — the caller should acquire a new connection.
*/
bool registerScalableTopicSession(uint64_t sessionId, ScalableTopicUpdateListener listener);
void removeScalableTopicSession(uint64_t sessionId);

/** Whether the broker advertised scalable-topics support on CONNECTED. */
bool supportsScalableTopics() const { return supportsScalableTopics_.load(std::memory_order_acquire); }

/**
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
Expand Down Expand Up @@ -354,6 +372,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::unordered_map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;

// Scalable topics: DAG-watch sessions by client-assigned session id.
typedef std::map<uint64_t, ScalableTopicUpdateListener> ScalableTopicSessionsMap;
ScalableTopicSessionsMap scalableTopicSessions_;
std::atomic<bool> supportsScalableTopics_{false};

typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;

Expand Down Expand Up @@ -436,6 +459,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
void handleScalableTopicUpdate(const proto::CommandScalableTopicUpdate&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
Expand Down
56 changes: 56 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ using proto::CommandLookupTopic;
using proto::CommandPartitionedTopicMetadata;
using proto::CommandProducer;
using proto::CommandRedeliverUnacknowledgedMessages;
using proto::CommandScalableTopicLookup;
using proto::CommandSeek;
using proto::CommandSend;
using proto::CommandSubscribe;
Expand Down Expand Up @@ -285,6 +286,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
FeatureFlags* flags = connect->mutable_feature_flags();
flags->set_supports_auth_refresh(true);
flags->set_supports_broker_entry_metadata(true);
flags->set_supports_scalable_topics(true);
if (connectingThroughProxy) {
Url logicalAddressUrl;
Url::parse(logicalAddress, logicalAddressUrl);
Expand Down Expand Up @@ -552,6 +554,24 @@ SharedBuffer Commands::newCloseConsumer(uint64_t consumerId, uint64_t requestId)
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newScalableTopicLookup(uint64_t sessionId, const std::string& topic,
bool createIfMissing) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SCALABLE_TOPIC_LOOKUP);
CommandScalableTopicLookup* lookup = cmd.mutable_scalabletopiclookup();
lookup->set_session_id(sessionId);
lookup->set_topic(topic);
lookup->set_create_if_missing(createIfMissing);
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newScalableTopicClose(uint64_t sessionId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SCALABLE_TOPIC_CLOSE);
cmd.mutable_scalabletopicclose()->set_session_id(sessionId);
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newPing() {
BaseCommand cmd;
cmd.set_type(BaseCommand::PING);
Expand Down Expand Up @@ -808,6 +828,42 @@ std::string Commands::messageType(BaseCommand_Type type) {
case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
return "WATCH_TOPIC_LIST_CLOSE";
break;
case BaseCommand::SCALABLE_TOPIC_LOOKUP:
return "SCALABLE_TOPIC_LOOKUP";
break;
case BaseCommand::SCALABLE_TOPIC_UPDATE:
return "SCALABLE_TOPIC_UPDATE";
break;
case BaseCommand::SCALABLE_TOPIC_CLOSE:
return "SCALABLE_TOPIC_CLOSE";
break;
case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE:
return "SCALABLE_TOPIC_SUBSCRIBE";
break;
case BaseCommand::SCALABLE_TOPIC_SUBSCRIBE_RESPONSE:
return "SCALABLE_TOPIC_SUBSCRIBE_RESPONSE";
break;
case BaseCommand::SCALABLE_TOPIC_ASSIGNMENT_UPDATE:
return "SCALABLE_TOPIC_ASSIGNMENT_UPDATE";
break;
case BaseCommand::WATCH_SCALABLE_TOPICS:
return "WATCH_SCALABLE_TOPICS";
break;
case BaseCommand::WATCH_SCALABLE_TOPICS_UPDATE:
return "WATCH_SCALABLE_TOPICS_UPDATE";
break;
case BaseCommand::WATCH_SCALABLE_TOPICS_CLOSE:
return "WATCH_SCALABLE_TOPICS_CLOSE";
break;
case BaseCommand::WATCH_TC_ASSIGNMENTS:
return "WATCH_TC_ASSIGNMENTS";
break;
case BaseCommand::WATCH_TC_ASSIGNMENTS_UPDATE:
return "WATCH_TC_ASSIGNMENTS_UPDATE";
break;
case BaseCommand::WATCH_TC_ASSIGNMENTS_CLOSE:
return "WATCH_TC_ASSIGNMENTS_CLOSE";
break;
};
BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value"));
}
Expand Down
7 changes: 7 additions & 0 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ class Commands {
static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId,
const std::set<MessageId>& messageIds);

// Scalable topics (pulsar::st): open/close a DAG-watch session. The broker
// answers (and later pushes) CommandScalableTopicUpdate correlated by the
// client-assigned sessionId.
static SharedBuffer newScalableTopicLookup(uint64_t sessionId, const std::string& topic,
bool createIfMissing);
static SharedBuffer newScalableTopicClose(uint64_t sessionId);

static std::string messageType(BaseCommand_Type type);

static void initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata);
Expand Down
2 changes: 2 additions & 0 deletions lib/Murmur3_32Hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int32_t Murmur3_32Hash::makeHash(const std::string &key) {
return static_cast<int32_t>(makeHash(&key.front(), key.length()) & std::numeric_limits<int32_t>::max());
}

uint32_t Murmur3_32Hash::makeRawHash(const void *key, int64_t len) { return makeHash(key, len); }

uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) {
const uint8_t *data = reinterpret_cast<const uint8_t *>(key);
const int nblocks = len / MACRO_CHUNK_SIZE;
Expand Down
5 changes: 5 additions & 0 deletions lib/Murmur3_32Hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class PULSAR_PUBLIC Murmur3_32Hash : public Hash {

int32_t makeHash(const std::string& key);

// The raw (unmasked) 32-bit hash. Unlike makeHash(), bit 31 is NOT cleared;
// the scalable-topics key hashing splits this into two independent 16-bit
// halves and needs the high half full-range.
uint32_t makeRawHash(const void* key, int64_t len);

private:
uint32_t seed;

Expand Down
27 changes: 27 additions & 0 deletions lib/st/Checkpoint.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/st/Checkpoint.h>

namespace pulsar::st {

// An empty checkpoint: impl_ stays null, so the handle is falsy under
// operator bool and must not be used as a start position.
Checkpoint::Checkpoint() = default;

} // namespace pulsar::st
Loading
Loading