Skip to content

Commit

Permalink
web_sockets: Fix handling of stalled workers
Browse files Browse the repository at this point in the history
Workers might hang if the httpd's control socket hangs,
usually caused by stuck loopback packet processing,
which can be fixed by sending another loopback packet.
Do that at regular intervals if the worker hangs, but
not too often because the worker is expected to hang
during a firmware upload.
  • Loading branch information
MattiasTF committed Apr 10, 2024
1 parent e805f54 commit 5f5e2bc
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 34 deletions.
35 changes: 35 additions & 0 deletions software/src/tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "esp_system.h"
#include "esp_timer.h"
#include "freertos/task.h"
#include "lwip/udp.h"

#include <soc/efuse_reg.h>
#include "bindings/base58.h"
Expand Down Expand Up @@ -849,6 +850,40 @@ void dns_gethostbyname_addrtype_lwip_ctx_async(const char *hostname,
found_callback(callback_arg);
}

static esp_err_t poke_localhost_fn(void * /*ctx*/)
{
udp_pcb *l_udp_pcb = udp_new();
if (l_udp_pcb) {
//udp_bind(l_udp_pcb, IP_ADDR_ANY, 0); // pcb will be bound in udp_sendto()

struct pbuf *p = pbuf_alloc(PBUF_TRANSPORT, 0, PBUF_ROM); // PBUF_ROM because we have no payload
if (p) {
p->payload = nullptr; // payload can be nullptr because length is 0 and pbuf type is PBUF_ROM
p->len = 0;
p->tot_len = 0;

ip_addr_t dst_addr;
dst_addr.type = IPADDR_TYPE_V4;
dst_addr.u_addr.ip4.addr = htonl(IPADDR_LOOPBACK);

errno = 0;
err_t err = udp_sendto(l_udp_pcb,p, &dst_addr, 9);
if (err != ERR_OK) {
logger.printfln("udp_sendto failed: %i | %s (%i)", err, strerror(errno), errno);
}

pbuf_free(p);
}
udp_remove(l_udp_pcb);
}
return ESP_OK; // Don't care about errors.
}

void poke_localhost()
{
esp_netif_tcpip_exec(poke_localhost_fn, nullptr);
}

void trigger_reboot(const char *initiator)
{
task_scheduler.scheduleOnce([initiator]() {
Expand Down
2 changes: 2 additions & 0 deletions software/src/tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ void dns_gethostbyname_addrtype_lwip_ctx_async(const char *hostname,
dns_gethostbyname_addrtype_lwip_ctx_async_data *callback_arg,
u8_t dns_addrtype);

void poke_localhost();

void trigger_reboot(const char *initiator);

time_t ms_until_datetime(int *year, int *month, int *day, int *hour, int *minutes, int *seconds);
Expand Down
66 changes: 34 additions & 32 deletions software/src/web_sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
#include "api.h"
#include "event_log.h"
#include "task_scheduler.h"
#include "tools.h"
#include "web_server.h"

#include "esp_httpd_priv.h"

#define KEEP_ALIVE_TIMEOUT_MS 10000
#define WORKER_START_ERROR_MIN_UPTIME_FOR_REBOOT 60 * 60 * 1000

#if MODULE_WATCHDOG_AVAILABLE()
#define WORKER_WATCHDOG_TIMEOUT (5 * 60 * 1000)
static int watchdog_handle = -1;
#endif

Expand Down Expand Up @@ -80,7 +81,9 @@ bool WebSockets::queueFull()
if (work_queue.size() >= MAX_WEB_SOCKET_WORK_ITEMS_IN_QUEUE) {
return true;
}
logger.printfln("Work queue was full but %u items were cleaned.", MAX_WEB_SOCKET_WORK_ITEMS_IN_QUEUE - work_queue.size());
// Print only to the console because printing to the event log
// would generate more websocket messages to fill up the queue.
printf("web_sockets: Work queue was full but %u items were cleaned.\n", MAX_WEB_SOCKET_WORK_ITEMS_IN_QUEUE - work_queue.size());

return false;
}
Expand Down Expand Up @@ -127,7 +130,6 @@ static void work(void *arg)
clear_ws_work_item(&wi);
}

ws->worker_start_errors = 0;
ws->worker_active = WEBSOCKET_WORKER_DONE;
#if MODULE_WATCHDOG_AVAILABLE()
watchdog.reset(watchdog_handle);
Expand Down Expand Up @@ -482,49 +484,50 @@ bool WebSockets::sendToAll(const char *payload, size_t payload_len)
return true;
}

