Skip to content

Commit

Permalink
Add curl easy socket backend: configure checks, handle STDIN_WAKE and…
Browse files Browse the repository at this point in the history
… TIMEOUT, fix tests.
  • Loading branch information
philljj committed Dec 10, 2023
1 parent c329ef7 commit ce679c9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 35 deletions.
35 changes: 22 additions & 13 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,6 @@ AC_CHECK_LIB([wolfssl],[wolfCrypt_Init],,[AC_MSG_ERROR([libwolfssl is required a

fi

# libcurl support
AC_ARG_ENABLE([curl],
[AS_HELP_STRING([--enable-curl],[Enable curl easy socket backend (default: disabled)])],
[ ENABLED_CURL=$enableval ],
[ ENABLED_CURL=no ]
)

if test "x$ENABLED_CURL" = "xyes"; then
AM_CFLAGS="$AM_CFLAGS -DENABLE_MQTT_CURL"

AC_CHECK_LIB([curl],[curl_easy_init],,[AC_MSG_ERROR([libcurl is required and wasn't found on the system. It can be obtained from https://curl.se/download.html.])])
fi


# Non-Blocking support
AC_ARG_ENABLE([nonblock],
Expand Down Expand Up @@ -255,6 +242,28 @@ then
fi


# libcurl support
AC_ARG_ENABLE([curl],
[AS_HELP_STRING([--enable-curl],[Enable curl easy socket backend (default: disabled)])],
[ ENABLED_CURL=$enableval ],
[ ENABLED_CURL=no ]
)

if test "x$ENABLED_CURL" = "xyes"; then
if test "x$ENABLED_ALL" = "xyes"; then
AC_MSG_ERROR([--enable-all and --enable-curl are incompatible])
fi

if test "x$ENABLED_SN" = "xyes"; then
AC_MSG_ERROR([--enable-sn and --enable-curl are incompatible])
fi

AM_CFLAGS="$AM_CFLAGS -DENABLE_MQTT_CURL"

AC_CHECK_LIB([curl],[curl_easy_init],,[AC_MSG_ERROR([libcurl is required and wasn't found on the system. It can be obtained from https://curl.se/download.html.])])
fi


# MQTT v5.0
AC_ARG_ENABLE([v5],
[AS_HELP_STRING([--enable-v5],[Enable MQTT v5.0 support (default: disabled)])],
Expand Down
94 changes: 76 additions & 18 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ static int NetRead(void *context, byte* buf, int buf_len,
#define MQTT_CURL_NUM_RETRY (2)

static int
mqttcurl_wait(curl_socket_t sockfd, int for_recv, int timeout_ms)
mqttcurl_wait(curl_socket_t sockfd, int for_recv, int timeout_ms,
int test_mode)
{
struct timeval tv;
fd_set infd;
Expand All @@ -406,6 +407,11 @@ mqttcurl_wait(curl_socket_t sockfd, int for_recv, int timeout_ms)

if (for_recv) {
FD_SET(sockfd, &infd);
#ifdef WOLFMQTT_ENABLE_STDIN_CAP
if (!test_mode) {
FD_SET(STDIN, &infd);
}
#endif /* WOLFMQTT_ENABLE_STDIN_CAP */
}
else {
FD_SET(sockfd, &outfd);
Expand All @@ -415,17 +421,25 @@ mqttcurl_wait(curl_socket_t sockfd, int for_recv, int timeout_ms)

if (rc > 0) {
if (for_recv && FD_ISSET(sockfd, &infd)) {
rc = 1;
return MQTT_CODE_CONTINUE;
}
else if (!for_recv && FD_ISSET(sockfd, &outfd)) {
rc = 1;
return MQTT_CODE_CONTINUE;
}
#ifdef WOLFMQTT_ENABLE_STDIN_CAP
else if (for_recv && !test_mode && FD_ISSET(STDIN, &infd)) {
return MQTT_CODE_STDIN_WAKE;
}
#endif /* WOLFMQTT_ENABLE_STDIN_CAP */
else if (FD_ISSET(sockfd, &errfd)) {
rc = -1;
return MQTT_CODE_ERROR_NETWORK;
}
}
else if (rc == 0) {
return MQTT_CODE_ERROR_TIMEOUT;
}

return rc;
return MQTT_CODE_ERROR_NETWORK;
}

static int
Expand All @@ -434,7 +448,7 @@ mqttcurl_connect(SocketContext * sock, const char* host, word16 port,
{
CURLcode res = CURLE_OK;

if (sock == NULL) {
if (sock == NULL || sock->curl == NULL) {
return MQTT_CODE_ERROR_BAD_ARG;
}

Expand Down Expand Up @@ -583,6 +597,34 @@ mqttcurl_connect(SocketContext * sock, const char* host, word16 port,
}
#endif /* ENABLE_MQTT_TLS */

#if 0
/* Set proxy options.
* Unused at the moment. */
if (sock->mqttCtx->use_proxy != NULL) {
/* Set the proxy hostname or ip address string. Append
* ":[port num]" to the string to specify a port. */
res = curl_easy_setopt(sock->curl, CURLOPT_PROXY,
sock->mqttCtx->proxy_str);

if (res != CURLE_OK) {
PRINTF("error: curl_easy_setopt(CURLOPT_PROXY, %s) returned: %d",
res, sock->mqttCtx->proxy_str);
return MQTT_CODE_ERROR_CURL;
}

/* Set the proxy type. E.g. CURLPROXY_HTTP, CURLPROXY_HTTPS,
* CURLPROXY_HTTPS2, etc. */
res = curl_easy_setopt(sock->curl, CURLOPT_PROXYTYPE,
CURLPROXY_HTTP);

if (res != CURLE_OK) {
PRINTF("error: curl_easy_setopt(CURLOPT_PROXYTYPE) returned: %d",
res);
return MQTT_CODE_ERROR_CURL;
}
}
#endif

res = curl_easy_setopt(sock->curl, CURLOPT_CONNECT_ONLY, 1);

if (res != CURLE_OK) {
Expand Down Expand Up @@ -648,6 +690,7 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
SocketContext * sock = (SocketContext*)context;
size_t sent = 0;
curl_socket_t sockfd = 0;
int wait_rc = 0;

if (context == NULL || buf == NULL || buf_len == 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -672,27 +715,34 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
#endif

/* A very simple retry with timeout example. This assumes the entire
* payload will be transferred in a single shot without buffering. */
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
res = curl_easy_send(sock->curl, buf, buf_len, &sent);

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_send(%d) returned: %d, %s", buf_len, res,
curl_easy_strerror(res));
#endif
#endif
break;
}

if (res == CURLE_AGAIN) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_send(%d) returned: %d, %s", buf_len, res,
curl_easy_strerror(res));
#endif
#endif

if (mqttcurl_wait(sockfd, 0, timeout_ms) >= 0) {
wait_rc = mqttcurl_wait(sockfd, 0, timeout_ms,
sock->mqttCtx->test_mode);

if (wait_rc == MQTT_CODE_CONTINUE) {
continue;
}
else {
return wait_rc;
}
}

PRINTF("error: curl_easy_send(%d) returned: %d, %s", buf_len, res,
Expand All @@ -715,6 +765,7 @@ static int NetRead(void *context, byte* buf, int buf_len,
SocketContext * sock = (SocketContext*)context;
size_t recvd = 0;
curl_socket_t sockfd = 0;
int wait_rc = 0;

if (context == NULL || buf == NULL || buf_len == 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -739,27 +790,34 @@ static int NetRead(void *context, byte* buf, int buf_len,
#endif

/* A very simple retry with timeout example. This assumes the entire
* payload will be transferred in a single shot without buffering. */
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
res = curl_easy_recv(sock->curl, buf, buf_len, &recvd);

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
curl_easy_strerror(res));
#endif
#endif
break;
}

if (res == CURLE_AGAIN) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
curl_easy_strerror(res));
#endif
#endif

wait_rc = mqttcurl_wait(sockfd, 1, timeout_ms,
sock->mqttCtx->test_mode);

if (mqttcurl_wait(sockfd, 1, timeout_ms) >= 0) {
if (wait_rc == MQTT_CODE_CONTINUE) {
continue;
}
else {
return wait_rc;
}
}

PRINTF("error: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
Expand Down
9 changes: 5 additions & 4 deletions scripts/nbclient.test
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ generate_port() { # function to produce a random port number

# Check for TLS support
has_tls=no
./examples/multithread/multithread -? 2>&1 | grep -- 'Enable TLS'
./examples/nbclient/nbclient -? 2>&1 | grep -- 'Enable TLS'
if [ $? -eq 0 ]; then
has_tls=yes
fi
Expand Down Expand Up @@ -74,6 +74,7 @@ then
def_args="${def_args} -h localhost"
tls_port_args="-p 18883"
port_args="-p ${port}"
cacert_args="-A scripts/broker_test/ca-cert.pem"
fi

echo -e "Base args: $def_args $port_args"
Expand All @@ -94,15 +95,15 @@ RESULT=$?
if test $has_tls == yes
then
# Run with TLS and QoS 0-2
./examples/nbclient/nbclient $def_args $tls_port_args -t -q 0 $1
./examples/nbclient/nbclient $def_args $cacert_args $tls_port_args -t -q 0 $1
RESULT=$?
[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=0" && do_cleanup "-1"

./examples/nbclient/nbclient $def_args $tls_port_args -t -q 1 $1
./examples/nbclient/nbclient $def_args $cacert_args $tls_port_args -t -q 1 $1
RESULT=$?
[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=1" && do_cleanup "-1"

./examples/nbclient/nbclient $def_args $tls_port_args -t -q 2 $1
./examples/nbclient/nbclient $def_args $cacert_args $tls_port_args -t -q 2 $1
RESULT=$?
[ $RESULT -ne 0 ] && echo -e "\n\nNon-blocking Client failed! TLS=On, QoS=2" && do_cleanup "-1"
fi
Expand Down

0 comments on commit ce679c9

Please sign in to comment.