From a28f4f0b6f82620ab89824a58c50f68383494d67 Mon Sep 17 00:00:00 2001 From: jordan Date: Wed, 20 Mar 2024 10:05:34 -0500 Subject: [PATCH] Fix thread errors, and add multirecv debug test. --- .gitignore | 1 + examples/include.am | 24 + examples/multirecv/multirecv.c | 890 +++++++++++++++++++++++++++++++++ examples/multirecv/multirecv.h | 43 ++ scripts/multirecv.test | 93 ++++ src/mqtt_client.c | 12 +- 6 files changed, 1060 insertions(+), 3 deletions(-) create mode 100644 examples/multirecv/multirecv.c create mode 100644 examples/multirecv/multirecv.h create mode 100755 scripts/multirecv.test diff --git a/.gitignore b/.gitignore index 3fdeac4dc..87409b306 100644 --- a/.gitignore +++ b/.gitignore @@ -80,6 +80,7 @@ examples/multithread/multithread examples/wiot/wiot examples/pub-sub/mqtt-pub examples/pub-sub/mqtt-sub +examples/multirecv/multirecv # eclipse .cproject diff --git a/examples/include.am b/examples/include.am index dd0548eae..62a86d431 100644 --- a/examples/include.am +++ b/examples/include.am @@ -18,6 +18,9 @@ noinst_PROGRAMS += examples/sn-client/sn-client \ examples/sn-client/sn-client_qos-1 \ examples/sn-client/sn-multithread endif +if BUILD_STRESS +noinst_PROGRAMS += examples/multirecv/multirecv +endif noinst_HEADERS += examples/mqttclient/mqttclient.h \ examples/mqttsimple/mqttsimple.h \ @@ -36,6 +39,10 @@ noinst_HEADERS += examples/mqttclient/mqttclient.h \ if BUILD_SN noinst_HEADERS += examples/sn-client/sn-client.h endif +if BUILD_STRESS +noinst_HEADERS += examples/multirecv/multirecv.h +endif + # MQTT Client Example examples_mqttclient_mqttclient_SOURCES = examples/mqttclient/mqttclient.c \ @@ -136,6 +143,16 @@ examples_sn_client_sn_multithread_DEPENDENCIES = src/libwolfmqtt.la examples_sn_client_sn_multithread_CPPFLAGS = -I$(top_srcdir)/examples $(AM_CPPFLAGS) endif +if BUILD_STRESS +# multirecv Example +examples_multirecv_multirecv_SOURCES = examples/multirecv/multirecv.c \ + examples/mqttnet.c \ + examples/mqttexample.c +examples_multirecv_multirecv_LDADD = src/libwolfmqtt.la +examples_multirecv_multirecv_DEPENDENCIES = src/libwolfmqtt.la +examples_multirecv_multirecv_CPPFLAGS = -I$(top_srcdir)/examples $(AM_CPPFLAGS) +endif + # MQTT pub and sub clients examples_pub_sub_mqtt_pub_SOURCES = examples/pub-sub/mqtt-pub.c \ examples/mqttnet.c \ @@ -172,6 +189,9 @@ dist_example_DATA+= examples/sn-client/sn-multithread.c endif dist_example_DATA+= examples/pub-sub/mqtt-pub.c dist_example_DATA+= examples/pub-sub/mqtt-sub.c +if BUILD_STRESS +dist_example_DATA+= examples/multirecv/multirecv.c +endif DISTCLEANFILES+= examples/mqttclient/.libs/mqttclient \ examples/firmware/.libs/fwpush \ @@ -189,6 +209,10 @@ DISTCLEANFILES+= examples/sn-client/.libs/sn-client \ examples/sn-client/.libs/sn-multithread endif +if BUILD_STRESS +DISTCLEANFILES+= examples/multirecv/.libs/multirecv +endif + EXTRA_DIST+= examples/mqttuart.c \ examples/publish.dat \ examples/mqttclient/mqttclient.vcxproj \ diff --git a/examples/multirecv/multirecv.c b/examples/multirecv/multirecv.c new file mode 100644 index 000000000..68b1f0bc9 --- /dev/null +++ b/examples/multirecv/multirecv.c @@ -0,0 +1,890 @@ +/* multirecv.c + * + * Copyright (C) 2006-2023 wolfSSL Inc. + * + * This file is part of wolfMQTT. + * + * wolfMQTT is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * wolfMQTT is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA + */ + +/* Include the autoconf generated config.h */ +#ifdef HAVE_CONFIG_H + #include +#endif + +#include "wolfmqtt/mqtt_client.h" + +#include "multirecv.h" +#include "examples/mqttnet.h" +#include "examples/mqttexample.h" + +#include + + +#ifdef WOLFMQTT_MULTITHREAD + +/* Configuration */ + +/* Number of publish tasks. Each will send a unique message to the broker. */ +#if !defined(NUM_PUB_TASKS) && !defined(NUM_PUB_PER_TASK) + #define NUM_PUB_TASKS 5 + #define NUM_PUB_PER_TASK 2 +#endif + +/* Maximum size for network read/write callbacks. There is also a v5 define that + describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE + #define MAX_BUFFER_SIZE 1024 +#endif + +/* Total size of test message to build */ +#define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */ + +/* Locals */ +static char mTestMessage[TEST_MESSAGE_SIZE]; +static int mStopRead = 0; +static int mNumMsgsRecvd; +static int mNumMsgsDone; + +#ifdef USE_WINDOWS_API + /* Windows Threading */ + #include + #include + typedef HANDLE THREAD_T; + #define THREAD_CREATE(h, f, c) ((*h = CreateThread(NULL, 0, f, c, 0, NULL)) == NULL) + #define THREAD_JOIN(h, c) WaitForMultipleObjects(c, h, TRUE, INFINITE) + #define THREAD_EXIT(e) ExitThread(e) +#else + /* Posix (Linux/Mac) */ + #include + #include + #include + typedef pthread_t THREAD_T; + #define THREAD_CREATE(h, f, c) ({ int ret = pthread_create(h, NULL, f, c); if (ret) { errno = ret; } ret; }) + #define THREAD_JOIN(h, c) ({ int ret, x; for(x=0;xctx; + (void)mqttCtx; + + if (wm_SemLock(&mtLock) == 0) { + if (msg_new) { + /* Determine min size to dump */ + len = msg->topic_name_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->topic_name, len); + buf[len] = '\0'; /* Make sure its null terminated */ + + /* Print incoming message */ + PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", + buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, + msg->buffer_pos); + } + + /* Print message payload */ + len = msg->buffer_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->buffer, len); + buf[len] = '\0'; /* Make sure its null terminated */ + PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s", + msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); + + if (msg_done) { + /* for test mode: count the number of messages received */ + if (mqttCtx->test_mode) { + if (msg->buffer_pos + msg->buffer_len == + (word32)sizeof(mTestMessage) && + XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, + msg->buffer_len) == 0) + { + mNumMsgsRecvd++; + } + } + + PRINTF("MQTT Message: Done"); + } + wm_SemUnlock(&mtLock); + } + /* Return negative to terminate publish processing */ + return MQTT_CODE_SUCCESS; +} + +static void client_cleanup(MQTTCtx *mqttCtx) +{ + /* Free resources */ + if (mqttCtx->tx_buf) WOLFMQTT_FREE(mqttCtx->tx_buf); + if (mqttCtx->rx_buf) WOLFMQTT_FREE(mqttCtx->rx_buf); + + /* Cleanup network */ + MqttClientNet_DeInit(&mqttCtx->net); + + MqttClient_DeInit(&mqttCtx->client); +} + +WOLFMQTT_NORETURN static void client_exit(MQTTCtx *mqttCtx) +{ + client_cleanup(mqttCtx); + exit(1); +} + +static void client_disconnect(MQTTCtx *mqttCtx) +{ + int rc; + + do { + /* Disconnect */ + rc = MqttClient_Disconnect_ex(&mqttCtx->client, + &mqttCtx->disconnect); + } while (rc == MQTT_CODE_CONTINUE); + + PRINTF("MQTT Disconnect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + + rc = MqttClient_NetDisconnect(&mqttCtx->client); + + PRINTF("MQTT Socket Disconnect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + + client_cleanup(mqttCtx); +} + +static int multirecv_test_init(MQTTCtx *mqttCtx) +{ + int rc = MQTT_CODE_SUCCESS; + word32 startSec; + + mNumMsgsRecvd = 0; + mNumMsgsDone = 0; + + /* Create a demo mutex for making packet id values */ + rc = wm_SemInit(&mtLock); + if (rc != 0) { + client_exit(mqttCtx); + } + rc = wm_SemInit(&pingSignal); + if (rc != 0) { + wm_SemFree(&mtLock); + client_exit(mqttCtx); + } + + PRINTF("MQTT Client: QoS %d, Use TLS %d", mqttCtx->qos, + mqttCtx->use_tls); + + PRINTF("Use \"Ctrl+c\" to exit."); + + /* Initialize Network */ + rc = MqttClientNet_Init(&mqttCtx->net, mqttCtx); + PRINTF("MQTT Net Init: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + client_exit(mqttCtx); + } + + /* setup tx/rx buffers */ + mqttCtx->tx_buf = (byte*)WOLFMQTT_MALLOC(MAX_BUFFER_SIZE); + mqttCtx->rx_buf = (byte*)WOLFMQTT_MALLOC(MAX_BUFFER_SIZE); + + /* Initialize MqttClient structure */ + rc = MqttClient_Init(&mqttCtx->client, &mqttCtx->net, + mqtt_message_cb, + mqttCtx->tx_buf, MAX_BUFFER_SIZE, + mqttCtx->rx_buf, MAX_BUFFER_SIZE, + mqttCtx->cmd_timeout_ms); + + PRINTF("MQTT Init: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + client_exit(mqttCtx); + } + /* The client.ctx will be stored in the cert callback ctx during + MqttSocket_Connect for use by mqtt_tls_verify_cb */ + mqttCtx->client.ctx = mqttCtx; +#ifdef WOLFMQTT_NONBLOCK + mqttCtx->useNonBlockMode = 1; +#endif + +#ifdef WOLFMQTT_DISCONNECT_CB + /* setup disconnect callback */ + rc = MqttClient_SetDisconnectCallback(&mqttCtx->client, + mqtt_disconnect_cb, NULL); + if (rc != MQTT_CODE_SUCCESS) { + client_exit(mqttCtx); + } +#endif + + /* Connect to broker */ + startSec = 0; + do { + rc = MqttClient_NetConnect(&mqttCtx->client, mqttCtx->host, + mqttCtx->port, DEFAULT_CON_TIMEOUT_MS, mqttCtx->use_tls, mqtt_tls_cb); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT, + DEFAULT_CON_TIMEOUT_MS); + } while (rc == MQTT_CODE_CONTINUE); + + PRINTF("MQTT Socket Connect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + client_exit(mqttCtx); + } + + /* Build connect packet */ + XMEMSET(&mqttCtx->connect, 0, sizeof(MqttConnect)); + mqttCtx->connect.keep_alive_sec = mqttCtx->keep_alive_sec; + mqttCtx->connect.clean_session = mqttCtx->clean_session; + mqttCtx->connect.client_id = mqttCtx->client_id; + + /* Last will and testament sent by broker to subscribers + of topic when broker connection is lost */ + XMEMSET(&mqttCtx->lwt_msg, 0, sizeof(mqttCtx->lwt_msg)); + mqttCtx->connect.lwt_msg = &mqttCtx->lwt_msg; + mqttCtx->connect.enable_lwt = mqttCtx->enable_lwt; + if (mqttCtx->enable_lwt) { + /* Send client id in LWT payload */ + mqttCtx->lwt_msg.qos = mqttCtx->qos; + mqttCtx->lwt_msg.retain = 0; + mqttCtx->lwt_msg.topic_name = WOLFMQTT_TOPIC_NAME"lwttopic"; + mqttCtx->lwt_msg.buffer = (byte*)mqttCtx->client_id; + mqttCtx->lwt_msg.total_len = + (word16)XSTRLEN(mqttCtx->client_id); + } + /* Optional authentication */ + mqttCtx->connect.username = mqttCtx->username; + mqttCtx->connect.password = mqttCtx->password; + + /* Send Connect and wait for Connect Ack */ + startSec = 0; + do { + rc = MqttClient_Connect(&mqttCtx->client, &mqttCtx->connect); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_CONNECT, + DEFAULT_CON_TIMEOUT_MS); + } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->connect); + } + + PRINTF("MQTT Connect: Proto (%s), %s (%d)", + MqttClient_GetProtocolVersionString(&mqttCtx->client), + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + client_disconnect(mqttCtx); + } + + /* Validate Connect Ack info */ + PRINTF("MQTT Connect Ack: Return Code %u, Session Present %d", + mqttCtx->connect.ack.return_code, + (mqttCtx->connect.ack.flags & + MQTT_CONNECT_ACK_FLAG_SESSION_PRESENT) ? + 1 : 0 + ); + + return rc; +} + +static int multirecv_test_finish(MQTTCtx *mqttCtx) +{ + client_disconnect(mqttCtx); + + wm_SemFree(&pingSignal); + wm_SemFree(&mtLock); + + PRINTF("MQTT Client Done: %d", mqttCtx->return_code); + + if (mStopRead && mqttCtx->return_code == MQTT_CODE_TEST_EXIT) { + /* this is okay, we requested termination */ + mqttCtx->return_code = MQTT_CODE_SUCCESS; + } + + return mqttCtx->return_code; +} + +/* this task subscribes to topic */ +#ifdef USE_WINDOWS_API +static DWORD WINAPI subscribe_task( LPVOID param ) +#else +static void *subscribe_task(void *param) +#endif +{ + int rc = MQTT_CODE_SUCCESS; + uint16_t i; + MQTTCtx *mqttCtx = (MQTTCtx*)param; + word32 startSec = 0; + + /* Build list of topics */ + XMEMSET(&mqttCtx->subscribe, 0, sizeof(MqttSubscribe)); + i = 0; + mqttCtx->topics[i].topic_filter = mqttCtx->topic_name; + mqttCtx->topics[i].qos = mqttCtx->qos; + +#ifdef WOLFMQTT_V5 + if (mqttCtx->subId_not_avail != 1) { + /* Subscription Identifier */ + MqttProp* prop; + prop = MqttClient_PropsAdd(&mqttCtx->subscribe.props); + prop->type = MQTT_PROP_SUBSCRIPTION_ID; + prop->data_int = DEFAULT_SUB_ID; + } +#endif + + /* Subscribe Topic */ + mqttCtx->subscribe.packet_id = mqtt_get_packetid_threadsafe(); + mqttCtx->subscribe.topic_count = + sizeof(mqttCtx->topics) / sizeof(MqttTopic); + mqttCtx->subscribe.topics = mqttCtx->topics; + + do { + rc = MqttClient_Subscribe(&mqttCtx->client, &mqttCtx->subscribe); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_SUBSCRIBE, + mqttCtx->cmd_timeout_ms); + } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->subscribe); + } + + PRINTF("MQTT Subscribe: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + + if (rc == MQTT_CODE_SUCCESS) { + /* show subscribe results */ + for (i = 0; i < mqttCtx->subscribe.topic_count; i++) { + MqttTopic *topic = &mqttCtx->subscribe.topics[i]; + PRINTF(" Topic %s, Qos %u, Return Code %u", + topic->topic_filter, + topic->qos, topic->return_code); + } + } + +#ifdef WOLFMQTT_V5 + if (mqttCtx->subscribe.props != NULL) { + MqttClient_PropsFree(mqttCtx->subscribe.props); + } +#endif + + THREAD_EXIT(0); +} + +static int TestIsDone(int rc, MQTTCtx* mqttCtx) +{ + int isDone = 0; + /* check if we are in test mode and done */ + if (wm_SemLock(&mtLock) == 0) { + if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && + mNumMsgsDone == (2 * NUM_PUB_PER_TASK) && + mNumMsgsRecvd == (2 * NUM_PUB_PER_TASK) + #ifdef WOLFMQTT_NONBLOCK + && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) + #endif + ) { + isDone = 1; /* done */ + } + + wm_SemUnlock(&mtLock); + + if (isDone) { + mqtt_stop_set(); + } + } + + return isDone; +} + +/* This task waits for messages */ +#ifdef USE_WINDOWS_API +static DWORD WINAPI waitMessage_task( LPVOID param ) +#else +static void *waitMessage_task(void *param) +#endif +{ + int rc = 0; + MQTTCtx *mqttCtx = (MQTTCtx*)param; + word32 startSec; + word32 cmd_timeout_ms = mqttCtx->cmd_timeout_ms; + + /* Read Loop */ + PRINTF("MQTT Waiting for message..."); + + startSec = 0; + do { + if (TestIsDone(rc, mqttCtx)) { + rc = 0; /* success */ + break; + } + + /* if blocking, use short timeout in test mode */ + if (mqttCtx->test_mode + #ifdef WOLFMQTT_NONBLOCK + && !mqttCtx->useNonBlockMode + #endif + ){ + cmd_timeout_ms = 1000; /* short timeout */ + } + + /* Try and read packet */ + rc = MqttClient_WaitMessage_ex(&mqttCtx->client, &mqttCtx->client.msg, + cmd_timeout_ms); + if (mqttCtx->test_mode && rc == MQTT_CODE_ERROR_TIMEOUT) { + rc = 0; + } + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY, + cmd_timeout_ms); + if (rc != MQTT_CODE_SUCCESS && rc != MQTT_CODE_CONTINUE) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->client.msg); + } + + /* check return code */ + if (rc == MQTT_CODE_CONTINUE) { + continue; + } + #ifdef WOLFMQTT_ENABLE_STDIN_CAP + else if (rc == MQTT_CODE_STDIN_WAKE) { + XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); + if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, + stdin) != NULL) + { + rc = (int)XSTRLEN((char*)mqttCtx->rx_buf); + + /* Publish Topic */ + mqttCtx->stat = WMQ_PUB; + XMEMSET(&mqttCtx->publish, 0, sizeof(MqttPublish)); + mqttCtx->publish.retain = 0; + mqttCtx->publish.qos = mqttCtx->qos; + mqttCtx->publish.duplicate = 0; + mqttCtx->publish.topic_name = mqttCtx->topic_name; + mqttCtx->publish.packet_id = mqtt_get_packetid_threadsafe(); + mqttCtx->publish.buffer = mqttCtx->rx_buf; + mqttCtx->publish.total_len = (word16)rc; + do { + rc = MqttClient_Publish(&mqttCtx->client, + &mqttCtx->publish); + } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->publish); + } + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); + } + } + #endif + else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + if (mqttCtx->test_mode) { + /* timeout in test mode should exit */ + mqtt_stop_set(); + PRINTF("MQTT Exiting timeout..."); + break; + } + } + else if (rc != MQTT_CODE_SUCCESS) { + /* There was an error */ + PRINTF("MQTT Message Wait Error: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + break; + } + startSec = 0; + } while (!mqtt_stop_get()); + + mqttCtx->return_code = rc; + THREAD_EXIT(0); +} + +/* This task publishes a message to the broker. The task will be created + NUM_PUB_TASKS times, sending a unique message each time. */ +#ifdef USE_WINDOWS_API +static DWORD WINAPI publish_task( LPVOID param ) +#else +static void *publish_task(void *param) +#endif +{ + int rc[NUM_PUB_PER_TASK], i; + MQTTCtx *mqttCtx = (MQTTCtx*)param; + MqttPublish publish[NUM_PUB_PER_TASK]; + word32 startSec[NUM_PUB_PER_TASK]; + + /* Build publish */ + for (i=0; iqos; + publish[i].duplicate = 0; + publish[i].topic_name = mqttCtx->topic_name; + publish[i].packet_id = mqtt_get_packetid_threadsafe(); + publish[i].buffer = (byte*)mTestMessage; + publish[i].total_len = sizeof(mTestMessage); + + rc[i] = MQTT_CODE_CONTINUE; + startSec[i] = 0; + } + + /* Send until != continue */ + for (i=0; iclient, &publish[i], + NULL); + rc[i] = check_response(mqttCtx, rc[i], &startSec[i], + MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms); + } + } + + /* Report result */ + for (i=0; iclient, (MqttObject*)&publish[i]); + } + + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + publish[i].topic_name, publish[i].packet_id, + MqttClient_ReturnCodeToString(rc[i]), rc[i]); + + wm_SemLock(&mtLock); + mNumMsgsDone++; + wm_SemUnlock(&mtLock); + } + + THREAD_EXIT(0); +} + +#ifdef USE_WINDOWS_API +static DWORD WINAPI ping_task( LPVOID param ) +#else +static void *ping_task(void *param) +#endif +{ + int rc; + MQTTCtx *mqttCtx = (MQTTCtx*)param; + MqttPing ping; + word32 startSec; + + do { + if (wm_SemLock(&pingSignal) != 0) { + break; + } + if (mqtt_stop_get()) { + break; + } + + /* Keep Alive Ping */ + PRINTF("Sending ping keep-alive"); + + startSec = 0; + XMEMSET(&ping, 0, sizeof(ping)); + + do { + rc = MqttClient_Ping_ex(&mqttCtx->client, &ping); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PING_REQ, + mqttCtx->cmd_timeout_ms); + } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&ping); + } + + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Ping Keep Alive Error: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + break; + } + + wm_SemUnlock(&pingSignal); + } while (!mqtt_stop_get()); + + THREAD_EXIT(0); +} + +static int unsubscribe_do(MQTTCtx *mqttCtx) +{ + int rc; + word32 startSec = 0; + + /* Unsubscribe Topics */ + XMEMSET(&mqttCtx->unsubscribe, 0, sizeof(MqttUnsubscribe)); + mqttCtx->unsubscribe.packet_id = mqtt_get_packetid_threadsafe(); + mqttCtx->unsubscribe.topic_count = + sizeof(mqttCtx->topics) / sizeof(MqttTopic); + mqttCtx->unsubscribe.topics = mqttCtx->topics; + + /* Unsubscribe Topics */ + do { + rc = MqttClient_Unsubscribe(&mqttCtx->client, &mqttCtx->unsubscribe); + rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_UNSUBSCRIBE, + mqttCtx->cmd_timeout_ms); + } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->unsubscribe); + } + + PRINTF("MQTT Unsubscribe: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + + return rc; +} + +int multirecv_test(MQTTCtx *mqttCtx) +{ + int rc = 0, threadCount = 0; + int i = 0; + THREAD_T threadList[6]; + + /* Build test message */ + rc = mqtt_fill_random_hexstr(mTestMessage, (word32)sizeof(mTestMessage)); + if (rc != 0) { + return rc; + } + + rc = multirecv_test_init(mqttCtx); + if (rc == 0) { + if (THREAD_CREATE(&threadList[threadCount++], subscribe_task, mqttCtx)) { + PRINTF("THREAD_CREATE failed: %d", errno); + return -1; + } + + /* for test mode, we must complete subscribe to track number of pubs received */ + if (mqttCtx->test_mode) { + if (THREAD_JOIN(threadList, threadCount)) { + PRINTF("THREAD_JOIN failed: %d", errno); + return -1; + } + threadCount = 0; + } + + /* Create 2 threads to wait for messages */ + for (i = 0; i < 2; ++i) { + if (THREAD_CREATE(&threadList[threadCount++], waitMessage_task, + mqttCtx)) { + PRINTF("THREAD_CREATE failed: %d", errno); + return -1; + } + } + + /* Ping */ + if (0) { + if (THREAD_CREATE(&threadList[threadCount++], ping_task, mqttCtx)) { + PRINTF("THREAD_CREATE failed: %d", errno); + return -1; + } + } + + /* Create 2 threads that publish unique messages */ + + for (i = 0; i < 2; ++i) { + if (THREAD_CREATE(&threadList[threadCount++], publish_task, + mqttCtx)) { + PRINTF("THREAD_CREATE failed: %d", errno); + return -1; + } + } + + /* Join threads - wait for completion */ + if (THREAD_JOIN(threadList, threadCount)) { +#ifdef __GLIBC__ + /* "%m" is specific to glibc/uclibc/musl, and FreeBSD (as of 2018). + * Uses errno and not argument required */ + PRINTF("THREAD_JOIN failed: %m"); +#else + PRINTF("THREAD_JOIN failed: %d", errno); +#endif + } + + (void)unsubscribe_do(mqttCtx); + + rc = multirecv_test_finish(mqttCtx); + } + return rc; +} +#endif /* WOLFMQTT_MULTITHREAD */ + +/* so overall tests can pull in test function */ + #ifdef USE_WINDOWS_API + #include /* for ctrl handler */ + + static BOOL CtrlHandler(DWORD fdwCtrlType) + { + if (fdwCtrlType == CTRL_C_EVENT) { + mqtt_stop_set(); + PRINTF("Received Ctrl+c"); + #ifdef WOLFMQTT_ENABLE_STDIN_CAP + MqttClientNet_Wake(&gMqttCtx.net); + #endif + return TRUE; + } + return FALSE; + } + #elif HAVE_SIGNAL + #include + static void sig_handler(int signo) + { + if (signo == SIGINT) { + #ifdef WOLFMQTT_MULTITHREAD + mqtt_stop_set(); + #endif + PRINTF("Received SIGINT"); + #if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_ENABLE_STDIN_CAP) + MqttClientNet_Wake(&gMqttCtx.net); + #endif + } + } + #endif + +#if defined(NO_MAIN_DRIVER) +int multirecv_main(int argc, char** argv) +#else +int main(int argc, char** argv) +#endif +{ + int rc; +#ifdef WOLFMQTT_MULTITHREAD + /* init defaults */ + mqtt_init_ctx(&gMqttCtx); + gMqttCtx.app_name = "wolfMQTT multirecv client"; + + /* parse arguments */ + rc = mqtt_parse_args(&gMqttCtx, argc, argv); + if (rc != 0) { + return rc; + } + #ifdef WOLFMQTT_STRESS + /* Forbid running stress test against anything but localhost. */ + if (XSTRCMP(gMqttCtx.host, "localhost") != 0) { + PRINTF("error: stress build may only run against localhost: host=%s", + gMqttCtx.host); + return -1; + } + #endif +#endif +#ifdef USE_WINDOWS_API + if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, + TRUE) == FALSE) + { + PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); + } +#elif HAVE_SIGNAL + if (signal(SIGINT, sig_handler) == SIG_ERR) { + PRINTF("Can't catch SIGINT"); + } +#endif +#ifdef WOLFMQTT_MULTITHREAD + rc = multirecv_test(&gMqttCtx); + + mqtt_free_ctx(&gMqttCtx); +#else + (void)argc; + (void)argv; + + /* This example requires multithread mode to be enabled + ./configure --enable-mt */ + PRINTF("Example not compiled in!"); + rc = 0; /* return success, so make check passes with TLS disabled */ +#endif /* WOLFMQTT_MULTITHREAD */ + + return (rc == 0) ? 0 : EXIT_FAILURE; +} + diff --git a/examples/multirecv/multirecv.h b/examples/multirecv/multirecv.h new file mode 100644 index 000000000..23eead74f --- /dev/null +++ b/examples/multirecv/multirecv.h @@ -0,0 +1,43 @@ +/* multirecv.h + * + * Copyright (C) 2006-2023 wolfSSL Inc. + * + * This file is part of wolfMQTT. + * + * wolfMQTT is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * wolfMQTT is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA + */ + +#ifndef WOLFMQTT_MULTIRECV_H +#define WOLFMQTT_MULTIRECV_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "examples/mqttexample.h" + + +/* Exposed functions */ +int multirecv_test(MQTTCtx *mqttCtx); + +#if defined(NO_MAIN_DRIVER) +int multirecv_main(int argc, char** argv); +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* WOLFMQTT_MULTIRECV_H */ diff --git a/scripts/multirecv.test b/scripts/multirecv.test new file mode 100755 index 000000000..ab7c2654f --- /dev/null +++ b/scripts/multirecv.test @@ -0,0 +1,93 @@ +#!/bin/bash + +# MQTT Multirecv Client test + +name="Multirecv Client" +prog="examples/multirecv/multirecv" +timeout_ms=250 +no_pid=-1 +broker_pid=$no_pid + +if [ $# -eq 1 ]; then + timeout_ms=$1 +fi + +# Require mosquitto to run. +if ! command -v mosquitto; then + echo "error: this test requires local mosquitto broker" + exit 1 +fi + +source scripts/test_common.sh + +# This test is local host only! +def_args="-h localhost -T -C $timeout_ms" + +bwrap_path="$(command -v bwrap)" +if [ -n "$bwrap_path" ]; then + # bwrap only if using a local mosquitto instance + if [ "${AM_BWRAPPED-}" != "yes" ]; then + echo "Using bwrap" + export AM_BWRAPPED=yes + exec "$bwrap_path" --unshare-net --dev-bind / / "$0" "$@" + fi + unset AM_BWRAPPED + + broker_args="-c scripts/broker_test/mosquitto.conf" + port=11883 +else + # mosquitto broker custom port non-TLS only + has_tls=no + generate_port + broker_args="-p $port" +fi +mosquitto $broker_args & +broker_pid=$! +echo "Broker PID is $broker_pid" +sleep 0.1 + +tls_port_args="-p 18883" +port_args="-p ${port}" +cacert_args="-A scripts/broker_test/ca-cert.pem" + +echo -e "Base args: $def_args $port_args" + +# Only interested in testing TLS with QoS 1 right now. +# This script is only meant to aid debugging thread errors, +# not be a CI test. +# +# Run without TLS and QoS 0-2 +#./$prog $def_args $port_args -q 0 +#RESULT=$? +#[ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=Off, QoS=0" && do_cleanup "-1" +# +#./$prog $def_args $port_args -q 1 +#RESULT=$? +#[ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=Off, QoS=1" && do_cleanup "-1" +# +#./$prog $def_args $port_args -q 2 +#RESULT=$? +#[ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=Off, QoS=2" && do_cleanup "-1" + +if test $has_tls == yes +then +# # Run with TLS and QoS 0-2 +# ./$prog $def_args $cacert_args $tls_port_args -t -q 0 +# RESULT=$? +# [ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=On, QoS=0" && do_cleanup "-1" + + ./$prog $def_args $cacert_args $tls_port_args -t -q 1 + RESULT=$? + [ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=On, QoS=1" && do_cleanup "-1" + +# ./$prog $def_args $cacert_args $tls_port_args -t -q 2 +# RESULT=$? +# [ $RESULT -ne 0 ] && echo -e "\n\n$name failed! TLS=On, QoS=2" && do_cleanup "-1" +fi + +# End broker +do_cleanup "0" + +echo -e "\n\nMultirecv MQTT Client Tests Passed" + +exit 0 diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 5aff6e807..07f6c90e3 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -126,6 +126,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); #endif return 0; } + int wm_SemLock(wm_Sem *s) { pthread_mutex_lock(&s->mutex); #ifndef WOLFMQTT_NO_COND_SIGNAL @@ -275,11 +276,14 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_DEBUG_CLIENT +#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) + #ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isReadActive) { MQTT_TRACE_MSG("Warning, recv already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } +#endif /* WOLFMQTT_DEBUG_CLIENT */ + #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK /* detect if a read is already in progress */ #ifdef WOLFMQTT_MULTITHREAD if (wm_SemLock(&client->lockClient) == 0) @@ -293,9 +297,11 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) wm_SemUnlock(&client->lockClient); #endif } - if (rc != 0) + #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ + if (rc != MQTT_CODE_SUCCESS) { return rc; -#endif /* WOLFMQTT_DEBUG_CLIENT */ + } +#endif /* WOLFMQTT_DEBUG_CLIENT || !WOLFMQTT_ALLOW_NODATA_UNLOCK */ #ifdef WOLFMQTT_MULTITHREAD rc = wm_SemLock(&client->lockRecv);