Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDP group #480

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ int main(int argc, char** argv) {
- TCP客户端: [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp)
- UDP服务端: [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp)
- UDP客户端: [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp)
- UDP组播接收:[evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp)
- HTTP服务端: [examples/http_server_test.cpp](examples/http_server_test.cpp)
- HTTP客户端: [examples/http_client_test.cpp](examples/http_client_test.cpp)
- WebSocket服务端: [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ int main(int argc, char** argv) {
- [evpp/TcpClient_test.cpp](evpp/TcpClient_test.cpp)
- [evpp/UdpServer_test.cpp](evpp/UdpServer_test.cpp)
- [evpp/UdpClient_test.cpp](evpp/UdpClient_test.cpp)
- [evpp/UdpGroupDest_test.cpp](evpp/UdpGroupDest_test.cpp)
- [examples/http_server_test.cpp](examples/http_server_test.cpp)
- [examples/http_client_test.cpp](examples/http_client_test.cpp)
- [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
Expand Down
25 changes: 25 additions & 0 deletions base/hsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,31 @@ HV_INLINE int udp_broadcast(int sockfd, int on DEFAULT(1)) {
return setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, (const char*)&on, sizeof(int));
}

HV_INLINE int udp_joingroupv4(int sockfd, const char* group, const char* local_host) {
struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(group);
mreq.imr_interface.s_addr = inet_addr(local_host);
return setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_leavegroupv4(int sockfd, const char* group, const char* local_host) {
struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr = inet_addr(group);
mreq.imr_interface.s_addr = inet_addr(local_host);
return setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_joingroupv6(int sockfd, const char* group, ULONG interface) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ULONG是windows平台下特有的类型定义,其它平台没有,CI已经报错了

struct ipv6_mreq mreq;
mreq.ipv6mr_interface = interface;
inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr);
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}
HV_INLINE int udp_leavegroupv6(int sockfd, const char* group, ULONG interface) {
struct ipv6_mreq mreq;
mreq.ipv6mr_interface = interface;
inet_pton(AF_INET6, group, &mreq.ipv6mr_multiaddr);
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (const char*)&mreq, sizeof(mreq));
}

HV_INLINE int ip_v6only(int sockfd, int on DEFAULT(1)) {
#ifdef IPV6_V6ONLY
return setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&on, sizeof(int));
Expand Down
4 changes: 2 additions & 2 deletions evpp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。
├── TcpClient.h TCP客户端类
├── TcpServer.h TCP服务端类
├── UdpClient.h UDP客户端类
── UdpServer.h UDP服务端类

── UdpServer.h UDP服务端类
└── UdpGroup.h UDP组播类
```
170 changes: 170 additions & 0 deletions evpp/UdpGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#ifndef HV_UDP_GROUP_HPP_
#define HV_UDP_GROUP_HPP_
#include "hsocket.h"

#include "EventLoopThreadPool.h"
#include "Channel.h"

namespace hv {

template<class TSocketChannel = SocketChannel>
class UdpGroupEventLoopTmpl {
public:
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;

UdpGroupEventLoopTmpl(EventLoopPtr loop = NULL) {
loop_ = loop ? loop : std::make_shared<EventLoop>();
port = 0;
#if WITH_KCP
kcp_setting = NULL;
#endif
}

virtual ~UdpGroupEventLoopTmpl() {
#if WITH_KCP
HV_FREE(kcp_setting);
#endif
}

const EventLoopPtr& loop() {
return loop_;
}

//use createsocket() for server OR use createsocketRemote() for client
//@retval >=0 bindfd, <0 error
int createsocket(int port, const char* host = "0.0.0.0") {
hio_t* io = hloop_create_udp_server(loop_->loop(), host, port);
if (io == NULL) return -1;
this->host = host;
this->port = port;
channel = std::make_shared<TSocketChannel>(io);
return channel->fd();
}

// join group
int joinGroup(const char* g){
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看了下,就添加了joinGroup、leaveGroup两个方法,其它代码都是拷贝自UdpServer,不如直接加在UdpServer里,没必要单独搞个UdpGroup类了吧

if(channel == NULL || channel->isClosed()){
return -1;
}
return udp_joingroupv4(channel->fd(), g, host.c_str());
}
// leave group
int leaveGroup(const char* g){
if(channel == NULL || channel->isClosed()){
return -1;
}
return udp_leavegroupv4(channel->fd(), g, host.c_str());
}

// closesocket thread-safe
void closesocket() {
if (channel) {
channel->close(true);
}
}

int startRecv() {
if (channel == NULL || channel->isClosed()) {
return -1;
}
channel->onread = [this](Buffer* buf) {
if (onMessage) {
onMessage(channel, buf);
}
};
#if WITH_KCP
if (kcp_setting) {
hio_set_kcp(channel->io(), kcp_setting);
}
#endif
return channel->startRead();
}

int stopRecv() {
if (channel == NULL) return -1;
return channel->stopRead();
}

// start thread-safe
void start() {
loop_->runInLoop(std::bind(&UdpGroupEventLoopTmpl::startRecv, this));
}

#if WITH_KCP
void setKcp(kcp_setting_t* setting) {
if (setting == NULL) {
HV_FREE(kcp_setting);
return;
}
if (kcp_setting == NULL) {
HV_ALLOC_SIZEOF(kcp_setting);
}
*kcp_setting = *setting;
}
#endif

public:
std::string host;
int port;
TSocketChannelPtr channel;
#if WITH_KCP
kcp_setting_t* kcp_setting;
#endif
// Callback
std::function<void(const TSocketChannelPtr&, Buffer*)> onMessage;
private:
EventLoopPtr loop_;
};

template<class TSocketChannel = SocketChannel>
class UdpGroupTmpl : private EventLoopThread, public UdpGroupEventLoopTmpl<TSocketChannel> {
public:
UdpGroupTmpl(EventLoopPtr loop = NULL)
: EventLoopThread(loop)
, UdpGroupEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
, is_loop_owner(loop == NULL)
{}
virtual ~UdpGroupTmpl() {
stop(true);
}

const EventLoopPtr& loop() {
return EventLoopThread::loop();
}

// join group
int joinGroup(const char* g){
return UdpGroupEventLoopTmpl<TSocketChannel>::joinGroup(g);
}

// leave group
int leaveGroup(const char* g){
return UdpGroupEventLoopTmpl<TSocketChannel>::leaveGroup(g);
}

// start thread-safe
void start(bool wait_threads_started = true) {
if (isRunning()) {
UdpGroupEventLoopTmpl<TSocketChannel>::start();
} else {
EventLoopThread::start(wait_threads_started, std::bind(&UdpGroupTmpl::startRecv, this));
}
}

// stop thread-safe
void stop(bool wait_threads_stopped = true) {
UdpGroupEventLoopTmpl<TSocketChannel>::closesocket();
if (is_loop_owner) {
EventLoopThread::stop(wait_threads_stopped);
}
}

private:
bool is_loop_owner;
};

typedef UdpGroupTmpl<SocketChannel> UdpGroup;

}

#endif // HV_UDP_GROUP_HPP_
57 changes: 57 additions & 0 deletions evpp/UdpGroupDest_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* UdpGroupDest_test.cpp
*
* @build make evpp
* @client bin/UdpClient_test 9997 230.1.1.25
* @server bin/UdpGroupDest_test 9997 230.1.1.25
*
*/

#include <iostream>

#include "UdpGroup.h"

using namespace hv;

int main(int argc, char* argv[]) {
if (argc < 2) {
printf("Usage: %s port\n", argv[0]);
return -10;
}
int port = atoi(argv[1]);
const char* group = "230.1.1.25";
if (argc > 2) {
group = argv[2];
}

UdpGroup ug;
int bindfd = ug.createsocket(port);
if (bindfd < 0) {
return -20;
}
int rst = ug.joinGroup(group);
printf("udp group bind on port %d, bindfd=%d rst=%d ...\n", port, bindfd, rst);
ug.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo
printf(" >>> %.*s\n", (int)buf->size(), (char*)buf->data());
};
ug.start();

std::string str;
while (std::getline(std::cin, str)) {
if (str == "close") {
ug.closesocket();
break;
} else if (str == "leave") {
int c = ug.leaveGroup(group);
printf("leave group rst=%d\n", c);
} else if (str == "join") {
int c = ug.joinGroup(group);
printf("join group rst=%d\n", c);
} else {
break;
}
}

return 0;
}