allow disabling send side zerocopy for rdma
Tuvie committed Jun 2, 2023
1 parent 76d9390 commit 9f7bce0
Showing 5 changed files with 58 additions and 18 deletions.
example/rdma_performance/client.cpp (7 additions, 1 deletion)
@@ -38,6 +38,7 @@ DEFINE_int32(expected_qps, 0, "The expected QPS");
 DEFINE_int32(max_thread_num, 16, "The max number of threads are used");
 DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)");
 DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo");
+DEFINE_bool(attachment_as_userdata, false, "Append attachment as user_data");
 DEFINE_string(connection_type, "single", "Connection type of the channel");
 DEFINE_string(protocol, "baidu_std", "Protocol type.");
 DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
@@ -86,7 +87,12 @@ class PerformanceTest {
         if (attachment_size > 0) {
             _addr = malloc(attachment_size);
             butil::fast_rand_bytes(_addr, attachment_size);
-            _attachment.append(_addr, attachment_size);
+            if (FLAGS_attachment_as_userdata) {
+                brpc::rdma::RegisterMemoryForRdma(_addr, attachment_size);
+                _attachment.append_user_data(_addr, attachment_size, NULL);
+            } else {
+                _attachment.append(_addr, attachment_size);
+            }
         }
         _echo_attachment = echo_attachment;
     }
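The client change above is the user-facing half of the feature: with --attachment_as_userdata set, the attachment buffer is registered with the RDMA layer once and appended by reference instead of being copied into the IOBuf. A minimal sketch of that pattern, not part of the commit; it assumes brpc/rdma/rdma_helper.h also exposes DeregisterMemoryForRdma() for cleanup and uses a capture-free lambda as the append_user_data deleter:

    // Sketch: attach a user-owned, RDMA-registered buffer without copying.
    #include <brpc/rdma/rdma_helper.h>
    #include <butil/iobuf.h>
    #include <cstdlib>

    butil::IOBuf MakeRdmaAttachment(size_t size) {
        void* buf = malloc(size);
        // Tell the RDMA layer about the buffer so the send path can
        // resolve an lkey for it when building the sg_list.
        brpc::rdma::RegisterMemoryForRdma(buf, size);
        butil::IOBuf attachment;
        // Append by reference; the deleter runs when the last IOBuf
        // block referencing the buffer is released.
        attachment.append_user_data(buf, size, [](void* p) {
            brpc::rdma::DeregisterMemoryForRdma(p);
            free(p);
        });
        return attachment;
    }

The benchmark passes NULL as the deleter because its buffer lives for the whole process.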
src/brpc/rdma/rdma_endpoint.cpp (35 additions, 11 deletions)
@@ -49,6 +49,7 @@ extern bool g_skip_rdma_init;
 
 DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
 DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_send_zerocopy, true, "Enable zerocopy for send side");
 DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
 DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
 DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
@@ -801,29 +802,45 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
         wr.sg_list = sglist;
         wr.opcode = IBV_WR_SEND_WITH_IMM;
 
-        RdmaIOBuf* data = (RdmaIOBuf*)from[current];
         size_t sge_index = 0;
         while (sge_index < (uint32_t)max_sge &&
                this_len < _remote_recv_block_size) {
-            if (data->size() == 0) {
+            if (from[current]->size() == 0) {
                 // The current IOBuf is empty, find next one
                 ++current;
                 if (current == ndata) {
                     break;
                 }
-                data = (RdmaIOBuf*)from[current];
                 continue;
             }
 
-            ssize_t len = data->cut_into_sglist_and_iobuf(
-                    sglist, &sge_index, to, max_sge,
-                    _remote_recv_block_size - this_len);
-            if (len < 0) {
-                return -1;
+            ssize_t len = 0;
+            if (FLAGS_rdma_send_zerocopy) {
+                len = ((RdmaIOBuf*)from[current])->cut_into_sglist_and_iobuf(
+                        sglist, &sge_index, to, max_sge,
+                        _remote_recv_block_size - this_len);
+                if (len < 0) {
+                    return -1;
+                }
+                this_len += len;
+                total_len += len;
+            } else {
+                len = _remote_recv_block_size - this_len;
+                void* buf = AllocBlock(len);
+                if (!buf) {
+                    return -1;
+                }
+                len = from[current]->copy_to(buf, len);
+                from[current]->cutn(to, len);
+                sglist[sge_index].length = len;
+                sglist[sge_index].addr = (uint64_t)buf;
+                sglist[sge_index].lkey = GetLKey(buf);
+                ++sge_index;
+                this_len += len;
+                total_len += len;
+                _sbuf_data[_sq_current] = buf;
+                break;
             }
-            CHECK(len > 0);
-            this_len += len;
-            total_len += len;
         }
         if (this_len == 0) {
             continue;
@@ -951,6 +968,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
         uint32_t acks = butil::NetToHost32(wc.imm_data);
         uint32_t num = acks;
         while (num > 0) {
+            if (!FLAGS_rdma_send_zerocopy) {
+                DeallocBlock(_sbuf_data[_sq_sent]);
+            }
             _sbuf[_sq_sent++].clear();
             if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
                 _sq_sent = 0;
@@ -1139,6 +1159,10 @@ int RdmaEndpoint::AllocateResources() {
     if (_rbuf.size() != _rq_size) {
         return -1;
     }
+    _sbuf_data.resize(_sq_size, NULL);
+    if (_sbuf_data.size() != _sq_size) {
+        return -1;
+    }
     _rbuf_data.resize(_rq_size, NULL);
     if (_rbuf_data.size() != _rq_size) {
         return -1;
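Taken together, the endpoint changes introduce a second send path. With --rdma_send_zerocopy=true (the default), CutFromIOBufList cuts the outgoing IOBuf straight into the sg_list, so the HCA reads user memory in place. With it set to false, each message is first copied into a block from the registered memory pool (AllocBlock), the block address is remembered in _sbuf_data, and HandleCompletion returns the block to the pool (DeallocBlock) once the peer acknowledges the send. A hedged sketch of flipping the flag programmatically, e.g. in a benchmark or test; the namespace placement follows the unittest below, everything else is an assumption:

    #include <gflags/gflags_declare.h>

    namespace brpc {
    namespace rdma {
    DECLARE_bool(rdma_send_zerocopy);
    }  // namespace rdma
    }  // namespace brpc

    void DisableRdmaSendZerocopy() {
        // Subsequent sends are staged in pool blocks (one extra memcpy)
        // instead of posting user memory in place.
        brpc::rdma::FLAGS_rdma_send_zerocopy = false;
    }

The trade-off is one memcpy per message in exchange for never having to register or track the caller's memory on the send side.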
src/brpc/rdma/rdma_endpoint.h (2 additions)
@@ -213,6 +213,8 @@ friend class brpc::Socket;
     // Act as sendbuf and recvbuf, but requires no memcpy
     std::vector<butil::IOBuf> _sbuf;
     std::vector<butil::IOBuf> _rbuf;
+    // Data address of _sbuf
+    std::vector<void*> _sbuf_data;
     // Data address of _rbuf
     std::vector<void*> _rbuf_data;
     // Remote block size for receiving
src/brpc/rdma/rdma_helper.cpp (8 additions, 5 deletions)
@@ -643,12 +643,15 @@ ibv_pd* GetRdmaPd() {
 }
 
 uint32_t GetLKey(void* buf) {
-    BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
-    ibv_mr** mr_ptr = g_user_mrs->seek(buf);
-    if (mr_ptr) {
-        return (*mr_ptr)->lkey;
+    uint32_t lkey = GetRegionId(buf);
+    if (lkey == 0) {
+        BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
+        ibv_mr** mr_ptr = g_user_mrs->seek(buf);
+        if (mr_ptr) {
+            return (*mr_ptr)->lkey;
+        }
     }
-    return 0;
+    return lkey;
 }
 
 ibv_gid GetRdmaGid() {
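The GetLKey() rewrite supports the copy path above: pool blocks resolve their lkey via GetRegionId() without taking the user-MR lock, and the locked g_user_mrs lookup becomes a fallback for explicitly registered user memory. An illustration only, not from the commit; AllocBlock and GetRegionId live in brpc's internal block pool, and the include path is an assumption:

    #include <brpc/rdma/block_pool.h>   // AllocBlock (internal API)
    #include <brpc/rdma/rdma_helper.h>  // GetLKey, RegisterMemoryForRdma
    #include <cstdlib>

    void LKeyLookupExample() {
        // Pool memory (e.g. a staging block of the copy-based send path):
        // GetRegionId() hits and no lock is taken.
        void* block = brpc::rdma::AllocBlock(4096);
        uint32_t pool_lkey = brpc::rdma::GetLKey(block);

        // User memory registered explicitly: GetRegionId() misses
        // (returns 0), so the locked g_user_mrs map supplies the lkey.
        void* user = malloc(4096);
        brpc::rdma::RegisterMemoryForRdma(user, 4096);
        uint32_t user_lkey = brpc::rdma::GetLKey(user);
        (void)pool_lkey;
        (void)user_lkey;
    }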
test/brpc_rdma_unittest.cpp (6 additions, 1 deletion)
@@ -68,6 +68,7 @@ struct HelloMessage {
 };
 
 DECLARE_bool(rdma_trace_verbose);
+DECLARE_bool(rdma_send_zerocopy);
 DECLARE_int32(rdma_memory_pool_max_regions);
 extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
 extern int (*IbvDestroyCq)(ibv_cq*);
@@ -1873,7 +1874,11 @@ TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) {
     google::protobuf::Closure* done = DoNothing();
     ::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done);
     bthread_id_join(cntl[0].call_id());
-    ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
+    if (rdma::FLAGS_rdma_send_zerocopy) {
+        ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
+    } else {
+        ASSERT_EQ(0, cntl[0].ErrorCode());
+    }
     attach.clear();
     sleep(2); // wait for client recover from EHOSTDOWN
     cntl[0].Reset();
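The branched assertion reflects the two send paths: the test's attachment lives in memory the RDMA layer cannot resolve an lkey for, so with send-side zerocopy on the call fails with ERDMAMEM, while with it off the payload is copied into a registered pool block first and the RPC completes with error code 0. A sketch for covering both branches in one run, reusing the flag declaration pattern above and assuming the scenario can simply be repeated:

    // Sketch: exercise both send paths in a single test body.
    brpc::rdma::FLAGS_rdma_send_zerocopy = true;   // expect ERDMAMEM
    // ... issue the RPC with the unresolvable attachment ...
    brpc::rdma::FLAGS_rdma_send_zerocopy = false;  // expect 0 (success)
    // ... issue the same RPC again ...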
