Skip to content

Commit

Permalink
Implement join and zip "transforms"
Browse files Browse the repository at this point in the history
  • Loading branch information
mairas committed Jun 6, 2024
1 parent 5bbd746 commit f42b8fe
Show file tree
Hide file tree
Showing 3 changed files with 572 additions and 0 deletions.
152 changes: 152 additions & 0 deletions examples/join_and_zip.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/**
* @file join_and_zip.cpp
* @brief Example of Join and Zip transforms.
*
* Join and Zip transforms combine multiple input values into a single output.
*
* Try running this code with the serial monitor open ("Upload and Monitor"
* in PlatformIO menu). The program will produce capital letters every second,
* lowercase letters every 3 seconds, and integers every 10 seconds. Comment
* out and enable Join and zip transforms to see how they affect the
* output.
*
*/

#include "sensesp.h"

#include <math.h>

#include "sensesp/sensors/sensor.h"
#include "sensesp/transforms/join.h"
#include "sensesp/transforms/lambda_transform.h"
#include "sensesp/transforms/zip.h"
#include "sensesp_minimal_app_builder.h"

using namespace sensesp;

ReactESP app;

SensESPMinimalApp* sensesp_app;

void setup() {
SetupLogging();

// Note: SensESPMinimalAppBuilder is used to build the app. This creates
// a minimal app with no networking or other bells and whistles which
// would be distracting in this example. In normal use, this is not what
// you would use. Unless, of course, you know that is what you want.
SensESPMinimalAppBuilder builder;

sensesp_app = builder.get_app();

// Produce capital letters every second
auto sensor_A = new RepeatSensor<char>(1000, []() {
static char value = 'Z';
if (value == 'Z') {
value = 'A';
} else {
value += 1;
}
return value;
});

sensor_A->connect_to(new LambdaConsumer<char>(
[](char value) { ESP_LOGD("App", " %c", value); }));

// Produce lowercase letters every 3 seconds
auto sensor_a = new RepeatSensor<char>(3000, []() {
static char value = 'z';
if (value == 'z') {
value = 'a';
} else {
value += 1;
}
return value;
});

sensor_a->connect_to(new LambdaConsumer<char>(
[](char value) { ESP_LOGD("App", " %c", value); }));

// Produce integers every 10 seconds
auto sensor_int = new RepeatSensor<int>(10000, []() {
static int value = 0;
value += 1;
return value;
});

sensor_int->connect_to(new LambdaConsumer<int>(
[](int value) { ESP_LOGD("App", " %d", value); }));

// Join the three producer outputs into one tuple. A tuple is a data
// structure that can hold multiple values of different types. The resulting
// tuple can be consumed by consumers to process the values together.

auto* merged = new Join3<char, char, int>(5000);

// The Join transform will emit a tuple whenever any of the producers emit a
// new value, as long as all values are less than max_age milliseconds old.

// Once an integer is produced, the Join transform produces tuples for all
// new letter input until the last integer value is over 5000 milliseconds
// old.

// Next, try commenting out the Join transform and enabling the Zip transform
// below to see how it affects the output.

// auto* merged = new Zip3<char, char, int>(5000);

// The Zip transform will emit a tuple only when all producers have emitted a
// new value within max_age milliseconds. This has the effect of synchronizing
// the producers' outputs, at the cost of potentially waiting for all
// producers to emit a new value.

// Below, the sensors are connected to the consumers of the Join/Zip
// transform. The syntax here is a bit more complex and warrants some
// explanation.

// `merged` is our variable holding a pointer to the Join or Zip transform.
// The `consumers` member of the transform is a tuple of LambdaConsumers
// that consume and process the values of the producers. Subscripts [] can
// only be used to access elements of a same type, but our LambdaConsumers
// are of potentially different types - hence the tuple. The `std::get<>()`
// function is used to access the elements of the tuple. The first argument
// is the index of the element in the tuple, starting from 0.

// `connect_to()` expects a pointer to a `ValueConsumer`, but `std::get`
// returns a reference to the tuple element. The `&` operator is used to
// get the address of the tuple element, which is then passed to
// `connect_to()`.

// TL;DR: We connect each sensor to the corresponding consumer Join or
// Zip transform.

sensor_A->connect_to(&(std::get<0>(merged->consumers)));
sensor_a->connect_to(&(std::get<1>(merged->consumers)));
sensor_int->connect_to(&(std::get<2>(merged->consumers)));

// Here, we have a LambdaTransform that takes the tuple of values produced
// by the Join/Zip transform and converts it into a string. Note the template
// arguments: the transform input is a tuple of char, char, and int, and the
// output is a String. The same input type needs to be defined in our lambda
// function, starting with [].

auto merged_string = new LambdaTransform<std::tuple<char, char, int>, String>(
[](std::tuple<char, char, int> values) {
return String(std::get<0>(values)) + " " + String(std::get<1>(values)) +
" " + String(std::get<2>(values));
});

// Remember to connect the Join/Zip transform to the LambdaTransform:

merged->connect_to(merged_string);

// Finally, we connect the LambdaTransform to a consumer that will print the
// merged values to the console.

merged_string->connect_to(new LambdaConsumer<String>(
[](String value) {
ESP_LOGD("App", "Merged: %s", value.c_str());
}));
}

void loop() { app.tick(); }
199 changes: 199 additions & 0 deletions src/sensesp/transforms/join.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#ifndef SENSESP_SRC_SENSESP_TRANSFORMS_JOIN_H_
#define SENSESP_SRC_SENSESP_TRANSFORMS_JOIN_H_

