Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature: Inherit RPC socket from systemd. #1910

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ARIA2_ARG_WITHOUT([libz])
ARIA2_ARG_WITH([tcmalloc])
ARIA2_ARG_WITH([jemalloc])
ARIA2_ARG_WITHOUT([libssh2])
ARIA2_ARG_WITHOUT([systemd])

ARIA2_ARG_DISABLE([ssl])
ARIA2_ARG_DISABLE([bittorrent])
Expand Down Expand Up @@ -494,6 +495,19 @@ if test "x$with_libssh2" = "xyes"; then
fi
fi

have_systemd=no
if test "x$with_systemd" = "xyes"; then
PKG_CHECK_MODULES([SYSTEMD], [libsystemd], [have_systemd=yes], [have_systemd=no])
if test "x$have_systemd" = "xyes"; then
AC_DEFINE([HAVE_SYSTEMD], [1], [Define to 1 if you have systemd.])
else
AC_MSG_WARN([$SYSTEMD_PKG_ERRORS])
if test "x$with_systemd_requested" = "xyes"; then
ARIA2_DEP_NOT_MET([systemd])
fi
fi
fi

have_libcares=no
if test "x$with_libcares" = "xyes"; then
PKG_CHECK_MODULES([LIBCARES], [libcares >= 1.7.0], [have_libcares=yes],
Expand Down Expand Up @@ -645,6 +659,9 @@ AM_CONDITIONAL([HAVE_SQLITE3], [test "x$have_sqlite3" = "xyes"])
# Set conditional for libssh2
AM_CONDITIONAL([HAVE_LIBSSH2], [test "x$have_libssh2" = "xyes"])

# Set conditional for systemd
AM_CONDITIONAL([HAVE_SYSTEMD], [test "x$have_systemd" = "xyes"])

case "$host" in
*solaris*)
save_LIBS=$LIBS
Expand Down Expand Up @@ -1112,6 +1129,7 @@ LibExpat: $have_libexpat (CFLAGS='$EXPAT_CFLAGS' LIBS='$EXPAT_LIBS')
LibCares: $have_libcares (CFLAGS='$LIBCARES_CFLAGS' LIBS='$LIBCARES_LIBS')
Zlib: $have_zlib (CFLAGS='$ZLIB_CFLAGS' LIBS='$ZLIB_LIBS')
Libssh2: $have_libssh2 (CFLAGS='$LIBSSH2_CFLAGS' LIBS='$LIBSSH2_LIBS')
Systemd: $have_systemd (CFLAGS='$SYSTEMD_CFLAGS' LIBS='$SYSTEMD_LIBS')
Tcmalloc: $have_tcmalloc (CFLAGS='$TCMALLOC_CFLAGS' LIBS='$TCMALLOC_LIBS')
Jemalloc: $have_jemalloc (CFLAGS='$JEMALLOC_CFLAGS' LIBS='$JEMALLOC_LIBS')
Epoll: $have_epoll
Expand Down
36 changes: 28 additions & 8 deletions src/DownloadEngineFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
#include "DownloadEngineFactory.h"

#include <algorithm>
#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif // HAVE_SYSTEMD

