Skip to content

Commit

Permalink
Emit new 'subscription' events
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Dec 24, 2022
1 parent c4d45fb commit 27d93c8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<i>Simple, secure</i><sup><a href="https://github.com/uNetworking/uWebSockets/tree/master/fuzzing#fuzz-testing-of-various-parsers-and-mocked-examples">1</a></sup><i> & standards compliant</i><sup><a href="https://unetworking.github.io/uWebSockets.js/report.pdf">2</a></sup><i> web server for the most demanding</i><sup><a href="https://github.com/uNetworking/uWebSockets/tree/master/benchmarks#benchmark-driven-development">3</a></sup><i> of applications.</i> <a href="https://github.com/uNetworking/uWebSockets/blob/master/misc/READMORE.md">Read more...</a>
<br><br>

<a href="https://github.com/uNetworking/uWebSockets/releases"><img src="https://img.shields.io/github/v/release/uNetworking/uWebSockets"></a> <a href="https://lgtm.com/projects/g/uNetworking/uWebSockets/context:cpp"><img alt="Language grade: C/C++" src="https://img.shields.io/lgtm/grade/cpp/g/uNetworking/uWebSockets.svg?logo=lgtm&logoWidth=18"/></a> <a href="https://osv.dev/list?q=uwebsockets&affected_only=true&page=1&ecosystem=OSS-Fuzz"><img src="https://oss-fuzz-build-logs.storage.googleapis.com/badges/uwebsockets.svg" /></a> <img src="https://img.shields.io/badge/downloads-65%20million-pink" />
<a href="https://github.com/uNetworking/uWebSockets/releases"><img src="https://img.shields.io/github/v/release/uNetworking/uWebSockets"></a> <a href="https://osv.dev/list?q=uwebsockets&affected_only=true&page=1&ecosystem=OSS-Fuzz"><img src="https://oss-fuzz-build-logs.storage.googleapis.com/badges/uwebsockets.svg" /></a> <img src="https://img.shields.io/badge/downloads-70%20million-pink" />

</div>
<br><br>
Expand Down
18 changes: 10 additions & 8 deletions src/TopicTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ struct TopicTree {
}

/* Subscribe fails if we already are subscribed */
bool subscribe(Subscriber *s, std::string_view topic) {
Topic *subscribe(Subscriber *s, std::string_view topic) {
/* Notify user that they are doing something wrong here */
checkIteratingSubscriber(s);

Expand All @@ -173,42 +173,44 @@ struct TopicTree {
/* Insert us in topic, insert topic in us */
auto [it, inserted] = s->topics.insert(topicPtr);
if (!inserted) {
return false;
return nullptr;
}
topicPtr->insert(s);

/* Success */
return true;
return topicPtr;
}

/* Returns ok, last */
std::pair<bool, bool> unsubscribe(Subscriber *s, std::string_view topic) {
/* Returns ok, last, newCount */
std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
/* Notify user that they are doing something wrong here */
checkIteratingSubscriber(s);

/* Lookup topic */
Topic *topicPtr = lookupTopic(topic);
if (!topicPtr) {
/* If the topic doesn't exist we are assumed to still be subscribers of something */
return {false, false};
return {false, false, -1};
}

/* Erase from our list first */
if (s->topics.erase(topicPtr) == 0) {
return {false, false};
return {false, false, -1};
}

/* Remove us from topic */
topicPtr->erase(s);

int newCount = topicPtr->size();

/* If there is no subscriber to this topic, remove it */
if (!topicPtr->size()) {
/* Unique_ptr deletes the topic */
topics.erase(topic);
}

/* If we don't hold any topics we are to be freed altogether */
return {true, topics.size() == 0};
return {true, s->topics.size() == 0, newCount};
}

/* Factory function for creating a Subscriber */
Expand Down
19 changes: 17 additions & 2 deletions src/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ struct WebSocket : AsyncSocket<SSL> {
/* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
Super::timeout(webSocketContextData->idleTimeoutComponents.second);

/* At this point we iterate all currently held subscriptions and emit an event for all of them */
if (webSocketContextData->subscriptionHandler) {
for (Topic *t : webSocketData->subscriber->topics) {
webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
}
}

/* Make sure to unsubscribe from any pub/sub node at exit */
webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
webSocketData->subscriber = nullptr;
Expand Down Expand Up @@ -234,7 +241,11 @@ struct WebSocket : AsyncSocket<SSL> {
}

/* Cannot return numSubscribers as this is only for this particular websocket context */
webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
if (topicOrNull && webSocketContextData->subscriptionHandler) {
/* Emit this socket, the topic, new count, old count */
webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
}

/* Subscribe always succeeds */
return true;
Expand All @@ -251,7 +262,11 @@ struct WebSocket : AsyncSocket<SSL> {
if (!webSocketData->subscriber) { return false; }

/* Cannot return numSubscribers as this is only for this particular websocket context */
auto [ok, last] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic);
auto [ok, last, newCount] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic);
/* Emit subscription event if last */
if (ok && webSocketContextData->subscriptionHandler) {
webSocketContextData->subscriptionHandler(this, topic, newCount, newCount + 1);
}

/* Free us as subscribers if we unsubscribed from our last topic */
if (ok && last) {
Expand Down
2 changes: 1 addition & 1 deletion uSockets
Submodule uSockets updated 2 files
+17 −13 src/bsd.c
+0 −54 uSockets.vcxproj

0 comments on commit 27d93c8

Please sign in to comment.