#include <elapsedMillis.h>
#include <tuple>

#include "sensesp/system/lambda_consumer.h"
#include "sensesp/system/valueproducer.h"
#include "transform.h"

namespace sensesp {

////////////////////////////
// Base class for Join. This is needed until Arduino ESP32 Core supports C++14
// and template pack indices: https://stackoverflow.com/a/55807564/2999754

template <int N>
class JoinBase {
public:
JoinBase(long max_age = 0) : max_age_{max_age} {
for (int i = 0; i < N; i++) {
age_[i] = max_age;
}
}

protected:
elapsedMillis age_[N];
long max_age_;

virtual void emit_tuple() = 0;

void check_emit() {
for (int i = 0; i < N; i++) {
if (max_age_ != 0 && age_[i] > max_age_) {
return;
}
}
emit_tuple();
}
};

/**
* @brief Join two producer values into a tuple.
*
* Joins the connected producers' values into a tuple. The tuple is emitted
* when any producer emits a new value and none of the values have aged more
* than max_age milliseconds.
*
*/
template <typename T1, typename T2>
class Join : public JoinBase<2>, public ValueProducer<std::tuple<T1, T2>> {
public:
Join(long max_age = 0) : JoinBase<2>(max_age) {}

std::tuple<LambdaConsumer<T1>, LambdaConsumer<T2>> consumers = {
LambdaConsumer<T1>([this](T1 value) {
std::get<0>(values) = value;
age_[0] = 0;
check_emit();
}),
LambdaConsumer<T2>([this](T2 value) {
std::get<1>(values) = value;
age_[1] = 0;
check_emit();
})};

protected:
std::tuple<T1, T2> values;

void emit_tuple() override { this->emit(values); }
};

/**
* @brief Merge three producer values into a tuple.
*
* Merges the connected producers' values into a tuple. The tuple is emitted
* once all producers have emitted a new value within max_age milliseconds.
*
*/
template <typename T1, typename T2, typename T3>
class Join3 : public JoinBase<3>, public ValueProducer<std::tuple<T1, T2, T3>> {
public:
Join3(long max_age = 0) : JoinBase<3>(max_age) {}

std::tuple<LambdaConsumer<T1>, LambdaConsumer<T2>, LambdaConsumer<T3>>
consumers = {LambdaConsumer<T1>([this](T1 value) {
std::get<0>(values) = value;
age_[0] = 0;
check_emit();
}),
LambdaConsumer<T2>([this](T2 value) {
std::get<1>(values) = value;
age_[1] = 0;
check_emit();
}),
LambdaConsumer<T3>([this](T3 value) {
std::get<2>(values) = value;
age_[2] = 0;
check_emit();
})};

protected:
std::tuple<T1, T2, T3> values;

void emit_tuple() override { this->emit(values); }
};

/**
* @brief Merge four producer values into a tuple.
*
* Merges the connected producers' values into a tuple. The tuple is emitted
* once all producers have emitted a new value within max_age milliseconds.
*
*/
template <typename T1, typename T2, typename T3, typename T4>
class Join4 : public JoinBase<4>,
public ValueProducer<std::tuple<T1, T2, T3, T4>> {
public:
Join4(long max_age = 0) : JoinBase<4>(max_age) {}

std::tuple<LambdaConsumer<T1>, LambdaConsumer<T2>, LambdaConsumer<T3>,
LambdaConsumer<T4>>
consumers = {LambdaConsumer<T1>([this](T1 value) {
std::get<0>(values) = value;
age_[0] = 0;
check_emit();
}),
LambdaConsumer<T2>([this](T2 value) {
std::get<1>(values) = value;
age_[1] = 0;
check_emit();
}),
LambdaConsumer<T3>([this](T3 value) {
std::get<2>(values) = value;
age_[2] = 0;
check_emit();
}),
LambdaConsumer<T4>([this](T4 value) {
std::get<3>(values) = value;
age_[3] = 0;
check_emit();
})};

protected:
std::tuple<T1, T2, T3, T4> values;

void emit_tuple() override { this->emit(values); }
};

/**
* @brief Merge five producer values into a tuple.
*
* Merges the connected producers' values into a tuple. The tuple is emitted
* once all producers have emitted a new value within max_age milliseconds.
*
*/
template <typename T1, typename T2, typename T3, typename T4, typename T5>
class Join5 : public JoinBase<5>,
public ValueProducer<std::tuple<T1, T2, T3, T4, T5>> {
public:
Join5(long max_age = 0) : JoinBase<5>(max_age) {}

std::tuple<LambdaConsumer<T1>, LambdaConsumer<T2>, LambdaConsumer<T3>,
LambdaConsumer<T4>, LambdaConsumer<T5>>
consumers = {LambdaConsumer<T1>([this](T1 value) {
std::get<0>(values) = value;
age_[0] = 0;
check_emit();
}),
LambdaConsumer<T2>([this](T2 value) {
std::get<1>(values) = value;
age_[1] = 0;
check_emit();
}),
LambdaConsumer<T3>([this](T3 value) {
std::get<2>(values) = value;
age_[2] = 0;
check_emit();
}),
LambdaConsumer<T4>([this](T4 value) {
std::get<3>(values) = value;
age_[3] = 0;
check_emit();
}),
LambdaConsumer<T5>([this](T5 value) {
std::get<4>(values) = value;
age_[4] = 0;
check_emit();
})};

protected:
std::tuple<T1, T2, T3, T4, T5> values;

void emit_tuple() override { this->emit(values); }
};

} // namespace sensesp

#endif
Loading

0 comments on commit f42b8fe

Please sign in to comment.