Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix max concurrency of thrift protocol and nshead protocol #2613

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/brpc/nshead_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/nshead_message.h" // NsheadMessage
#include "brpc/describable.h"

#include "brpc/adaptive_max_concurrency.h"

namespace brpc {

Expand All @@ -40,7 +40,7 @@ class NsheadClosure : public google::protobuf::Closure {
explicit NsheadClosure(void* additional_space);

// [Required] Call this to send response back to the client.
void Run();
void Run() override;

// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
Expand All @@ -59,7 +59,7 @@ class NsheadClosure : public google::protobuf::Closure {
friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base);
friend class DeleteNsheadClosure;
// Only callable by Run().
~NsheadClosure();
~NsheadClosure() override;

const Server* _server;
int64_t _received_us;
Expand All @@ -84,8 +84,8 @@ struct NsheadServiceOptions {
class NsheadService : public Describable {
public:
NsheadService();
NsheadService(const NsheadServiceOptions&);
virtual ~NsheadService();
explicit NsheadService(const NsheadServiceOptions&);
~NsheadService() override;

// Implement this method to handle nshead requests. Notice that this
// method can be called with a failed Controller(something wrong with the
Expand All @@ -104,7 +104,7 @@ class NsheadService : public Describable {
NsheadClosure* done) = 0;

// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
void Describe(std::ostream &os, const DescribeOptions&) const override;

private:
DISALLOW_COPY_AND_ASSIGN(NsheadService);
Expand All @@ -118,6 +118,7 @@ friend class Server;

// Tracking status of non NsheadPbService
MethodStatus* _status;
AdaptiveMaxConcurrency _max_concurrency;
size_t _additional_space;
std::string _cached_name;
};
Expand Down
47 changes: 38 additions & 9 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
Expand Down Expand Up @@ -1040,6 +1040,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
it->second.status->SetConcurrencyLimiter(cl);
}
}
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
return -1;
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
return -1;
}
#endif


// Create listening ports
if (port_range.min_port > port_range.max_port) {
Expand Down Expand Up @@ -2206,13 +2215,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
}

AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return MaxConcurrencyOf(mp);
do {
if (full_method_name == butil::class_name_str<NsheadService>()) {
if (NULL == options().nshead_service) {
break;
}
return options().nshead_service->_max_concurrency;
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (full_method_name == butil::class_name_str<ThriftService>()) {
if (NULL == options().thrift_service) {
break;
}
return options().thrift_service->_max_concurrency;
}
#endif

MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
break;
}
return MaxConcurrencyOf(mp);

} while (false);

LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}

int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
Expand Down
21 changes: 21 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "brpc/http2.h"
#include "brpc/redis.h"
#include "brpc/interceptor.h"
#include "brpc/concurrency_limiter.h"

namespace brpc {

Expand Down Expand Up @@ -674,6 +675,26 @@ friend class Controller;
AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);

template <typename T>
int SetServiceMaxConcurrency(T* service) {
if (NULL != service) {
const AdaptiveMaxConcurrency* amc = &service->_max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
service->_status->SetConcurrencyLimiter(cl);
}
return 0;
}

DISALLOW_COPY_AND_ASSIGN(Server);

// Put frequently-accessed data pool at first.
Expand Down
6 changes: 4 additions & 2 deletions src/brpc/thrift_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/thrift_message.h" // ThriftFramedMessage
#include "brpc/describable.h"
#include "brpc/adaptive_max_concurrency.h"

namespace brpc {

Expand All @@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base);
class ThriftService : public Describable {
public:
ThriftService();
virtual ~ThriftService();
~ThriftService() override;

// Implement this method to handle thrift_binary requests.
// Parameters:
Expand All @@ -53,7 +54,7 @@ class ThriftService : public Describable {
::google::protobuf::Closure* done) = 0;

// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
void Describe(std::ostream &os, const DescribeOptions&) const override;

private:
DISALLOW_COPY_AND_ASSIGN(ThriftService);
Expand All @@ -66,6 +67,7 @@ friend class Server;
void Expose(const butil::StringPiece& prefix);

MethodStatus* _status;
AdaptiveMaxConcurrency _max_concurrency;
};

} // namespace brpc
Expand Down