Skip to content

Commit

Permalink
allow disabling send side zerocopy for rdma
Browse files Browse the repository at this point in the history
  • Loading branch information
Tuvie committed Jun 1, 2023
1 parent 76d9390 commit 726ee54
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
46 changes: 35 additions & 11 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern bool g_skip_rdma_init;

DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
DEFINE_bool(rdma_send_zerocopy, true, "Enable zerocopy for send side");
DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
Expand Down Expand Up @@ -801,29 +802,45 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
wr.sg_list = sglist;
wr.opcode = IBV_WR_SEND_WITH_IMM;

RdmaIOBuf* data = (RdmaIOBuf*)from[current];
size_t sge_index = 0;
while (sge_index < (uint32_t)max_sge &&
this_len < _remote_recv_block_size) {
if (data->size() == 0) {
if (from[current]->size() == 0) {
// The current IOBuf is empty, find next one
++current;
if (current == ndata) {
break;
}
data = (RdmaIOBuf*)from[current];
continue;
}

ssize_t len = data->cut_into_sglist_and_iobuf(
sglist, &sge_index, to, max_sge,
_remote_recv_block_size - this_len);
if (len < 0) {
return -1;
ssize_t len = 0;
if (FLAGS_rdma_send_zerocopy) {
ssize_t len = ((RdmaIOBuf*)from[current])->cut_into_sglist_and_iobuf(
sglist, &sge_index, to, max_sge,
_remote_recv_block_size - this_len);
if (len < 0) {
return -1;
}
this_len += len;
total_len += len;
} else {
len = _remote_recv_block_size - this_len;
void* buf = AllocBlock(len);
if (!buf) {
return -1;
}
len = from[current]->copy_to(buf, len);
from[current]->cutn(to, len);
sglist[sge_index].length = len;
sglist[sge_index].addr = (uint64_t)buf;
sglist[sge_index].lkey = GetLKey(buf);
++sge_index;
this_len += len;
total_len += len;
_sbuf_data[_sq_current] = buf;
break;
}
CHECK(len > 0);
this_len += len;
total_len += len;
}
if (this_len == 0) {
continue;
Expand Down Expand Up @@ -951,6 +968,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
uint32_t acks = butil::NetToHost32(wc.imm_data);
uint32_t num = acks;
while (num > 0) {
if (!FLAGS_rdma_send_zerocopy) {
DeallocBlock(_sbuf_data[_sq_sent]);
}
_sbuf[_sq_sent++].clear();
if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
_sq_sent = 0;
Expand Down Expand Up @@ -1139,6 +1159,10 @@ int RdmaEndpoint::AllocateResources() {
if (_rbuf.size() != _rq_size) {
return -1;
}
_sbuf_data.resize(_sq_size, NULL);
if (_sbuf_data.size() != _sq_size) {
return -1;
}
_rbuf_data.resize(_rq_size, NULL);
if (_rbuf_data.size() != _rq_size) {
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/rdma/rdma_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ friend class brpc::Socket;
// Act as sendbuf and recvbuf, but requires no memcpy
std::vector<butil::IOBuf> _sbuf;
std::vector<butil::IOBuf> _rbuf;
// Data address of _sbuf
std::vector<void*> _sbuf_data;
// Data address of _rbuf
std::vector<void*> _rbuf_data;
// Remote block size for receiving
Expand Down

0 comments on commit 726ee54

Please sign in to comment.