From f73813a56402a38eeb65ef7443e214104d1295ca Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Thu, 10 Aug 2023 06:01:35 -0700 Subject: [PATCH 1/6] PROTON-2748: Raw connection async close fix and tests. --- c/src/proactor/epoll_raw_connection.c | 105 +++--- c/src/proactor/raw_connection-internal.h | 2 + c/src/proactor/raw_connection.c | 32 +- c/tests/raw_wake_test.cpp | 417 ++++++++++++++++++++++- 4 files changed, 505 insertions(+), 51 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 9b85b15f1..644925916 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,7 +50,8 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool batch_empty; + bool hup_detected; + bool read_check; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -304,7 +305,7 @@ void pn_raw_connection_write_close(pn_raw_connection_t *rc) { pni_raw_write_close(rc); } -static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { +static pn_event_t *pni_epoll_raw_batch_next(pn_event_batch_t *batch, bool peek_only) { praw_connection_t *rc = containerof(batch, praw_connection_t, batch); pn_raw_connection_t *raw = &rc->raw_connection; @@ -318,12 +319,18 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - pn_event_t *e = pni_raw_event_next(raw); - if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) - rc->batch_empty = true; + pn_event_t *e = peek_only ? pni_raw_event_peek(raw) : pni_raw_event_next(raw); return e; } +static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { + return pni_epoll_raw_batch_next(batch, false); +} + +static pn_event_t *pni_raw_batch_peek(pn_event_batch_t *batch) { + return pni_epoll_raw_batch_next(batch, true); +} + task_t *pni_psocket_raw_task(psocket_t* ps) { return &containerof(ps, praw_connection_t, psocket)->task; } @@ -393,10 +400,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (rc->disconnected) { pni_raw_connect_failed(&rc->raw_connection); unlock(&rc->task.mutex); - rc->batch_empty = false; return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { + // Continuation of praw_connection_maybe_connect_lh() logic. // A wake can be the first event. Otherwise, wait for connection to complete. bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector); t->working = event_pending; @@ -408,32 +415,41 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } unlock(&rc->task.mutex); - if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); - if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); - rc->batch_empty = false; + if (rc->connected) { + if (events & EPOLLERR) { + // Read and write sides closed via RST. Tear down immediately. + int soerr; + socklen_t soerrlen = sizeof(soerr); + int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); + if (ec == 0 && soerr) { + psocket_error(rc, soerr, "async disconnect"); + } + pni_raw_async_disconnect(&rc->raw_connection); + } else if (events & EPOLLHUP) { + rc->hup_detected = true; + } + + if (events & (EPOLLIN || EPOLLRDHUP) || rc->read_check) { + pni_raw_read(&rc->raw_connection, fd, rcv, set_error); + rc->read_check = false; + } + if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); + } return &rc->batch; } void pni_raw_connection_done(praw_connection_t *rc) { bool notify = false; bool ready = false; - bool have_event = false; - - // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary. - if (!rc->batch_empty) { - if (pn_collector_peek(rc->raw_connection.collector)) - have_event = true; - else { - pn_event_t *e = pni_raw_batch_next(&rc->batch); - // State machine up to date. - if (e) { - have_event = true; - // Sole event. Can put back without order issues. - // Edge case, performance not important. - pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e)); - } - } - } + pn_raw_connection_t *raw = &rc->raw_connection; + int fd = rc->psocket.epoll_io.fd; + + // Try write + if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error); + pni_raw_process_shutdown(raw, fd, shutr, shutw); + + // Update state machine and check for possible pending event. + bool have_event = pni_raw_batch_peek(&rc->batch); lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; @@ -442,24 +458,31 @@ void pni_raw_connection_done(praw_connection_t *rc) { // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task); - ready = rc->task.ready; + ready = rc->task.ready; // No need to poll. Already scheduled. unlock(&rc->task.mutex); - pn_raw_connection_t *raw = &rc->raw_connection; - int fd = rc->psocket.epoll_io.fd; - pni_raw_process_shutdown(raw, fd, shutr, shutw); - int wanted = - (pni_raw_can_read(raw) ? EPOLLIN : 0) | - (pni_raw_can_write(raw) ? EPOLLOUT : 0); - if (wanted) { - rc->psocket.epoll_io.wanted = wanted; - rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error + bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending; + if (finished_disconnect) { + // If we're closed and we've sent the disconnect then close + pni_raw_finalize(raw); + praw_connection_cleanup(rc); + } else if (ready) { + // Already scheduled to run. Skip poll. Remember if we want a read. + rc->read_check = pni_raw_can_read(raw); + } else if (!rc->connected) { + // Connect logic has already armed the socket. } else { - bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending; - if (finished_disconnect) { - // If we're closed and we've sent the disconnect then close - pni_raw_finalize(raw); - praw_connection_cleanup(rc); + // Must poll for iO. + int wanted = + (pni_raw_can_read(raw) ? (EPOLLIN | EPOLLRDHUP) : 0) | + (pni_raw_can_write(raw) ? EPOLLOUT : 0); + + // wanted == 0 implies we block until either application wake() or EPOLLHUP | EPOLLERR. + // If wanted == 0 and hup_detected, blocking not possible, so skip arming until + // application provides read buffers. + if (wanted || !rc->hup_detected) { + rc->psocket.epoll_io.wanted = wanted; + rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error } } diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 47b0ea925..9d7aee7ea 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -134,9 +134,11 @@ void pni_raw_write_close(pn_raw_connection_t *conn); void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)); +void pni_raw_async_disconnect(pn_raw_connection_t *conn); bool pni_raw_can_read(pn_raw_connection_t *conn); bool pni_raw_can_write(pn_raw_connection_t *conn); pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn); +pn_event_t *pni_raw_event_peek(pn_raw_connection_t *conn); void pni_raw_initialize(pn_raw_connection_t *conn); void pni_raw_finalize(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 3d8b976c6..1bcefeb21 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -669,12 +669,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) { return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite; } -pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { +static pn_event_t *pni_get_next_raw_event(pn_raw_connection_t *conn, bool peek_only) { + // Return event if available or advance state machine event stream. + // Note that pn_collector_next increments event refcount but peek does not. assert(conn); do { - pn_event_t *event = pn_collector_next(conn->collector); + pn_event_t *event = peek_only ? pn_collector_peek(conn->collector) : pn_collector_next(conn->collector); if (event) { - return pni_log_event(conn, event); + return peek_only ? event : pni_log_event(conn, event); } else if (conn->connectpending) { pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED); conn->connectpending = false; @@ -721,6 +723,14 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { } while (true); } +pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { + return pni_get_next_raw_event(conn, false); +} + +pn_event_t *pni_raw_event_peek(pn_raw_connection_t *conn) { + return pni_get_next_raw_event(conn, true); +} + void pni_raw_read_close(pn_raw_connection_t *conn) { // If already fully closed nothing to do if (pni_raw_rwclosed(conn)) { @@ -781,6 +791,22 @@ void pni_raw_close(pn_raw_connection_t *conn) { } } +void pni_raw_async_disconnect(pn_raw_connection_t *conn) { + if (pni_raw_rwclosed(conn)) + return; + + if (!pni_raw_rclosed(conn)) { + conn->state = pni_raw_new_state(conn, conn_read_closed); + conn->rclosedpending = true; + } + if (!pni_raw_wclosed(conn)) { + pni_raw_release_buffers(conn); + conn->state = pni_raw_new_state(conn, conn_write_closed); + conn->wclosedpending = true; + } + pni_raw_disconnect(conn); +} + bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) { assert(conn); return pni_raw_rclosed(conn); diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp index 4a5dc23d3..80ddc2028 100644 --- a/c/tests/raw_wake_test.cpp +++ b/c/tests/raw_wake_test.cpp @@ -28,11 +28,22 @@ #include #include #include +#include #endif #include -// WAKE tests require a running proactor. +// Raw connection tests driven by a proactor. + +// These tests often cheat by directly calling API functions that +// would normally be called in an event callback for thread safety +// reasons. This can usually work because the proactors and API calls +// are all called from a single thread so there is no contention, but +// the raw connection may require a wake so that the state machine and +// polling mask can be updated. Note that wakes stop working around +// the time the raw connection thinks it is about to be fully closed, +// so close operations may need to be done in event callbacks to +// avoid wake uncertainty. #include "../src/proactor/proactor-internal.h" #include "./pn_test_proactor.hpp" @@ -45,14 +56,32 @@ namespace { class common_handler : public handler { bool close_on_wake_; + bool write_close_on_wake_; + bool stop_on_wake_; + bool abort_on_wake_; + int closed_read_count_; + int closed_write_count_; + int disconnect_count_; + bool disconnect_error_; pn_raw_connection_t *last_server_; + pn_raw_buffer_t write_buff_; public: - explicit common_handler() : close_on_wake_(false), last_server_(0) {} + explicit common_handler() : close_on_wake_(false), write_close_on_wake_(0), stop_on_wake_(false), + abort_on_wake_(false), closed_read_count_(0), closed_write_count_(0), + disconnect_count_(0), disconnect_error_(false), + last_server_(0), write_buff_({0}) {} void set_close_on_wake(bool b) { close_on_wake_ = b; } - + void set_write_close_on_wake(bool b) { write_close_on_wake_ = b; } + void set_stop_on_wake(bool b) { stop_on_wake_ = b; } + void set_abort_on_wake(bool b) { abort_on_wake_ = b; } + int closed_read_count() { return closed_read_count_; } + int closed_write_count() { return closed_write_count_; } + int disconnect_count() { return disconnect_count_; } + bool disconnect_error() { return disconnect_error_; } pn_raw_connection_t *last_server() { return last_server_; } + void set_write_on_wake(pn_raw_buffer_t *b) { write_buff_ = *b; } bool handle(pn_event_t *e) override { switch (pn_event_type(e)) { @@ -71,13 +100,41 @@ class common_handler : public handler { } break; case PN_RAW_CONNECTION_WAKE: { - if (close_on_wake_) { - pn_raw_connection_t *rc = pn_event_raw_connection(e); + if (abort_on_wake_) abort(); + pn_raw_connection_t *rc = pn_event_raw_connection(e); + + if (write_buff_.size) { + // Add the buff for writing before any close operation. + CHECK(pn_raw_connection_write_buffers(rc, &write_buff_, 1) == 1); + write_buff_.size = 0; + } + if (write_close_on_wake_) + pn_raw_connection_write_close(rc); + if (close_on_wake_) pn_raw_connection_close(rc); + return stop_on_wake_; + } break; + + case PN_RAW_CONNECTION_DISCONNECTED: { + disconnect_count_++; + pn_raw_connection_t *rc = pn_event_raw_connection(e); + pn_condition_t *cond = pn_raw_connection_condition(rc); + if (disconnect_count_ == 1 && pn_condition_is_set(cond)) { + const char *nm = pn_condition_get_name(cond); + const char *ds = pn_condition_get_description(cond); + if (nm && strlen(nm) > 0 && ds && strlen(ds) > 0) + disconnect_error_ = true; } - return true; + return false; } break; + case PN_RAW_CONNECTION_CLOSED_READ: + closed_read_count_++; + return false; + + case PN_RAW_CONNECTION_CLOSED_WRITE: + closed_write_count_++; + return false; default: return false; @@ -85,9 +142,127 @@ class common_handler : public handler { } }; +static const size_t buffsz = 128; + +// Basic test consisting of +// client is an OS socket. +// server is a pn_raw_connection_t with one shared read/write buffer. +// pn_listener_t used to put the two together. +struct basic_test { + common_handler h; + proactor p; + pn_listener_t *l; + int sockfd; // client + pn_raw_connection_t *server_rc; + char buff[buffsz]; + bool buff_in_use; + + basic_test() : h(), p(&h) { + l = p.listen(); + REQUIRE_RUN(p, PN_LISTENER_OPEN); + sockfd = socket(AF_INET, SOCK_STREAM, 0); + REQUIRE(sockfd >= 0); + struct sockaddr_in laddr; + memset(&laddr, 0, sizeof(laddr)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons(atoi(pn_test::listening_port(l).c_str())); + laddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + connect(sockfd, (const struct sockaddr*) &laddr, sizeof(laddr)); + + REQUIRE_RUN(p, PN_LISTENER_ACCEPT); + server_rc = h.last_server(); + REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1); + buff_in_use = true; + + pn_raw_connection_wake(server_rc); + REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); + CHECK(pn_proactor_get(p) == NULL); /* idle */ + } + + ~basic_test() { + pn_listener_close(l); + REQUIRE_RUN(p, PN_LISTENER_CLOSE); + REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); + if (sockfd >= 0) close(sockfd); + bool sanity = h.closed_read_count() == 1 && h.closed_write_count() == 1 && + h.disconnect_count() == 1; + REQUIRE(sanity == true); + } + + void socket_write_close() { + if (sockfd < 0) return; + shutdown(sockfd, SHUT_WR); + } + + void socket_graceful_close() { + if (sockfd < 0) return; + close(sockfd); + sockfd = -1; + } + + bool socket_hard_close() { + // RST (not FIN), hard/abort close + if (sockfd < 0) return false; + struct linger lngr; + lngr.l_onoff = 1; + lngr.l_linger = 0; + if (sockfd < 0) return false; + if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr)) == 0) { + if (close(sockfd) == 0) { + sockfd = -1; + return true; + } + } + return false; + } + + void drain_read_buffer() { + assert(buff_in_use); + send(sockfd, "FOO", 3, 0); + REQUIRE_RUN(p, PN_RAW_CONNECTION_READ); + pn_raw_buffer_t rb = {0}; + REQUIRE(pn_raw_connection_take_read_buffers(server_rc, &rb, 1) == 1); + REQUIRE(rb.size == 3); + buff_in_use = false; + } + + void give_read_buffer() { + assert(!buff_in_use); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1); + buff_in_use = true; + } + + void write_next_wake(const char *m) { + assert(!buff_in_use); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + size_t l = strlen(m); + assert(l < buffsz); + strcpy(rb.bytes, m); + rb.size = l; + h.set_write_on_wake(&rb); + } + + int drain_events() { + int ec = 0; + pn_event_batch_t *batch = NULL; + while (batch = pn_proactor_get(p.get())) { + pn_event_t *e; + while (e = pn_event_batch_next(batch)) { + ec++; + h.dispatch(e); + } + pn_proactor_done(p.get(), batch); + } + return ec; + } +}; } // namespace + // Test waking up a connection that is idle TEST_CASE("proactor_raw_connection_wake") { common_handler h; @@ -104,7 +279,7 @@ TEST_CASE("proactor_raw_connection_wake") { REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); CHECK(pn_proactor_get(p) == NULL); /* idle */ - pn_raw_connection_wake(rc); + pn_raw_connection_wake(rc); REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); CHECK(pn_proactor_get(p) == NULL); /* idle */ @@ -119,3 +294,231 @@ TEST_CASE("proactor_raw_connection_wake") { REQUIRE_RUN(p, PN_LISTENER_CLOSE); REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); } + +// Normal close +TEST_CASE("raw_connection_graceful_close") { + struct basic_test x; + x.socket_graceful_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == false); +} + +// HARD close +TEST_CASE("raw_connection_hardclose") { + struct basic_test x; + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close, no read buffer +TEST_CASE("raw_connection_hardclose_nrb") { + struct basic_test x; + // Drain read buffer without replenishing + x.drain_read_buffer(); + x.drain_events(); + CHECK(pn_proactor_get(x.p) == NULL); /* idle */ + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close after read close +TEST_CASE("raw_connection_readclose_then_hardclose") { + struct basic_test x; + x.socket_write_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + x.drain_events(); + REQUIRE(x.h.disconnect_count() == 0); + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close after read close, no read buffer +TEST_CASE("raw_connection_readclose_then_hardclose_nrb") { + struct basic_test x; + // Drain read buffer without replenishing + x.drain_read_buffer(); + x.drain_events(); + CHECK(pn_proactor_get(x.p) == NULL); /* idle */ + // Shut of read side should be ignored with no read buffer. + x.socket_write_close(); + CHECK(pn_proactor_get(x.p) == NULL); /* still idle */ + + // Confirm raw connection shuts down, even with no read buffer + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// Normal close on socket delays CLOSED_READ event until application makes read buffers available +TEST_CASE("raw_connection_delay_readclose") { + struct basic_test x; + x.drain_read_buffer(); + x.socket_graceful_close(); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.give_read_buffer(); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE(x.h.closed_read_count() == 1); + + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_rst_on_write") { + struct basic_test x; + x.drain_read_buffer(); + + // Send some data + x.write_next_wake("foo"); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WRITTEN); + pn_raw_buffer_t rb = {0}; + CHECK(pn_raw_connection_take_written_buffers(x.server_rc, &rb, 1) == 1); + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3); + + // Repeat, with closed peer socket. + x.socket_graceful_close(); + x.write_next_wake("bar"); + pn_raw_connection_wake(x.server_rc); + // Write or subsequent poll should fail EPIPE + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// One sided close. No cooperation from peer. +TEST_CASE("raw_connection_full_close") { + struct basic_test x; + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +// As above. No read buffer. +TEST_CASE("raw_connection_full_close_nrb") { + struct basic_test x; + x.drain_read_buffer(); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +// One sided close, pending write. +TEST_CASE("raw_connection_close_wdrain") { + struct basic_test x; + x.drain_read_buffer(); + // write and then close on next wake + x.write_next_wake("fubar"); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + // Now check fubar made it + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 5); + REQUIRE(strncmp("fubar", b, 5) == 0); +} + +// One sided write_close then close. +TEST_CASE("raw_connection_wclose_full_close") { + struct basic_test x; + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.h.set_write_close_on_wake(false); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_wclose_full_close_nrb") { + struct basic_test x; + x.drain_read_buffer(); + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.h.set_write_close_on_wake(false); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_wclose_full_close_wdrain") { + struct basic_test x; + x.drain_read_buffer(); + // write and then wclose then close on next wake + x.write_next_wake("bar"); + x.h.set_write_close_on_wake(true); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + // Now check bar made it + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3); + REQUIRE(strncmp("bar", b, 3) == 0); +} + +// Half closes each direction. Raw connection then peer. +TEST_CASE("raw_connection_wclose_then_rclose") { + struct basic_test x; + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + x.drain_events(); + REQUIRE(x.h.closed_write_count() == 1); + REQUIRE(x.h.closed_read_count() == 0); + + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF + x.socket_write_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.closed_read_count() == 1); +} + +// As above but peer first then raw connection. +TEST_CASE("raw_connection_rclose_then_wclose") { + struct basic_test x; + x.socket_write_close(); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 1); + REQUIRE(x.h.closed_write_count() == 0); + + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF + REQUIRE(x.h.closed_write_count() == 1); +} + From 7c6fc14ae8bb5696e95118a10d217d3702a57a64 Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Thu, 24 Aug 2023 23:14:17 -0700 Subject: [PATCH 2/6] PROTON-2748: pull request 402 review - replace incorrect boolean operator --- c/src/proactor/epoll_raw_connection.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 644925916..2e6d1e052 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -429,7 +429,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool rc->hup_detected = true; } - if (events & (EPOLLIN || EPOLLRDHUP) || rc->read_check) { + if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) { pni_raw_read(&rc->raw_connection, fd, rcv, set_error); rc->read_check = false; } From 63985a9b5d0af9d20ee574e0762b7d3ac1ffcc80 Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Wed, 6 Sep 2023 17:01:10 -0700 Subject: [PATCH 3/6] PROTON-2748: pull request 402 review - peek->has_events --- c/src/proactor/epoll_raw_connection.c | 17 +++--------- c/src/proactor/raw_connection-internal.h | 4 +-- c/src/proactor/raw_connection.c | 34 +++++++++++++----------- 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 2e6d1e052..9ede01895 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -305,7 +305,7 @@ void pn_raw_connection_write_close(pn_raw_connection_t *rc) { pni_raw_write_close(rc); } -static pn_event_t *pni_epoll_raw_batch_next(pn_event_batch_t *batch, bool peek_only) { +static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { praw_connection_t *rc = containerof(batch, praw_connection_t, batch); pn_raw_connection_t *raw = &rc->raw_connection; @@ -319,16 +319,7 @@ static pn_event_t *pni_epoll_raw_batch_next(pn_event_batch_t *batch, bool peek_o unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - pn_event_t *e = peek_only ? pni_raw_event_peek(raw) : pni_raw_event_next(raw); - return e; -} - -static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { - return pni_epoll_raw_batch_next(batch, false); -} - -static pn_event_t *pni_raw_batch_peek(pn_event_batch_t *batch) { - return pni_epoll_raw_batch_next(batch, true); + return pni_raw_event_next(raw); } task_t *pni_psocket_raw_task(psocket_t* ps) { @@ -449,7 +440,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { pni_raw_process_shutdown(raw, fd, shutr, shutw); // Update state machine and check for possible pending event. - bool have_event = pni_raw_batch_peek(&rc->batch); + bool have_event = pni_raw_batch_has_events(raw); lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; @@ -472,7 +463,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { } else if (!rc->connected) { // Connect logic has already armed the socket. } else { - // Must poll for iO. + // Must poll for IO. int wanted = (pni_raw_can_read(raw) ? (EPOLLIN | EPOLLRDHUP) : 0) | (pni_raw_can_write(raw) ? EPOLLOUT : 0); diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 9d7aee7ea..7a2f56ab5 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -127,7 +127,7 @@ void pni_raw_connected(pn_raw_connection_t *conn); void pni_raw_connect_failed(pn_raw_connection_t *conn); void pni_raw_wake(pn_raw_connection_t *conn); bool pni_raw_wake_is_pending(pn_raw_connection_t *conn); -bool pni_raw_can_wake(pn_raw_connection_t *conn); +bool pni_raw_can_wake(pn_raw_connection_t *conn); // ZZZ void pni_raw_close(pn_raw_connection_t *conn); void pni_raw_read_close(pn_raw_connection_t *conn); void pni_raw_write_close(pn_raw_connection_t *conn); @@ -138,7 +138,7 @@ void pni_raw_async_disconnect(pn_raw_connection_t *conn); bool pni_raw_can_read(pn_raw_connection_t *conn); bool pni_raw_can_write(pn_raw_connection_t *conn); pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn); -pn_event_t *pni_raw_event_peek(pn_raw_connection_t *conn); +bool pni_raw_batch_has_events(pn_raw_connection_t *conn); void pni_raw_initialize(pn_raw_connection_t *conn); void pni_raw_finalize(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 1bcefeb21..7d3e44f22 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -483,11 +483,6 @@ bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) { return conn->wakepending; } -bool pni_raw_can_wake(pn_raw_connection_t *conn) { - // True if DISCONNECTED event has not yet been extracted from the batch. - return (conn->disconnect_state != disc_fini); -} - void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { assert(conn); @@ -669,14 +664,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) { return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite; } -static pn_event_t *pni_get_next_raw_event(pn_raw_connection_t *conn, bool peek_only) { - // Return event if available or advance state machine event stream. - // Note that pn_collector_next increments event refcount but peek does not. +bool pni_raw_batch_has_events(pn_raw_connection_t *conn) { + // If collector empty, advance state machine as necessary and generate next event. + // Return true if at least one event is available. assert(conn); do { - pn_event_t *event = peek_only ? pn_collector_peek(conn->collector) : pn_collector_next(conn->collector); + pn_event_t *event = pn_collector_peek(conn->collector); if (event) { - return peek_only ? event : pni_log_event(conn, event); + return true; } else if (conn->connectpending) { pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED); conn->connectpending = false; @@ -718,17 +713,18 @@ static pn_event_t *pni_get_next_raw_event(pn_raw_connection_t *conn, bool peek_o pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS); conn->rrequestedbuffers = true; } else { - return NULL; + return false; } } while (true); } pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { - return pni_get_next_raw_event(conn, false); -} - -pn_event_t *pni_raw_event_peek(pn_raw_connection_t *conn) { - return pni_get_next_raw_event(conn, true); + if (pni_raw_batch_has_events(conn)) { + pn_event_t* event = pn_collector_next(conn->collector); + assert(event); + return pni_log_event(conn, event); + } + return NULL; } void pni_raw_read_close(pn_raw_connection_t *conn) { @@ -840,3 +836,9 @@ pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *conn) { pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event) { return (pn_event_class(event) == PN_CLASSCLASS(pn_raw_connection)) ? (pn_raw_connection_t*)pn_event_context(event) : NULL; } + +//ZZZ still used? +bool pni_raw_can_wake(pn_raw_connection_t *conn) { + // True if DISCONNECTED event has not yet been extracted from the batch. + return (conn->disconnect_state != disc_fini); +} From 4529406ce68cc1d2c6ae4b9f1f421e4a8e4b3161 Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Fri, 8 Sep 2023 12:27:18 -0700 Subject: [PATCH 4/6] PROTON-2748: pull request 402 review - isolate !connected code, early return --- c/src/proactor/epoll_raw_connection.c | 36 +++++++++++++----------- c/src/proactor/raw_connection-internal.h | 2 +- c/src/proactor/raw_connection.c | 11 ++++---- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 9ede01895..c0e731066 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -403,29 +403,31 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } if (events & EPOLLOUT) praw_connection_connected_lh(rc); + unlock(&rc->task.mutex); + return &rc->batch; } unlock(&rc->task.mutex); - if (rc->connected) { - if (events & EPOLLERR) { - // Read and write sides closed via RST. Tear down immediately. - int soerr; - socklen_t soerrlen = sizeof(soerr); - int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); - if (ec == 0 && soerr) { - psocket_error(rc, soerr, "async disconnect"); - } - pni_raw_async_disconnect(&rc->raw_connection); - } else if (events & EPOLLHUP) { - rc->hup_detected = true; + if (events & EPOLLERR) { + // Read and write sides closed via RST. Tear down immediately. + int soerr; + socklen_t soerrlen = sizeof(soerr); + int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); + if (ec == 0 && soerr) { + psocket_error(rc, soerr, "async disconnect"); } + pni_raw_async_disconnect(&rc->raw_connection); + return &rc->batch; + } + if (events & EPOLLHUP) { + rc->hup_detected = true; + } - if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) { - pni_raw_read(&rc->raw_connection, fd, rcv, set_error); - rc->read_check = false; - } - if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); + if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) { + pni_raw_read(&rc->raw_connection, fd, rcv, set_error); + rc->read_check = false; } + if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); return &rc->batch; } diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 7a2f56ab5..cdfd5a852 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -127,7 +127,7 @@ void pni_raw_connected(pn_raw_connection_t *conn); void pni_raw_connect_failed(pn_raw_connection_t *conn); void pni_raw_wake(pn_raw_connection_t *conn); bool pni_raw_wake_is_pending(pn_raw_connection_t *conn); -bool pni_raw_can_wake(pn_raw_connection_t *conn); // ZZZ +bool pni_raw_can_wake(pn_raw_connection_t *conn); void pni_raw_close(pn_raw_connection_t *conn); void pni_raw_read_close(pn_raw_connection_t *conn); void pni_raw_write_close(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 7d3e44f22..89fbbd1dd 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -483,6 +483,11 @@ bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) { return conn->wakepending; } +bool pni_raw_can_wake(pn_raw_connection_t *conn) { + // True if DISCONNECTED event has not yet been extracted from the batch. + return (conn->disconnect_state != disc_fini); +} + void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { assert(conn); @@ -836,9 +841,3 @@ pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *conn) { pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event) { return (pn_event_class(event) == PN_CLASSCLASS(pn_raw_connection)) ? (pn_raw_connection_t*)pn_event_context(event) : NULL; } - -//ZZZ still used? -bool pni_raw_can_wake(pn_raw_connection_t *conn) { - // True if DISCONNECTED event has not yet been extracted from the batch. - return (conn->disconnect_state != disc_fini); -} From 546f046339fdc4ac0041ea7cc1f543e23a45351b Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Thu, 21 Sep 2023 09:56:22 -0700 Subject: [PATCH 5/6] PROTON-2748: pull request 402 review - internal api tests --- c/tests/raw_connection_test.cpp | 76 +++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp index 7378d7541..ff59f7311 100644 --- a/c/tests/raw_connection_test.cpp +++ b/c/tests/raw_connection_test.cpp @@ -266,6 +266,31 @@ namespace { if (b.bytes) {b.capacity = s; b.size = s;} return b; } + + bool drain_events_to(pn_raw_connection_t *c, pn_event_type_t target) { + while (pn_event_t *e = pni_raw_event_next(c)) { + if (pn_event_type(e) == target) + return true; + } + return false; + } + + bool drain_events_until_both(pn_raw_connection_t *c, pn_event_type_t target1, pn_event_type_t target2) { + bool t1 = false; // target 1 seen + bool t2 = false; + while (pn_event_t *e = pni_raw_event_next(c)) { + if (pn_event_type(e) == target1) { + t1 = true; + if (t2) return true; + } + if (pn_event_type(e) == target2) { + t2 = true; + if (t1) return true; + } + } + return false; + } + } char message[] = @@ -812,3 +837,54 @@ TEST_CASE("raw connection") { } } } + +TEST_CASE("raw connection async close") { + auto_free p(mk_raw_connection()); + + REQUIRE(p); + CHECK_FALSE(pn_raw_connection_is_read_closed(p)); + CHECK_FALSE(pn_raw_connection_is_write_closed(p)); + pni_raw_connected(p); + REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CONNECTED); + + SECTION("Async close only") { + // nothing + } + + SECTION("Read close then async close") { + pni_raw_read_close(p); + REQUIRE(drain_events_to(p, PN_RAW_CONNECTION_CLOSED_READ)); + } + + SECTION("Write close then async close") { + pni_raw_write_close(p); + REQUIRE(drain_events_to(p, PN_RAW_CONNECTION_CLOSED_WRITE)); + } + + SECTION("Full close then async close") { + pni_raw_close(p); + REQUIRE(drain_events_until_both(p, PN_RAW_CONNECTION_CLOSED_READ, PN_RAW_CONNECTION_CLOSED_WRITE)); + } + + pni_raw_async_disconnect(p); + CHECK(pn_raw_connection_is_read_closed(p)); + CHECK(pn_raw_connection_is_write_closed(p)); + + SECTION("Async close then read close") { + pni_raw_read_close(p); + REQUIRE(drain_events_until_both(p, PN_RAW_CONNECTION_CLOSED_READ, PN_RAW_CONNECTION_CLOSED_WRITE)); + } + + SECTION("Async close then write close") { + pni_raw_write_close(p); + REQUIRE(drain_events_until_both(p, PN_RAW_CONNECTION_CLOSED_READ, PN_RAW_CONNECTION_CLOSED_WRITE)); + } + + SECTION("Async close then full close") { + pni_raw_close(p); + REQUIRE(drain_events_until_both(p, PN_RAW_CONNECTION_CLOSED_READ, PN_RAW_CONNECTION_CLOSED_WRITE)); + } + + REQUIRE(drain_events_to(p, PN_RAW_CONNECTION_DISCONNECTED)); + REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE); +} From 9c831822eb61c19fe23461e40c5967953a3de16f Mon Sep 17 00:00:00 2001 From: Cliff Jansen Date: Thu, 21 Sep 2023 10:23:59 -0700 Subject: [PATCH 6/6] PROTON-2748: pull request 402 review - rename test to raw_connection_proactor_test --- c/tests/CMakeLists.txt | 4 ++-- .../{raw_wake_test.cpp => raw_connection_proactor_test.cpp} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename c/tests/{raw_wake_test.cpp => raw_connection_proactor_test.cpp} (100%) diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt index 2b9ec35c9..aec640407 100644 --- a/c/tests/CMakeLists.txt +++ b/c/tests/CMakeLists.txt @@ -83,8 +83,8 @@ if (CMAKE_CXX_COMPILER) target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) if (PROACTOR_OK STREQUAL "epoll") - add_c_test(c-raw-wake-test raw_wake_test.cpp pn_test_proactor.cpp $) - target_link_libraries(c-raw-wake-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) + add_c_test(c-raw-connection-proactor-test raw_connection_proactor_test.cpp pn_test_proactor.cpp $) + target_link_libraries(c-raw-connection-proactor-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) endif() add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp) diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_connection_proactor_test.cpp similarity index 100% rename from c/tests/raw_wake_test.cpp rename to c/tests/raw_connection_proactor_test.cpp