Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client && _client->canSend()) {
_runQueue();
}
return true;
Expand Down Expand Up @@ -334,7 +334,7 @@ void AsyncEventSourceClient::_runQueue() {
}

// flush socket
if (total_bytes_written) {
if (total_bytes_written && _client) {
_client->send();
}
}
Expand All @@ -360,7 +360,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
_clients.emplace_back(client);

// find first unique_ptr with nullptr and reuse it
bool reused = false;
for (auto &c : _clients) {
if (c.get() == nullptr) {
c.reset(client);
reused = true;
break;
}
}

if (!reused) {
_clients.emplace_back(client);
}

if (_connectcb) {
_connectcb(client);
}
Expand All @@ -377,7 +391,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
#endif
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
if (i->get() == client) {
_clients.erase(i);
i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
break;
}
}
Expand All @@ -392,7 +406,7 @@ void AsyncEventSource::close() {
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
/**
* @brief: Fix self-deadlock by using recursive_mutex instead.
* Due to c->close() shall call the callback function _onDisconnect()
Expand All @@ -403,24 +417,31 @@ void AsyncEventSource::close() {
}
}

void AsyncEventSource::cleanup() {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif

// resize the list to remove unique_ptr with nullptr
_clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
return c.get() == nullptr;
});
}

// pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const {
size_t aql = 0;
uint32_t nConnectedClients = 0;
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
if (!_clients.size()) {
return 0;
}

for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
aql += c->packetsWaiting();
++nConnectedClients;
}
}
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
}

AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
Expand All @@ -431,10 +452,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
size_t hits = 0;
size_t miss = 0;
for (const auto &c : _clients) {
if (c->write(shared_msg)) {
++hits;
} else {
++miss;
if (c.get() != nullptr && c->connected()) {
if (c->write(shared_msg)) {
++hits;
} else {
++miss;
}
}
}
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
Expand All @@ -446,7 +469,7 @@ size_t AsyncEventSource::count() const {
#endif
size_t n_clients{0};
for (const auto &i : _clients) {
if (i->connected()) {
if (i.get() != nullptr && i->connected()) {
++n_clients;
}
}
Expand All @@ -462,11 +485,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
request->send(new AsyncEventSourceResponse(this));
}

// list iteration protected by caller's lock
void AsyncEventSource::_adjust_inflight_window() {
if (_clients.size()) {
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
const size_t clientCount = count();
if (clientCount) {
size_t inflight = SSE_MAX_INFLIGH / clientCount;
for (const auto &c : _clients) {
c->set_max_inflight_bytes(inflight);
if (c.get() != nullptr && c->connected()) {
c->set_max_inflight_bytes(inflight);
}
}
// Serial.printf("adjusted inflight to: %u\n", inflight);
}
Expand Down
4 changes: 4 additions & 0 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ class AsyncEventSource : public AsyncWebHandler {
// close all connected clients
void close();

// Cleanup internal resources.
// Has to be called periodically in the loop
void cleanup();

/**
* @brief set on-connect callback for the client
* used to deliver messages to client on first connect
Expand Down