diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp
index 7b95b48a..1109d056 100644
--- a/src/AsyncEventSource.cpp
+++ b/src/AsyncEventSource.cpp
@@ -217,8 +217,12 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
     forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
     the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
   */
-  if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
+  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
     _runQueue();
+
+  } else if (!_client) {
+    _messageQueue.clear();
+    return false;
   }
 
   return true;
@@ -243,9 +247,14 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
     forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
     the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
   */
-  if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
+  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
     _runQueue();
+
+  } else if (!_client) {
+    _messageQueue.clear();
+    return false;
   }
+
   return true;
 }
 
@@ -334,7 +343,7 @@ void AsyncEventSourceClient::_runQueue() {
   }
 
   // flush socket
-  if (total_bytes_written) {
+  if (total_bytes_written && _client) {
     _client->send();
   }
 }
@@ -360,7 +369,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
 #ifdef ESP32
   std::lock_guard lock(_client_queue_lock);
 #endif
-  _clients.emplace_back(client);
+
+  // find first unique_ptr with nullptr and reuse it
+  bool reused = false;
+  for (auto &c : _clients) {
+    if (c.get() == nullptr) {
+      c.reset(client);
+      reused = true;
+      break;
+    }
+  }
+
+  if (!reused) {
+    _clients.emplace_back(client);
+  }
+
   if (_connectcb) {
     _connectcb(client);
   }
@@ -377,7 +400,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
 #endif
   for (auto i = _clients.begin(); i != _clients.end(); ++i) {
     if (i->get() == client) {
-      _clients.erase(i);
+      i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
      break;
    }
  }
@@ -392,7 +415,7 @@ void AsyncEventSource::close() {
   std::lock_guard lock(_client_queue_lock);
 #endif
   for (const auto &c : _clients) {
-    if (c->connected()) {
+    if (c.get() != nullptr && c->connected()) {
       /**
        * @brief: Fix self-deadlock by using recursive_mutex instead.
        * Due to c->close() shall call the callback function _onDisconnect()
@@ -403,6 +426,17 @@ void AsyncEventSource::close() {
   }
 }
 
+void AsyncEventSource::cleanup() {
+#ifdef ESP32
+  std::lock_guard lock(_client_queue_lock);
+#endif
+
+  // resize the list to remove unique_ptr with nullptr
+  _clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
+    return c.get() == nullptr;
+  });
+}
+
 // pmb fix
 size_t AsyncEventSource::avgPacketsWaiting() const {
   size_t aql = 0;
@@ -410,17 +444,13 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
 #ifdef ESP32
   std::lock_guard lock(_client_queue_lock);
 #endif
-  if (!_clients.size()) {
-    return 0;
-  }
-
   for (const auto &c : _clients) {
-    if (c->connected()) {
+    if (c.get() != nullptr && c->connected()) {
       aql += c->packetsWaiting();
       ++nConnectedClients;
     }
   }
-  return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
+  return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
 }
 
 AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
@@ -431,10 +461,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
   size_t hits = 0;
   size_t miss = 0;
   for (const auto &c : _clients) {
-    if (c->write(shared_msg)) {
-      ++hits;
-    } else {
-      ++miss;
+    if (c.get() != nullptr && c->connected()) {
+      if (c->write(shared_msg)) {
+        ++hits;
+      } else {
+        ++miss;
+      }
     }
   }
   return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
@@ -446,7 +478,7 @@ size_t AsyncEventSource::count() const {
 #endif
   size_t n_clients{0};
   for (const auto &i : _clients) {
-    if (i->connected()) {
+    if (i.get() != nullptr && i->connected()) {
       ++n_clients;
     }
   }
@@ -462,11 +494,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
   request->send(new AsyncEventSourceResponse(this));
 }
 
+// list iteration protected by caller's lock
 void AsyncEventSource::_adjust_inflight_window() {
-  if (_clients.size()) {
-    size_t inflight = SSE_MAX_INFLIGH / _clients.size();
+  const size_t clientCount = count();
+  if (clientCount) {
+    size_t inflight = SSE_MAX_INFLIGH / clientCount;
     for (const auto &c : _clients) {
-      c->set_max_inflight_bytes(inflight);
+      if (c.get() != nullptr && c->connected()) {
+        c->set_max_inflight_bytes(inflight);
+      }
     }
     // Serial.printf("adjusted inflight to: %u\n", inflight);
   }
diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h
index 69c755ff..a8bd2a1a 100644
--- a/src/AsyncEventSource.h
+++ b/src/AsyncEventSource.h
@@ -272,6 +272,10 @@ class AsyncEventSource : public AsyncWebHandler {
   // close all connected clients
   void close();
 
+  // Cleanup internal resources.
+  // Has to be called periodically in the loop
+  void cleanup();
+
   /**
    * @brief set on-connect callback for the client
    * used to deliver messages to client on first connect
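
Note (not part of the patch): a minimal sketch of how the new AsyncEventSource::cleanup() could be driven from the Arduino loop, as the new header comment requires. The server and event-source names, the "/events" path and the 1-second interval are illustrative assumptions, not taken from this diff; Wi-Fi setup is omitted.

  #include <ESPAsyncWebServer.h>

  AsyncWebServer server(80);           // hypothetical server instance
  AsyncEventSource events("/events");  // hypothetical SSE endpoint

  void setup() {
    // Wi-Fi connection setup omitted for brevity
    server.addHandler(&events);
    server.begin();
  }

  void loop() {
    // Periodically purge the nullptr entries that _handleDisconnect() leaves in the
    // client list, so the list does not grow unbounded. The interval is arbitrary.
    static uint32_t lastCleanup = 0;
    if (millis() - lastCleanup >= 1000) {
      lastCleanup = millis();
      events.cleanup();
    }
  }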