static uint32_t last_worker_run = 0;

void WebSockets::triggerHttpThread()
{
if (worker_active == WEBSOCKET_WORKER_RUNNING) {
return;
}

if (worker_active == WEBSOCKET_WORKER_ENQUEUED) {
// Protect against lost UDP packet in httpd_queue_work control socket.
// If the packet that enqueues the worker is lost
// worker_active must be reset or web sockets will never send data again.
if (last_worker_run != 0 && deadline_elapsed(last_worker_run + KEEP_ALIVE_TIMEOUT_MS * 2)) {
logger.printfln("Worker did not start for %u seconds. Control socket drop? Retrying.", (KEEP_ALIVE_TIMEOUT_MS * 2) / 1000U);
last_worker_run = millis();
worker_active = WEBSOCKET_WORKER_DONE;

worker_start_errors += (KEEP_ALIVE_TIMEOUT_MS * 2) / 100; // count a hanging worker as if we've attempted to start the worker the whole time.

std::lock_guard<std::recursive_mutex> lock{work_queue_mutex};
while (!work_queue.empty()) {
ws_work_item *wi = &work_queue.front();
clear_ws_work_item(wi);
work_queue.pop_front();
}
// Protect against stuck localhost communication that blocks the
// httpd_queue_work control socket. While the worker is enqueued,
// poke localhost in regular intervals to get things going again.
// A poll count of 32 results in a poke interval of roughly 4s.
if ((++worker_poll_count) % 32 == 0) {
// Don't log this because it happens constantly during a firmware upload.
//logger.printfln("Poking localhost to get the worker unstuck");
poke_localhost();
}
return;
}

last_worker_run = millis();
/*
// Don't schedule work task if no work is pending.
// Schedule it anyway once in a while to reset the watchdog.
#if MODULE_WATCHDOG_AVAILABLE()
if (!deadline_elapsed(last_worker_run + WORKER_WATCHDOG_TIMEOUT / 8))
#endif
{
std::lock_guard<std::recursive_mutex> lock{work_queue_mutex};
if (work_queue.empty()) {
return;
}
}
*/

// If we don't set worker_active to "enqueued" BEFORE enqueueing the worker,
// we can be preempted after enqueueing, but before we set worker_active to "enqueued"
// the worker can then run to completion, we then set worker_active to "enqueued" and are
// NEVER able to start the worker again.
worker_active = WEBSOCKET_WORKER_ENQUEUED;
if (httpd_queue_work(server.httpd, work, this) != ESP_OK) {
logger.printfln("Failed to start WebSocket worker!");
errno = 0;
err_t err = httpd_queue_work(server.httpd, work, this);
if (err == ESP_OK) {
last_worker_run = millis();
worker_poll_count = 0;
} else {
logger.printfln("Failed to start WebSocket worker: %i | %s (%i)", err, strerror(errno), errno);
worker_active = WEBSOCKET_WORKER_DONE;
++worker_start_errors;
}
}

Expand Down Expand Up @@ -581,11 +584,10 @@ void WebSockets::start(const char *uri)
}, 1000, 1000);

#if MODULE_WATCHDOG_AVAILABLE()
task_scheduler.scheduleOnce([this]() {
watchdog_handle = watchdog.add(
"websocket_worker",
"Websocket worker was not able to start for five minutes. The control socket is probably dead.");
}, WORKER_START_ERROR_MIN_UPTIME_FOR_REBOOT);
watchdog_handle = watchdog.add(
"websocket_worker",
"Websocket worker was not able to start for five minutes. The control socket is probably dead.",
WORKER_WATCHDOG_TIMEOUT);
#endif

task_scheduler.scheduleWithFixedDelay([this](){
Expand Down
5 changes: 3 additions & 2 deletions software/src/web_sockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void clear_ws_work_item(ws_work_item *wi);
class WebSockets
{
public:
WebSockets() : worker_active(WEBSOCKET_WORKER_DONE), worker_start_errors(0)
WebSockets() : worker_active(WEBSOCKET_WORKER_DONE)
{
}

Expand Down Expand Up @@ -102,7 +102,8 @@ class WebSockets
std::deque<ws_work_item> work_queue;

std::atomic<uint8_t> worker_active;
std::atomic<uint32_t> worker_start_errors;
uint32_t last_worker_run = 0;
uint32_t worker_poll_count = 0;

std::function<void(WebSocketsClient)> on_client_connect_fn;

Expand Down

0 comments on commit 5f5e2bc

Please sign in to comment.