Skip to content

Commit

Permalink
Fix max concurrency of thrift protocol and nshead protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Apr 25, 2024
1 parent bbd88a8 commit 28470bb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 17 deletions.
13 changes: 7 additions & 6 deletions src/brpc/nshead_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/nshead_message.h" // NsheadMessage
#include "brpc/describable.h"

#include "brpc/adaptive_max_concurrency.h"

namespace brpc {

Expand All @@ -40,7 +40,7 @@ class NsheadClosure : public google::protobuf::Closure {
explicit NsheadClosure(void* additional_space);

// [Required] Call this to send response back to the client.
void Run();
void Run() override;

// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
Expand All @@ -59,7 +59,7 @@ class NsheadClosure : public google::protobuf::Closure {
friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base);
friend class DeleteNsheadClosure;
// Only callable by Run().
~NsheadClosure();
~NsheadClosure() override;

const Server* _server;
int64_t _received_us;
Expand All @@ -84,8 +84,8 @@ struct NsheadServiceOptions {
class NsheadService : public Describable {
public:
NsheadService();
NsheadService(const NsheadServiceOptions&);
virtual ~NsheadService();
explicit NsheadService(const NsheadServiceOptions&);
~NsheadService() override;

// Implement this method to handle nshead requests. Notice that this
// method can be called with a failed Controller(something wrong with the
Expand All @@ -104,7 +104,7 @@ class NsheadService : public Describable {
NsheadClosure* done) = 0;

// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
void Describe(std::ostream &os, const DescribeOptions&) const override;

private:
DISALLOW_COPY_AND_ASSIGN(NsheadService);
Expand All @@ -118,6 +118,7 @@ friend class Server;

// Tracking status of non NsheadPbService
MethodStatus* _status;
AdaptiveMaxConcurrency _max_concurrency;
size_t _additional_space;
std::string _cached_name;
};
Expand Down
47 changes: 38 additions & 9 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
Expand Down Expand Up @@ -1040,6 +1040,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
it->second.status->SetConcurrencyLimiter(cl);
}
}
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
return -1;
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
return -1;
}
#endif


// Create listening ports
if (port_range.min_port > port_range.max_port) {
Expand Down Expand Up @@ -2206,13 +2215,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
}

AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return MaxConcurrencyOf(mp);
do {
if (full_method_name == butil::class_name_str<NsheadService>()) {
if (NULL == options().nshead_service) {
break;
}
return options().nshead_service->_max_concurrency;
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (full_method_name == butil::class_name_str<ThriftService>()) {
if (NULL == options().thrift_service) {
break;
}
return options().thrift_service->_max_concurrency;
}
#endif

MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
break;
}
return MaxConcurrencyOf(mp);

} while (false);

LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}

int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
Expand Down
21 changes: 21 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "brpc/http2.h"
#include "brpc/redis.h"
#include "brpc/interceptor.h"
#include "brpc/concurrency_limiter.h"

namespace brpc {

Expand Down Expand Up @@ -674,6 +675,26 @@ friend class Controller;
AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);

template <typename T>
int SetServiceMaxConcurrency(T* service) {
if (NULL != service) {
const AdaptiveMaxConcurrency* amc = &service->_max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
service->_status->SetConcurrencyLimiter(cl);
}
return 0;
}

DISALLOW_COPY_AND_ASSIGN(Server);

// Put frequently-accessed data pool at first.
Expand Down
6 changes: 4 additions & 2 deletions src/brpc/thrift_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/thrift_message.h" // ThriftFramedMessage
#include "brpc/describable.h"
#include "brpc/adaptive_max_concurrency.h"

namespace brpc {

Expand All @@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base);
class ThriftService : public Describable {
public:
ThriftService();
virtual ~ThriftService();
~ThriftService() override;

// Implement this method to handle thrift_binary requests.
// Parameters:
Expand All @@ -53,7 +54,7 @@ class ThriftService : public Describable {
::google::protobuf::Closure* done) = 0;

// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
void Describe(std::ostream &os, const DescribeOptions&) const override;

private:
DISALLOW_COPY_AND_ASSIGN(ThriftService);
Expand All @@ -66,6 +67,7 @@ friend class Server;
void Expose(const butil::StringPiece& prefix);

MethodStatus* _status;
AdaptiveMaxConcurrency _max_concurrency;
};

} // namespace brpc
Expand Down

0 comments on commit 28470bb

Please sign in to comment.