Skip to content

Commit

Permalink
Os task refactor issue 2526 (#2672)
Browse files Browse the repository at this point in the history
* Task refactor WIP -- compiles, but does not link

* Adding isCooperative method - WIP

* more WIP

* Initial Os::Posix::Task implementaion

* Adding makeDelegate functions; Fixing existing UTs

* Touching up comments

* Fixing Linux issues

* Adding type_traits import

* Removing TaskId and reworking handle storage

* Do not need to assert that a reference is not nullptr; compiler will get angry

* Starting UT development

* Start test works

* Working start and join rules; random test

* Adding state, delay, and count tests

* Adding stub interface tests

* sp

* Registry tests

* Fixing UTs

* Fixing PTHREAD_MIN_STACK for linux

* Missing newlines

* Fixing GPIO driver's task

* More build fixes

* Fixes for CI

* Correcting errors in GNU only code

* Fixing review comments

* Fixing sp

* Fixing RPI (again)

* Removing divergent open rules from interface testing. Fixes #2733

* Fixing review comments

---------

Co-authored-by: Kevin F. Ortega <[email protected]>
  • Loading branch information
LeStarch and kevin-f-ortega committed May 22, 2024
1 parent 0fe467c commit 640cf44
Show file tree
Hide file tree
Showing 74 changed files with 2,573 additions and 959 deletions.
3 changes: 3 additions & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ ect
edu
EGB
EHAs
eip
elist
ELOG
Elts
Expand Down Expand Up @@ -1013,6 +1014,7 @@ svipc
swcaegitadmin
synchronicity
synopsys
sysconf
SYSCONFDIR
SYSFS
sysinfo
Expand Down Expand Up @@ -1111,6 +1113,7 @@ toolset
topologyapp
Torvalds
totalram
tparam
treeview
trimwhitespace
trinomials
Expand Down
19 changes: 10 additions & 9 deletions Drv/Ip/SocketReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ SocketReadTask::SocketReadTask() : m_reconnect(false), m_stop(false) {}

SocketReadTask::~SocketReadTask() {}

void SocketReadTask::startSocketTask(const Fw::StringBase &name,
void SocketReadTask::start(const Fw::StringBase &name,
const bool reconnect,
const Os::Task::ParamType priority,
const Os::Task::ParamType stack,
const Os::Task::ParamType cpuAffinity) {
FW_ASSERT(not m_task.isStarted()); // It is a coding error to start this task multiple times
FW_ASSERT(m_task.getState() == Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
FW_ASSERT(not this->m_stop); // It is a coding error to stop the thread before it is started
m_reconnect = reconnect;
// Note: the first step is for the IP socket to open the port
Os::Task::TaskStatus stat = m_task.start(name, SocketReadTask::readTask, this, priority, stack, cpuAffinity);
FW_ASSERT(Os::Task::TASK_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
Os::Task::Arguments arguments(name, SocketReadTask::readTask, this, priority, stack, cpuAffinity);
Os::Task::Status stat = m_task.start(arguments);
FW_ASSERT(Os::Task::OP_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
}

SocketIpStatus SocketReadTask::startup() {
Expand All @@ -57,11 +58,11 @@ void SocketReadTask::close() {
this->getSocketHandler().close();
}

Os::Task::TaskStatus SocketReadTask::joinSocketTask(void** value_ptr) {
return m_task.join(value_ptr);
Os::Task::Status SocketReadTask::join() {
return m_task.join();
}

void SocketReadTask::stopSocketTask() {
void SocketReadTask::stop() {
this->m_stop = true;
this->getSocketHandler().shutdown(); // Break out of any receives and fully shutdown
}
Expand All @@ -78,7 +79,7 @@ void SocketReadTask::readTask(void* pointer) {
"[WARNING] Failed to open port with status %d and errno %d\n",
static_cast<POINTER_CAST>(status),
static_cast<POINTER_CAST>(errno));
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL_MS);
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL);
continue;
}

Expand All @@ -89,7 +90,7 @@ void SocketReadTask::readTask(void* pointer) {
"[WARNING] Failed to open port with status %d and errno %d\n",
static_cast<POINTER_CAST>(status),
static_cast<POINTER_CAST>(errno));
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL_MS);
(void) Os::Task::delay(SOCKET_RETRY_INTERVAL);
continue;
}

Expand Down
8 changes: 4 additions & 4 deletions Drv/Ip/SocketReadTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SocketReadTask {
* \param stack: stack size provided to the task. See: Os::Task::start. Default: TASK_DEFAULT, posix threads default
* \param cpuAffinity: cpu affinity provided to task. See: Os::Task::start. Default: TASK_DEFAULT, don't care
*/
void startSocketTask(const Fw::StringBase &name,
void start(const Fw::StringBase &name,
const bool reconnect = true,
const Os::Task::ParamType priority = Os::Task::TASK_DEFAULT,
const Os::Task::ParamType stack = Os::Task::TASK_DEFAULT,
Expand Down Expand Up @@ -106,17 +106,17 @@ class SocketReadTask {
* Called to stop the socket read task. It is an error to call this before the thread has been started using the
* startSocketTask call. This will stop the read task and close the client socket.
*/
void stopSocketTask();
void stop();

/**
* \brief joins to the stopping read task to wait for it to close
*
* Called to join with the read socket task. This will block and return after the task has been stopped with a call
* to the stopSocketTask method.
* \param value_ptr: a pointer to fill with data. Passed to the Os::Task::join call. NULL to ignore.
* \return: Os::Task::TaskStatus passed back from the Os::Task::join call.
* \return: Os::Task::Status passed back from the Os::Task::join call.
*/
Os::Task::TaskStatus joinSocketTask(void** value_ptr);
Os::Task::Status join();


PROTECTED:
Expand Down
10 changes: 8 additions & 2 deletions Drv/Ip/test/ut/SocketTestHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <unistd.h>
#include <cerrno>
#include <arpa/inet.h>
#include <IpCfg.hpp>

namespace Drv {
namespace Test {
Expand Down Expand Up @@ -65,7 +66,7 @@ bool wait_on_change(Drv::IpSocket &socket, bool open, U32 iterations) {
if (open == socket.isOpened()) {
return true;
}
Os::Task::delay(10);
Os::Task::delay(Fw::Time(0, 10000));
}
return false;
}
Expand All @@ -75,10 +76,15 @@ bool wait_on_started(Drv::IpSocket &socket, bool open, U32 iterations) {
if (open == socket.isStarted()) {
return true;
}
Os::Task::delay(10);
Os::Task::delay(Fw::Time(0, 10000));
}
return false;
}

U64 get_configured_delay_ms() {
return (static_cast<U64>(SOCKET_RETRY_INTERVAL.getSeconds()) * 1000) +
(static_cast<U64>(SOCKET_RETRY_INTERVAL.getUSeconds()) / 1000);
}

};
};
6 changes: 6 additions & 0 deletions Drv/Ip/test/ut/SocketTestHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ bool wait_on_change(Drv::IpSocket &socket, bool open, U32 iterations);
*/
bool wait_on_started(Drv::IpSocket &socket, bool open, U32 iterations);

/**
* Get the configured delay, converted to milliseconds
* @return SOCKET_RETRY_INTERVAL converted to milliseconds
*/
U64 get_configured_delay_ms();

};
};
#endif
9 changes: 5 additions & 4 deletions Drv/LinuxGpioDriver/LinuxGpioDriverComponentImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,14 @@ namespace Drv {

}

Os::Task::TaskStatus LinuxGpioDriverComponentImpl ::
startIntTask(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE cpuAffinity) {
Os::Task::Status LinuxGpioDriverComponentImpl ::
startIntTask(Os::Task::ParamType priority, Os::Task::ParamType stackSize, Os::Task::ParamType cpuAffinity) {
Os::TaskString name;
name.format("GPINT_%s",this->getObjName()); // The task name can only be 16 chars including null
Os::Task::TaskStatus stat = this->m_intTask.start(name, LinuxGpioDriverComponentImpl::intTaskEntry, this, priority, stackSize, cpuAffinity);
Os::Task::Arguments arguments(name, LinuxGpioDriverComponentImpl::intTaskEntry, this, priority, stackSize, cpuAffinity);
Os::Task::Status stat = this->m_intTask.start(arguments);

if (stat != Os::Task::TASK_OK) {
if (stat != Os::Task::OP_OK) {
DEBUG_PRINT("Task start error: %d\n",stat);
}

Expand Down
6 changes: 3 additions & 3 deletions Drv/LinuxGpioDriver/LinuxGpioDriverComponentImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ namespace Drv {
~LinuxGpioDriverComponentImpl();

//! Start interrupt task
Os::Task::TaskStatus startIntTask(NATIVE_UINT_TYPE priority = Os::Task::TASK_DEFAULT,
NATIVE_UINT_TYPE stackSize = Os::Task::TASK_DEFAULT,
NATIVE_UINT_TYPE cpuAffinity = Os::Task::TASK_DEFAULT);
Os::Task::Status startIntTask(Os::Task::ParamType priority = Os::Task::TASK_DEFAULT,
Os::Task::ParamType stackSize = Os::Task::TASK_DEFAULT,
Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT);

//! configure GPIO
enum GpioDirection {
Expand Down
6 changes: 3 additions & 3 deletions Drv/LinuxGpioDriver/LinuxGpioDriverComponentImplStub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ namespace Drv {
return false;
}

Os::Task::TaskStatus LinuxGpioDriverComponentImpl ::
startIntTask(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE cpuAffinity) {
return Os::Task::TASK_OK;
Os::Task::Status LinuxGpioDriverComponentImpl ::
startIntTask(Os::Task::ParamType priority, Os::Task::ParamType stackSize, Os::Task::ParamType cpuAffinity) {
return Os::Task::OP_OK;
}

LinuxGpioDriverComponentImpl ::
Expand Down
16 changes: 7 additions & 9 deletions Drv/LinuxUartDriver/LinuxUartDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void LinuxUartDriver ::serialReadTaskEntry(void* ptr) {
status = RecvStatus::RECV_ERROR;
comp->recv_out(0, buff, status);
// to avoid spinning, wait 50 ms
Os::Task::delay(50);
Os::Task::delay(Fw::Time(0, 50));
continue;
}

Expand Down Expand Up @@ -389,21 +389,19 @@ void LinuxUartDriver ::serialReadTaskEntry(void* ptr) {
}
}

void LinuxUartDriver ::startReadThread(NATIVE_UINT_TYPE priority,
NATIVE_UINT_TYPE stackSize,
NATIVE_UINT_TYPE cpuAffinity) {
void LinuxUartDriver ::start(Os::Task::ParamType priority, Os::Task::ParamType stackSize, Os::Task::ParamType cpuAffinity) {
Os::TaskString task("SerReader");
Os::Task::TaskStatus stat =
this->m_readTask.start(task, serialReadTaskEntry, this, priority, stackSize, cpuAffinity);
FW_ASSERT(stat == Os::Task::TASK_OK, stat);
Os::Task::Arguments arguments(task, serialReadTaskEntry, this, priority, stackSize, cpuAffinity);
Os::Task::Status stat = this->m_readTask.start(arguments);
FW_ASSERT(stat == Os::Task::OP_OK, stat);
}

void LinuxUartDriver ::quitReadThread() {
this->m_quitReadThread = true;
}

Os::Task::TaskStatus LinuxUartDriver ::join(void** value_ptr) {
return m_readTask.join(value_ptr);
Os::Task::Status LinuxUartDriver ::join() {
return m_readTask.join();
}

} // end namespace Drv
8 changes: 4 additions & 4 deletions Drv/LinuxUartDriver/LinuxUartDriver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ class LinuxUartDriver : public LinuxUartDriverComponentBase {
//! start the serial poll thread.
//! buffSize is the max receive buffer size
//!
void startReadThread(NATIVE_UINT_TYPE priority = Os::Task::TASK_DEFAULT,
NATIVE_UINT_TYPE stackSize = Os::Task::TASK_DEFAULT,
NATIVE_UINT_TYPE cpuAffinity = Os::Task::TASK_DEFAULT);
void start(Os::Task::ParamType priority = Os::Task::TASK_DEFAULT,
Os::Task::ParamType stackSize = Os::Task::TASK_DEFAULT,
Os::Task::ParamType cpuAffinity = Os::Task::TASK_DEFAULT);

//! Quit thread
void quitReadThread();

//! Join thread
Os::Task::TaskStatus join(void** value_ptr);
Os::Task::Status join();

//! Destroy object LinuxUartDriver
//!
Expand Down
8 changes: 4 additions & 4 deletions Drv/TcpClient/test/ut/TcpClientTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
// Start up a receive thread
if (recv_thread) {
Os::TaskString name("receiver thread");
this->component.startSocketTask(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
this->component.start(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
}

// Loop through a bunch of client disconnects
Expand All @@ -55,7 +55,7 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
if (not recv_thread) {
status1 = this->component.open();
} else {
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, SOCKET_RETRY_INTERVAL_MS/10 + 1));
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, Drv::Test::get_configured_delay_ms()/10 + 1));
}
EXPECT_TRUE(this->component.getSocketHandler().isOpened());
status2 = server.open();
Expand Down Expand Up @@ -87,8 +87,8 @@ void TcpClientTester ::test_with_loop(U32 iterations, bool recv_thread) {
}
// Properly stop the client on the last iteration
if ((1 + i) == iterations && recv_thread) {
this->component.stopSocketTask();
this->component.joinSocketTask(nullptr);
this->component.stop();
this->component.join();
} else {
this->component.close();
}
Expand Down
10 changes: 5 additions & 5 deletions Drv/TcpServer/test/ut/TcpServerTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
// Start up a receive thread
if (recv_thread) {
Os::TaskString name("receiver thread");
this->component.startSocketTask(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
EXPECT_TRUE(Drv::Test::wait_on_started(this->component.getSocketHandler(), true, SOCKET_RETRY_INTERVAL_MS/10 + 1));
this->component.start(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
EXPECT_TRUE(Drv::Test::wait_on_started(this->component.getSocketHandler(), true, Drv::Test::get_configured_delay_ms()/10 + 1));
} else {
serverStat = this->component.startup();
ASSERT_EQ(serverStat, SOCK_SUCCESS)
Expand All @@ -59,7 +59,7 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
if (not recv_thread) {
status1 = this->component.open();
} else {
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, SOCKET_RETRY_INTERVAL_MS/10 + 1));
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, Drv::Test::get_configured_delay_ms()/10 + 1));
}
EXPECT_TRUE(this->component.getSocketHandler().isOpened());

Expand Down Expand Up @@ -93,8 +93,8 @@ void TcpServerTester ::test_with_loop(U32 iterations, bool recv_thread) {
// Properly stop the client on the last iteration
if ((1 + i) == iterations && recv_thread) {
this->component.shutdown();
this->component.stopSocketTask();
this->component.joinSocketTask(nullptr);
this->component.stop();
this->component.join();
} else {
this->component.close();
}
Expand Down
8 changes: 4 additions & 4 deletions Drv/Udp/test/ut/UdpTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) {
// Start up a receive thread
if (recv_thread) {
Os::TaskString name("receiver thread");
this->component.startSocketTask(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
this->component.start(name, true, Os::Task::TASK_DEFAULT, Os::Task::TASK_DEFAULT);
}

// Loop through a bunch of client disconnects
Expand All @@ -69,7 +69,7 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) {
<< "Port2: " << port2;

} else {
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, SOCKET_RETRY_INTERVAL_MS/10 + 1));
EXPECT_TRUE(Drv::Test::wait_on_change(this->component.getSocketHandler(), true, Drv::Test::get_configured_delay_ms()/10 + 1));
}
EXPECT_TRUE(this->component.getSocketHandler().isOpened());

Expand Down Expand Up @@ -106,8 +106,8 @@ void UdpTester::test_with_loop(U32 iterations, bool recv_thread) {
}
// Properly stop the client on the last iteration
if ((1 + i) == iterations && recv_thread) {
this->component.stopSocketTask();
this->component.joinSocketTask(nullptr);
this->component.stop();
this->component.join();
} else {
this->component.close();
}
Expand Down

0 comments on commit 640cf44

Please sign in to comment.