Date: Fri, 28 Mar 2025 09:54:14 GMT From: Yuri Victorovich <yuri@FreeBSD.org> To: ports-committers@FreeBSD.org, dev-commits-ports-all@FreeBSD.org, dev-commits-ports-main@FreeBSD.org Subject: git: 1559e2b08bd0 - main - net-p2p/pulsar-client-cpp: Unbreak by backporting a patch Message-ID: <202503280954.52S9sE3F090365@gitrepo.freebsd.org>
index | next in thread | raw e-mail
The branch main has been updated by yuri: URL: https://cgit.FreeBSD.org/ports/commit/?id=1559e2b08bd0410065139656f1b0c76c8b0d5831 commit 1559e2b08bd0410065139656f1b0c76c8b0d5831 Author: Yuri Victorovich <yuri@FreeBSD.org> AuthorDate: 2025-03-28 09:21:23 +0000 Commit: Yuri Victorovich <yuri@FreeBSD.org> CommitDate: 2025-03-28 09:54:10 +0000 net-p2p/pulsar-client-cpp: Unbreak by backporting a patch ,,, that fixes boost compatibility. --- net-p2p/pulsar-client-cpp/Makefile | 1 - net-p2p/pulsar-client-cpp/distinfo | 2 +- .../patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 | 1111 ++++++++++++++++++++ 3 files changed, 1112 insertions(+), 2 deletions(-) diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile index 23a17a8c156e..19d40f02baf2 100644 --- a/net-p2p/pulsar-client-cpp/Makefile +++ b/net-p2p/pulsar-client-cpp/Makefile @@ -13,7 +13,6 @@ LICENSE= APACHE20 LICENSE_FILE= ${WRKSRC}/LICENSE ONLY_FOR_ARCHS= amd64 i386 # due to requirement of instruction sets crc32, pclmul -BROKEN= compilation fails with boost-libs-1.87.0, see https://github.com/apache/pulsar-client-cpp/issues/475 BROKEN_i386= compilation fails due to overflow, see https://github.com/apache/pulsar-client-cpp/issues/449 BUILD_DEPENDS= ${LOCALBASE}/include/boost/algorithm/string.hpp:devel/boost-libs diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo index 3a3e4dec1501..3d7c80b515e0 100644 --- a/net-p2p/pulsar-client-cpp/distinfo +++ b/net-p2p/pulsar-client-cpp/distinfo @@ -1,3 +1,3 @@ -TIMESTAMP = 1736572558 +TIMESTAMP = 1743152964 SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671 SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627 diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 new file mode 100644 index 000000000000..ca6cb6a02135 --- /dev/null +++ b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 @@ -0,0 +1,1111 @@ +- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+ + +diff --git CMakeLists.txt CMakeLists.txt +index b0046534..2efeec89 100644 +--- CMakeLists.txt ++++ CMakeLists.txt +@@ -19,15 +19,16 @@ + + cmake_minimum_required(VERSION 3.13) + +-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) +- + option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) + if (INTEGRATE_VCPKG) +- set(USE_ASIO ON) ++ option(USE_ASIO "Use Asio instead of Boost.Asio" ON) + if (NOT CMAKE_TOOLCHAIN_FILE) + set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") + endif () ++else () ++ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) + endif () ++message(STATUS "USE_ASIO: ${USE_ASIO}") + + option(BUILD_TESTS "Build tests" ON) + message(STATUS "BUILD_TESTS: " ${BUILD_TESTS}) +diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc +index 7233b2c9..bc8da970 100644 +--- lib/AckGroupingTrackerEnabled.cc ++++ lib/AckGroupingTrackerEnabled.cc +@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() { + this->flush(); + std::lock_guard<std::mutex> lock(this->mutexTimer_); + if (this->timer_) { +- ASIO_ERROR ec; +- this->timer_->cancel(ec); ++ this->timer_->cancel(); + } + } + +@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() { + + std::lock_guard<std::mutex> lock(this->mutexTimer_); + this->timer_ = this->executor_->createDeadlineTimer(); +- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); ++ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); + auto self = shared_from_this(); + this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { + if (!ec) { +diff --git lib/ClientConnection.cc lib/ClientConnection.cc +index 2037722f..de226a85 100644 +--- lib/ClientConnection.cc ++++ lib/ClientConnection.cc +@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: + if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { + LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); + std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); +- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); ++ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost)); + } + + LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); +@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC + // Only send keep-alive probes if the broker supports it + keepAliveTimer_ = executor_->createDeadlineTimer(); + if (keepAliveTimer_) { +- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); ++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); + auto weakSelf = weak_from_this(); + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { + auto self = weakSelf.lock(); +@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta + // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero + // Check if we have a timer still before we set the request timer to pop again. + if (consumerStatsRequestTimer_) { +- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); ++ consumerStatsRequestTimer_->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); +@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee + typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle; + #endif + +-/* +- * TCP Connect handler +- * +- * if async_connect without any error, connected_ would be set to true +- * at this point the connection is deemed valid to be used by clients of this class +- */ +-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { +- if (!err) { +- std::stringstream cnxStringStream; +- try { +- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() +- << "] "; +- cnxString_ = cnxStringStream.str(); +- } catch (const ASIO_SYSTEM_ERROR& e) { +- LOG_ERROR("Failed to get endpoint: " << e.what()); +- close(ResultRetryable); +- return; +- } +- if (logicalAddress_ == physicalAddress_) { +- LOG_INFO(cnxString_ << "Connected to broker"); +- } else { +- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ +- << ", proxy: " << proxyServiceUrl_ +- << ", physical address:" << physicalAddress_); +- } ++void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) { ++ std::stringstream cnxStringStream; ++ try { ++ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; ++ cnxString_ = cnxStringStream.str(); ++ } catch (const ASIO_SYSTEM_ERROR& e) { ++ LOG_ERROR("Failed to get endpoint: " << e.what()); ++ close(ResultRetryable); ++ return; ++ } ++ if (logicalAddress_ == physicalAddress_) { ++ LOG_INFO(cnxString_ << "Connected to broker"); ++ } else { ++ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ ++ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_); ++ } + +- Lock lock(mutex_); +- if (isClosed()) { +- LOG_INFO(cnxString_ << "Connection already closed"); +- return; +- } +- state_ = TcpConnected; +- lock.unlock(); ++ Lock lock(mutex_); ++ if (isClosed()) { ++ LOG_INFO(cnxString_ << "Connection already closed"); ++ return; ++ } ++ state_ = TcpConnected; ++ lock.unlock(); + +- ASIO_ERROR error; +- socket_->set_option(tcp::no_delay(true), error); +- if (error) { +- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); +- } ++ ASIO_ERROR error; ++ socket_->set_option(tcp::no_delay(true), error); ++ if (error) { ++ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); ++ } + +- socket_->set_option(tcp::socket::keep_alive(true), error); +- if (error) { +- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); +- } ++ socket_->set_option(tcp::socket::keep_alive(true), error); ++ if (error) { ++ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); ++ } + +- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this +- // should never happen, given that we're sending our own keep-alive probes (within the TCP +- // connection) every 30 seconds +- socket_->set_option(tcp_keep_alive_idle(1 * 60), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); +- } ++ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this ++ // should never happen, given that we're sending our own keep-alive probes (within the TCP ++ // connection) every 30 seconds ++ socket_->set_option(tcp_keep_alive_idle(1 * 60), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); ++ } + +- // Send up to 10 probes before declaring the connection broken +- socket_->set_option(tcp_keep_alive_count(10), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); +- } ++ // Send up to 10 probes before declaring the connection broken ++ socket_->set_option(tcp_keep_alive_count(10), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); ++ } + +- // Interval between probes: 6 seconds +- socket_->set_option(tcp_keep_alive_interval(6), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); +- } ++ // Interval between probes: 6 seconds ++ socket_->set_option(tcp_keep_alive_interval(6), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); ++ } + +- if (tlsSocket_) { +- if (!isTlsAllowInsecureConnection_) { +- ASIO_ERROR err; +- Url service_url; +- if (!Url::parse(physicalAddress_, service_url)) { +- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); +- close(); +- return; +- } +- } +- auto weakSelf = weak_from_this(); +- auto socket = socket_; +- auto tlsSocket = tlsSocket_; +- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation +- // fault might happen +- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleHandshake(err); +- } +- }; +- tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client, +- ASIO::bind_executor(strand_, callback)); +- } else { +- handleHandshake(ASIO_SUCCESS); +- } +- } else if (endpointIterator != tcp::resolver::iterator()) { +- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); +- // The connection failed. Try the next endpoint in the list. +- ASIO_ERROR closeError; +- socket_->close(closeError); // ignore the error of close +- if (closeError) { +- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); +- } +- connectTimeoutTask_->stop(); +- ++endpointIterator; +- if (endpointIterator != tcp::resolver::iterator()) { +- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); +- connectTimeoutTask_->start(); +- tcp::endpoint endpoint = *endpointIterator; +- auto weakSelf = weak_from_this(); +- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleTcpConnected(err, endpointIterator); +- } +- }); +- } else { +- if (err == ASIO::error::operation_aborted) { +- // TCP connect timeout, which is not retryable ++ if (tlsSocket_) { ++ if (!isTlsAllowInsecureConnection_) { ++ ASIO_ERROR err; ++ Url service_url; ++ if (!Url::parse(physicalAddress_, service_url)) { ++ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); + close(); +- } else { +- close(ResultRetryable); ++ return; + } + } ++ auto weakSelf = weak_from_this(); ++ auto socket = socket_; ++ auto tlsSocket = tlsSocket_; ++ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation ++ // fault might happen ++ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { ++ auto self = weakSelf.lock(); ++ if (self) { ++ self->handleHandshake(err); ++ } ++ }; ++ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client, ++ ASIO::bind_executor(strand_, callback)); + } else { +- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); +- close(ResultRetryable); ++ handleHandshake(ASIO_SUCCESS); + } + } + +@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() { + } + + LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); +- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); ++ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port()); + auto weakSelf = weak_from_this(); +- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleResolve(err, iterator); +- } +- }); ++ resolver_->async_resolve( ++ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) { ++ auto self = weakSelf.lock(); ++ if (!self) { ++ return; ++ } ++ if (err) { ++ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; ++ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); ++ close(); ++ return; ++ } ++ if (results.empty()) { ++ LOG_ERROR(cnxString_ << "No IP address found"); ++ close(); ++ return; ++ } ++ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { ++ ClientConnectionPtr ptr = weakSelf.lock(); ++ if (!ptr) { ++ // Connection was already destroyed ++ return; ++ } ++ ++ if (ptr->state_ != Ready) { ++ LOG_ERROR(ptr->cnxString_ << "Connection was not established in " ++ << ptr->connectTimeoutTask_->getPeriodMs() ++ << " ms, close the socket"); ++ PeriodicTask::ErrorCode err; ++ ptr->socket_->close(err); ++ if (err) { ++ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); ++ } ++ } ++ ptr->connectTimeoutTask_->stop(); ++ }); ++ connectTimeoutTask_->start(); ++ std::vector<tcp::resolver::endpoint_type> endpoints; ++ for (const auto& result : results) { ++ endpoints.emplace_back(result.endpoint()); ++ } ++ asyncConnect(endpoints, 0); ++ }); + } + +-void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { +- if (err) { +- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; +- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); +- close(); ++void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) { ++ if (index >= endpoints.size()) { ++ close(ResultRetryable); + return; + } +- + auto weakSelf = weak_from_this(); +- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { +- ClientConnectionPtr ptr = weakSelf.lock(); +- if (!ptr) { +- // Connection was already destroyed ++ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) { ++ auto self = weakSelf.lock(); ++ if (!self) { + return; + } +- +- if (ptr->state_ != Ready) { +- LOG_ERROR(ptr->cnxString_ << "Connection was not established in " +- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); +- PeriodicTask::ErrorCode err; +- ptr->socket_->close(err); +- if (err) { +- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); +- } ++ if (err) { ++ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); ++ asyncConnect(endpoints, index + 1); ++ return; + } +- ptr->connectTimeoutTask_->stop(); ++ completeConnect(endpoints[index]); + }); +- +- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); +- connectTimeoutTask_->start(); +- if (endpointIterator != tcp::resolver::iterator()) { +- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // +- << " to " << endpointIterator->endpoint()); +- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleTcpConnected(err, endpointIterator); +- } +- }); +- } else { +- LOG_WARN(cnxString_ << "No IP address found"); +- close(); +- return; +- } + } + + void ClientConnection::readNextCommand() { +@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request + LookupRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() { + PairSharedBuffer buffer = + Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); + +- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the +- // callback is called, an invalid buffer range might be passed to the underlying socket send. ++ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before ++ // the callback is called, an invalid buffer range might be passed to the underlying socket ++ // send. + asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { + handleSendPair(err); + })); +@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm + + PendingRequestData requestData; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() { + // be zero And we do not attempt to dereference the pointer. + Lock lock(mutex_); + if (keepAliveTimer_) { +- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); ++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); + auto weakSelf = weak_from_this(); + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { + auto self = weakSelf.lock(); +@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u + LastMessageIdRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top + lock.unlock(); + + auto weakSelf = weak_from_this(); +- timer->expires_from_now(operationsTimeout_); ++ timer->expires_after(operationsTimeout_); + timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); + if (!self) { +@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) { + auto it = pendingRequests_.find(requestId); + if (it != pendingRequests_.end()) { + it->second.promise.setFailed(ResultDisconnected); +- ASIO_ERROR ec; +- it->second.timer->cancel(ec); ++ it->second.timer->cancel(); + pendingRequests_.erase(it); + } + } +diff --git lib/ClientConnection.h lib/ClientConnection.h +index 7646f85e..14e07652 100644 +--- lib/ClientConnection.h ++++ lib/ClientConnection.h +@@ -25,13 +25,13 @@ + #include <atomic> + #ifdef USE_ASIO + #include <asio/bind_executor.hpp> +-#include <asio/io_service.hpp> ++#include <asio/io_context.hpp> + #include <asio/ip/tcp.hpp> + #include <asio/ssl/stream.hpp> + #include <asio/strand.hpp> + #else + #include <boost/asio/bind_executor.hpp> +-#include <boost/asio/io_service.hpp> ++#include <boost/asio/io_context.hpp> + #include <boost/asio/ip/tcp.hpp> + #include <boost/asio/ssl/stream.hpp> + #include <boost/asio/strand.hpp> +@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + DeadlineTimerPtr timer; + }; + +- /* +- * handler for connectAsync +- * creates a ConnectionPtr which has a valid ClientConnection object +- * although not usable at this point, since this is just tcp connection +- * Pulsar - Connect/Connected has yet to happen +- */ +- void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator); ++ void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index); ++ void completeConnect(ASIO::ip::tcp::endpoint endpoint); + + void handleHandshake(const ASIO_ERROR& err); + +@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + + void handlePulsarConnected(const proto::CommandConnected& cmdConnected); + +- void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator); +- + void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd); + void handleSendPair(const ASIO_ERROR& err); + void sendPendingCommands(); +@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + */ + SocketPtr socket_; + TlsSocketPtr tlsSocket_; +- ASIO::strand<ASIO::io_service::executor_type> strand_; ++ ASIO::strand<ASIO::io_context::executor_type> strand_; + + const std::string logicalAddress_; + /* +diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc +index 250845b3..cfdb0b2d 100644 +--- lib/ConsumerImpl.cc ++++ lib/ConsumerImpl.cc +@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b + } + + void ConsumerImpl::triggerCheckExpiredChunkedTimer() { +- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); ++ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()}; + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { + auto self = weakSelf.lock(); +@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time + } + remainTime -= next; + +- timer->expires_from_now(next); ++ timer->expires_after(next); + + auto self = shared_from_this(); + timer->async_wait([this, backoff, remainTime, timer, next, callback, +@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() { + } + + void ConsumerImpl::cancelTimers() noexcept { +- ASIO_ERROR ec; +- batchReceiveTimer_->cancel(ec); +- checkExpiredChunkedTimer_->cancel(ec); ++ batchReceiveTimer_->cancel(); ++ checkExpiredChunkedTimer_->cancel(); + unAckedMessageTrackerPtr_->stop(); + consumerStatsBasePtr_->stop(); + } +diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc +index 098f2d5b..76d99370 100644 +--- lib/ConsumerImplBase.cc ++++ lib/ConsumerImplBase.cc +@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi + + void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { + if (timeoutMs > 0) { +- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); ++ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs)); + std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()}; + batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +diff --git lib/ExecutorService.cc lib/ExecutorService.cc +index 794e3619..7f2a2c14 100644 +--- lib/ExecutorService.cc ++++ lib/ExecutorService.cc +@@ -18,6 +18,12 @@ + */ + #include "ExecutorService.h" + ++#ifdef USE_ASIO ++#include <asio/post.hpp> ++#else ++#include <boost/asio/post.hpp> ++#endif ++ + #include "LogUtils.h" + #include "TimeUtils.h" + DECLARE_LOG_OBJECT() +@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); } + void ExecutorService::start() { + auto self = shared_from_this(); + std::thread t{[this, self] { +- LOG_DEBUG("Run io_service in a single thread"); +- ASIO_ERROR ec; ++ LOG_DEBUG("Run io_context in a single thread"); + while (!closed_) { +- io_service_.restart(); +- IOService::work work{getIOService()}; +- io_service_.run(ec); +- } +- if (ec) { +- LOG_ERROR("Failed to run io_service: " << ec.message()); +- } else { +- LOG_DEBUG("Event loop of ExecutorService exits successfully"); ++ io_context_.restart(); ++ auto work{ASIO::make_work_guard(io_context_)}; ++ io_context_.run(); + } ++ LOG_DEBUG("Event loop of ExecutorService exits successfully"); + { + std::lock_guard<std::mutex> lock{mutex_}; + ioServiceDone_ = true; +@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() { + } + + /* +- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance ++ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance + * @ returns shared_ptr to this socket + */ + SocketPtr ExecutorService::createSocket() { + try { +- return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); ++ return SocketPtr(new ASIO::ip::tcp::socket(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create socket: ") + e.what(); +@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont + } + + /* +- * factory method of Resolver object associated with io_service_ instance ++ * factory method of Resolver object associated with io_context_ instance + * @returns shraed_ptr to resolver object + */ + TcpResolverPtr ExecutorService::createTcpResolver() { + try { +- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); ++ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create resolver: ") + e.what(); +@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() { + + DeadlineTimerPtr ExecutorService::createDeadlineTimer() { + try { +- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); ++ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create steady_timer: ") + e.what(); +@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { + } + } + +-void ExecutorService::restart() { io_service_.stop(); } ++void ExecutorService::restart() { io_context_.stop(); } + + void ExecutorService::close(long timeoutMs) { + bool expectedState = false; +@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) { + return; + } + if (timeoutMs == 0) { // non-blocking +- io_service_.stop(); ++ io_context_.stop(); + return; + } + + std::unique_lock<std::mutex> lock{mutex_}; +- io_service_.stop(); ++ io_context_.stop(); + if (timeoutMs > 0) { + cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; }); + } else { // < 0 +@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) { + } + } + +-void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); } ++void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); } + + ///////////////////// + +diff --git lib/ExecutorService.h lib/ExecutorService.h +index 89d06d30..626cb203 100644 +--- lib/ExecutorService.h ++++ lib/ExecutorService.h +@@ -23,11 +23,11 @@ + + #include <atomic> + #ifdef USE_ASIO +-#include <asio/io_service.hpp> ++#include <asio/io_context.hpp> + #include <asio/ip/tcp.hpp> + #include <asio/ssl.hpp> + #else +-#include <boost/asio/io_service.hpp> ++#include <boost/asio/io_context.hpp> + #include <boost/asio/ip/tcp.hpp> + #include <boost/asio/ssl.hpp> + #endif +@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt + typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr; + class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> { + public: +- using IOService = ASIO::io_service; ++ using IOService = ASIO::io_context; + using SharedPtr = std::shared_ptr<ExecutorService>; + + static SharedPtr create(); +@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut + // See TimeoutProcessor for the semantics of the parameter. + void close(long timeoutMs = 3000); + +- IOService &getIOService() { return io_service_; } ++ IOService &getIOService() { return io_context_; } + bool isClosed() const noexcept { return closed_; } + + private: + /* +- * io_service is our interface to os, io object schedule async ops on this object ++ * io_context is our interface to os, io object schedule async ops on this object + */ +- IOService io_service_; ++ IOService io_context_; + + std::atomic_bool closed_{false}; + std::mutex mutex_; +diff --git lib/HandlerBase.cc lib/HandlerBase.cc +index 65aa0db1..71902481 100644 +--- lib/HandlerBase.cc ++++ lib/HandlerBase.cc +@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, + redirectedClusterURI_("") {} + + HandlerBase::~HandlerBase() { +- ASIO_ERROR ignored; +- timer_->cancel(ignored); +- creationTimer_->cancel(ignored); ++ timer_->cancel(); ++ creationTimer_->cancel(); + } + + void HandlerBase::start() { +@@ -61,15 +60,14 @@ void HandlerBase::start() { + if (state_.compare_exchange_strong(state, Pending)) { + grabCnx(); + } +- creationTimer_->expires_from_now(operationTimeut_); ++ creationTimer_->expires_after(operationTimeut_); + std::weak_ptr<HandlerBase> weakSelf{shared_from_this()}; + creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { + auto self = weakSelf.lock(); + if (self && !error) { + LOG_WARN("Cancel the pending reconnection due to the start timeout"); + connectionFailed(ResultTimeout); +- ASIO_ERROR ignored; +- timer_->cancel(ignored); ++ timer_->cancel(); + } + }); + } +@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) + connectionTimeMs_ = + duration_cast<milliseconds>(high_resolution_clock::now() - before).count(); + // Prevent the creationTimer_ from cancelling the timer_ in future +- ASIO_ERROR ignored; +- creationTimer_->cancel(ignored); ++ creationTimer_->cancel(); + LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms") + } else if (isResultRetryable(result)) { + scheduleReconnection(); +@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig + TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next(); + + LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); +- timer_->expires_from_now(delay); ++ timer_->expires_after(delay); + // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled + // so we will not run into the case where grabCnx is invoked on out of scope handler + auto name = getName(); +diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc +index dddade5c..61fbf7b8 100644 +--- lib/MultiTopicsConsumerImpl.cc ++++ lib/MultiTopicsConsumerImpl.cc +@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { + return numberOfConnectedConsumer; + } + void MultiTopicsConsumerImpl::runPartitionUpdateTask() { +- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); ++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); + auto weakSelf = weak_from_this(); + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it +@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { + + void MultiTopicsConsumerImpl::cancelTimers() noexcept { + if (partitionsUpdateTimer_) { +- ASIO_ERROR ec; +- partitionsUpdateTimer_->cancel(ec); ++ partitionsUpdateTimer_->cancel(); + } + } + +diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc +index e443496d..e50b4ca2 100644 +--- lib/NegativeAcksTracker.cc ++++ lib/NegativeAcksTracker.cc +@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() { + return; + } + std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()}; +- timer_->expires_from_now(timerInterval_); ++ timer_->expires_after(timerInterval_); + timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { + if (auto self = weakSelf.lock()) { + self->handleTimer(ec); +@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { + + void NegativeAcksTracker::close() { + closed_ = true; +- ASIO_ERROR ec; +- timer_->cancel(ec); ++ timer_->cancel(); + std::lock_guard<std::mutex> lock(mutex_); + nackedMessages_.clear(); + } +diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc +index 4178096c..923c038b 100644 +--- lib/PartitionedProducerImpl.cc ++++ lib/PartitionedProducerImpl.cc +@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { + + void PartitionedProducerImpl::runPartitionUpdateTask() { + auto weakSelf = weak_from_this(); +- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); ++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); + if (self) { +@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { + + void PartitionedProducerImpl::cancelTimers() noexcept { + if (partitionsUpdateTimer_) { +- ASIO_ERROR ec; +- partitionsUpdateTimer_->cancel(ec); ++ partitionsUpdateTimer_->cancel(); + } + } + +diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc +index 4fc7bb61..07d9a7bc 100644 +--- lib/PatternMultiTopicsConsumerImpl.cc ++++ lib/PatternMultiTopicsConsumerImpl.cc +@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern() + + void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { + autoDiscoveryRunning_ = false; +- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); ++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); + + auto weakSelf = weak_from_this(); + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { +@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() { + LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); + + if (conf_.getPatternAutoDiscoveryPeriod() > 0) { +- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); ++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); + auto weakSelf = weak_from_this(); + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { + if (auto self = weakSelf.lock()) { +@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { + MultiTopicsConsumerImpl::closeAsync(callback); + } + +-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { +- ASIO_ERROR ec; +- autoDiscoveryTimer_->cancel(ec); +-} ++void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); } +diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc +index 9fde012a..4b5f9621 100644 +--- lib/PeriodicTask.cc ++++ lib/PeriodicTask.cc +@@ -29,7 +29,7 @@ void PeriodicTask::start() { + state_ = Ready; + if (periodMs_ >= 0) { + std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()}; +- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); ++ timer_->expires_after(std::chrono::milliseconds(periodMs_)); + timer_->async_wait([weakSelf](const ErrorCode& ec) { + auto self = weakSelf.lock(); + if (self) { +@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept { + if (!state_.compare_exchange_strong(state, Closing)) { + return; + } +- ErrorCode ec; +- timer_->cancel(ec); ++ timer_->cancel(); + state_ = Pending; + } + +@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { + // state_ may be changed in handleTimeout, so we check state_ again + if (state_ == Ready) { + auto self = shared_from_this(); +- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); ++ timer_->expires_after(std::chrono::milliseconds(periodMs_)); + timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); + } + } +diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc +index 4399ce5f..8b112bf1 100644 +--- lib/ProducerImpl.cc ++++ lib/ProducerImpl.cc +@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c + bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); + bool isFull = batchMessageContainer_->add(msg, callback); + if (isFirstMessage) { +- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); ++ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs())); + auto weakSelf = weak_from_this(); + batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() { + + void ProducerImpl::cancelTimers() noexcept { + dataKeyRefreshTask_.stop(); +- ASIO_ERROR ec; +- batchTimer_->cancel(ec); +- sendTimer_->cancel(ec); ++ batchTimer_->cancel(); ++ sendTimer_->cancel(); + } + + bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const { +@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() { + } + + void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { +- sendTimer_->expires_from_now(expiryTime); ++ sendTimer_->expires_after(expiryTime); + + auto weakSelf = weak_from_this(); *** 171 LINES SKIPPED ***home | help
Want to link to this message? Use this
URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?202503280954.52S9sE3F090365>
