Skip to content

Commit

Permalink
Support start KeepWrite bthread urgent
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Apr 6, 2024
1 parent ca92daa commit 33d7312
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
wopt.auth_flags = _auth_flags;
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
wopt.write_in_background = write_to_socket_in_background();
wopt.keep_write_urgent = keep_write_urgent();
int rc;
size_t packet_size = 0;
if (user_packet_guard) {
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
static const uint32_t FLAGS_KEEP_WRITE_URGENT = (1 << 23);

public:
struct Inheritable {
Expand Down Expand Up @@ -388,6 +389,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); }
bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); }

// Create a KEEPWRITE bthread to write to socket for
// requests or responses of RPCs.
void set_keep_write_urgent(bool f) { set_flag(FLAGS_KEEP_WRITE_URGENT, f); }
bool keep_write_urgent() const { return has_flag(FLAGS_KEEP_WRITE_URGENT); }

// ------------------------------------------------------------------------
// Server-side methods.
// These calls shall be made from the server side only. Their results are
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ void SendRpcResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ HttpResponseSender::~HttpResponseSender() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = _cntl->keep_write_urgent();
if (is_http2) {
if (is_grpc) {
// Append compressed and length before body
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ static void SendHuluResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/mongo_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void SendMongoResponse::Run() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl.keep_write_urgent();
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/nshead_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void NsheadClosure::Run() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = _controller.keep_write_urgent();
if (sock->Write(&write_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ static void SendSofaResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
8 changes: 6 additions & 2 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1737,8 +1737,12 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
if (opt.keep_write_urgent) {
ret = bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req);
} else {
ret = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req);
}
if (ret != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down
8 changes: 7 additions & 1 deletion src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,16 @@ friend class policy::H2GlobalStreamCreator;
// performance.
bool write_in_background;

// KeepWrite with `bthread_start_urgent' or `bthread_start_background'.
// Default: false
bool keep_write_urgent;

WriteOptions()
: id_wait(INVALID_BTHREAD_ID), abstime(NULL)
, pipelined_count(0), auth_flags(0)
, ignore_eovercrowded(false), write_in_background(false) {}
, ignore_eovercrowded(false)
, write_in_background(false)
, keep_write_urgent(false) {}
};
int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);

Expand Down

0 comments on commit 33d7312

Please sign in to comment.