#include "Option.h"
#include "RequestGroup.h"
Expand Down Expand Up @@ -205,14 +208,31 @@ std::unique_ptr<DownloadEngine> DownloadEngineFactory::newDownloadEngine(
if (secure) {
A2_LOG_NOTICE("RPC transport will be encrypted.");
}
static int families[] = {AF_INET, AF_INET6};
size_t familiesLength = op->getAsBool(PREF_DISABLE_IPV6) ? 1 : 2;
for (size_t i = 0; i < familiesLength; ++i) {
auto httpListenCommand = make_unique<HttpListenCommand>(
e->newCUID(), e.get(), families[i], secure);
if (httpListenCommand->bindPort(op->getAsInt(PREF_RPC_LISTEN_PORT))) {
e->addCommand(std::move(httpListenCommand));
ok = true;
#ifdef HAVE_SYSTEMD
int fds = sd_listen_fds(1);
if (0 < fds) {
for (int fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + fds; fd++) {
auto httpListenCommand = make_unique<HttpListenCommand>(
e->newCUID(), e.get(), fd, secure);
if (httpListenCommand->listen()) {
e->addCommand(std::move(httpListenCommand));
ok = true;
}
}
} else
#endif // HAVE_SYSTEMD
{
static int families[] = {AF_INET, AF_INET6};
size_t familiesLength = op->getAsBool(PREF_DISABLE_IPV6) ? 1 : 2;
for (size_t i = 0; i < familiesLength; ++i) {
auto httpListenCommand = make_unique<HttpListenCommand>(
e->newCUID(), e.get(), families[i],
op->getAsInt(PREF_RPC_LISTEN_PORT),
secure);
if (httpListenCommand->listen()) {
e->addCommand(std::move(httpListenCommand));
ok = true;
}
}
}
if (!ok) {
Expand Down
33 changes: 23 additions & 10 deletions src/HttpListenCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@
namespace aria2 {

HttpListenCommand::HttpListenCommand(cuid_t cuid, DownloadEngine* e, int family,
uint16_t port, bool secure)
: Command(cuid), e_(e), fd_(-1), family_(family), port_(port), secure_(secure)
{
}

HttpListenCommand::HttpListenCommand(cuid_t cuid, DownloadEngine* e, int fd,
bool secure)
: Command(cuid), e_(e), family_(family), secure_(secure)
: Command(cuid), e_(e), fd_(fd), family_(AF_INET), port_(0), secure_(secure)
{
}

Expand Down Expand Up @@ -89,27 +95,34 @@ bool HttpListenCommand::execute()
return false;
}

bool HttpListenCommand::bindPort(uint16_t port)
bool HttpListenCommand::listen()
{
if (serverSocket_) {
e_->deleteSocketForReadCheck(serverSocket_, this);
}
serverSocket_ = std::make_shared<SocketCore>();
const int ipv = (family_ == AF_INET) ? 4 : 6;
std::string sListen;
try {
int flags = 0;
if (e_->getOption()->getAsBool(PREF_RPC_LISTEN_ALL)) {
flags = AI_PASSIVE;
if (fd_ < 0) {
int flags = 0;
const int ipv = (family_ == AF_INET) ? 4 : 6;
sListen = fmt("TCP/IPv%d port %u", ipv, port_);
if (e_->getOption()->getAsBool(PREF_RPC_LISTEN_ALL)) {
flags = AI_PASSIVE;
}
serverSocket_->bind(nullptr, port_, family_, flags);
} else {
sListen = fmt("file descriptor fd=%d", fd_);
serverSocket_->bindExistingFd(fd_);
}
serverSocket_->bind(nullptr, port, family_, flags);
serverSocket_->beginListen();
A2_LOG_INFO(fmt(MSG_LISTENING_PORT, getCuid(), port));
A2_LOG_INFO(fmt(MSG_LISTENING_RPC, getCuid(), sListen.c_str()));
e_->addSocketForReadCheck(serverSocket_, this);
A2_LOG_NOTICE(fmt(_("IPv%d RPC: listening on TCP port %u"), ipv, port));
A2_LOG_NOTICE(fmt(_("RPC: listening on %s"), sListen.c_str()));
return true;
}
catch (RecoverableException& e) {
A2_LOG_ERROR_EX(fmt("IPv%d RPC: failed to bind TCP port %u", ipv, port), e);
A2_LOG_ERROR_EX(fmt("RPC: failed to listen on %s", sListen.c_str()), e);
serverSocket_->closeConnection();
}
return false;
Expand Down
8 changes: 6 additions & 2 deletions src/HttpListenCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,22 @@ class SocketCore;
class HttpListenCommand : public Command {
private:
DownloadEngine* e_;
int fd_;
int family_;
uint16_t port_;
std::shared_ptr<SocketCore> serverSocket_;
bool secure_;

public:
HttpListenCommand(cuid_t cuid, DownloadEngine* e, int family, bool secure);
HttpListenCommand(cuid_t cuid, DownloadEngine* e, int family, uint16_t port, bool secure);

HttpListenCommand(cuid_t cuid, DownloadEngine* e, int fd, bool secure);

virtual ~HttpListenCommand();

virtual bool execute() CXX11_OVERRIDE;

bool bindPort(uint16_t port);
bool listen();
};

} // namespace aria2
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ AM_CPPFLAGS = \
@LIBGMP_CFLAGS@ \
@LIBGCRYPT_CFLAGS@ \
@LIBSSH2_CFLAGS@ \
@SYSTEMD_CFLAGS@ \
@LIBCARES_CFLAGS@ \
@WSLAY_CFLAGS@ \
@TCMALLOC_CFLAGS@ \
Expand All @@ -735,6 +736,7 @@ EXTLDADD = @ALLOCA@ \
@LIBGMP_LIBS@ \
@LIBGCRYPT_LIBS@ \
@LIBSSH2_LIBS@ \
@SYSTEMD_LIBS@ \
@LIBCARES_LIBS@ \
@WSLAY_LIBS@ \
@TCMALLOC_LIBS@ \
Expand Down
12 changes: 12 additions & 0 deletions src/SocketCore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ void SocketCore::bind(const struct sockaddr* addr, socklen_t addrlen)
sockfd_ = fd;
}

void SocketCore::bindExistingFd(int fd)
{
std::string error;
if (fd < 0) {
error = fmt("Invalid file descriptor fd=%d", fd);
throw DL_ABORT_EX(fmt(EX_SOCKET_BIND, error.c_str()));
}
util::make_fd_cloexec(fd);
applySocketBufferSize(fd);
sockfd_ = fd;
}

void SocketCore::beginListen()
{
if (listen(sockfd_, 1024) == -1) {
Expand Down
5 changes: 5 additions & 0 deletions src/SocketCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ class SocketCore {
void bind(const char* addrp, uint16_t port, int family,
int flags = AI_PASSIVE);

/**
* Bind to an existing file descriptor.
*/
void bindExistingFd(int fd);

/**
* Listens form connection on it.
* Call bind(uint16_t) before calling this function.
Expand Down
4 changes: 2 additions & 2 deletions src/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@
#define MSG_CONTENT_DISPOSITION_DETECTED \
"CUID#%" PRId64 " - Content-Disposition detected. Use %s as filename"
#define MSG_PEER_BANNED "CUID#%" PRId64 " - Peer %s:%d banned."
#define MSG_LISTENING_PORT \
"CUID#%" PRId64 " - Using port %d for accepting new connections"
#define MSG_LISTENING_RPC \
"CUID#%" PRId64 " - Using %s for accepting new connections"
#define MSG_BIND_FAILURE "CUID#%" PRId64 " - An error occurred while binding port=%d"
#define MSG_INCOMING_PEER_CONNECTION \
"CUID#%" PRId64 " - Incoming connection, adding new command CUID#%" PRId64 ""
Expand Down