diff --git a/components/esp_modem/Kconfig b/components/esp_modem/Kconfig index 242b58b6fa..3b8a1a6287 100644 --- a/components/esp_modem/Kconfig +++ b/components/esp_modem/Kconfig @@ -16,6 +16,17 @@ menu "esp-modem" in command mode might come fragmented in rare cases so might need to retry AT commands. + config ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + bool "Use inflatable buffer in DCE" + default n + help + If enabled we will process the ongoing AT command by growing the current + buffer (if we've run out the preconfigured buffer). + If disabled, we simply report a failure. + Use this if additional allocation is not a problem and you need to reliably process + all commands, usually with sporadically longer responses than the configured buffer. + Could be also used to defragment AT replies in CMUX mode if CMUX_DEFRAGMENT_PAYLOAD=n + config ESP_MODEM_CMUX_DELAY_AFTER_DLCI_SETUP int "Delay in ms to wait before creating another virtual terminal" default 0 diff --git a/components/esp_modem/include/cxx_include/esp_modem_dte.hpp b/components/esp_modem/include/cxx_include/esp_modem_dte.hpp index 2ee56099a6..5ac9b0d5ad 100644 --- a/components/esp_modem/include/cxx_include/esp_modem_dte.hpp +++ b/components/esp_modem/include/cxx_include/esp_modem_dte.hpp @@ -79,6 +79,13 @@ class DTE : public CommandableIf { */ void set_read_cb(std::function f); + /** + * @brief Sets read callback for manual command processing + * Note that this API also locks the command API, which can only be used + * after you remove the callback by dte->on_read(nullptr) + * + * @param on_data Function to be called when a command response is available + */ void on_read(got_line_cb on_data) override; /** @@ -122,7 +129,6 @@ class DTE : public CommandableIf { } friend class Scoped; /*!< Declaring "Scoped lock(dte)" locks this instance */ private: - static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */ [[nodiscard]] bool setup_cmux(); /*!< Internal setup of CMUX mode */ [[nodiscard]] bool exit_cmux(); /*!< Exit of CMUX mode */ @@ -133,9 +139,68 @@ class DTE : public CommandableIf { std::shared_ptr primary_term; /*!< Reference to the primary terminal (mostly for sending commands) */ std::shared_ptr secondary_term; /*!< Secondary terminal for this DTE */ modem_mode mode; /*!< DTE operation mode */ - SignalGroup signal; /*!< Event group used to signal request-response operations */ - command_result result; /*!< Command result of the currently exectuted command */ std::function on_data; /*!< on data callback for current terminal */ + +#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + /** + * @brief Implements an extra buffer that is used to capture partial reads from underlying terminals + * when we run out of the standard buffer + */ + struct extra_buffer { + extra_buffer(): buffer(nullptr) {} + ~extra_buffer() + { + delete buffer; + } + std::vector *buffer; + size_t consumed{0}; + void grow(size_t need_size); + void deflate() + { + grow(0); + consumed = 0; + } + [[nodiscard]] uint8_t *begin() const + { + return &buffer->at(0); + } + [[nodiscard]] uint8_t *current() const + { + return &buffer->at(0) + consumed; + } + } inflatable; +#endif // CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + + /** + * @brief Set internal command callbacks to the underlying terminal. + * Here we capture command replies to be processed by supplied command callbacks in struct command_cb. + */ + void set_command_callbacks(); + + /** + * @brief This abstracts command callback processing and implements its locking, signaling of completion and timeouts. + */ + struct command_cb { + static const size_t GOT_LINE = SignalGroup::bit0; /*!< Bit indicating response available */ + got_line_cb got_line; /*!< Supplied command callback */ + Lock line_lock{}; /*!< Command callback locking mechanism */ + char separator{}; /*!< Command reply separator (end of line/processing unit) */ + command_result result{}; /*!< Command return code */ + SignalGroup signal; /*!< Event group used to signal request-response operations */ + bool process_line(uint8_t *data, size_t consumed, size_t len); /*!< Lets the processing callback handle one line (processing unit) */ + bool wait_for_line(uint32_t time_ms); /*!< Waiting for command processing */ + void set(got_line_cb l, char s = '\n') /*!< Sets the command callback atomically */ + { + Scoped lock(line_lock); + got_line = std::move(l); + separator = s; + } + void give_up() /*!< Reports other than timeout error when processing replies (out of buffer) */ + { + result = command_result::FAIL; + signal.set(GOT_LINE); + } + } command_cb; /*!< Command callback utility class */ }; /** diff --git a/components/esp_modem/src/esp_modem_dte.cpp b/components/esp_modem/src/esp_modem_dte.cpp index 98e3ff6874..5e715680a7 100644 --- a/components/esp_modem/src/esp_modem_dte.cpp +++ b/components/esp_modem/src/esp_modem_dte.cpp @@ -17,53 +17,118 @@ static const size_t dte_default_buffer_size = 1000; DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr terminal): buffer(config->dte_buffer_size), cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term), - mode(modem_mode::UNDEF) {} + mode(modem_mode::UNDEF) +{ + set_command_callbacks(); +} DTE::DTE(std::unique_ptr terminal): buffer(dte_default_buffer_size), cmux_term(nullptr), primary_term(std::move(terminal)), secondary_term(primary_term), - mode(modem_mode::UNDEF) {} + mode(modem_mode::UNDEF) +{ + set_command_callbacks(); +} DTE::DTE(const esp_modem_dte_config *config, std::unique_ptr t, std::unique_ptr s): buffer(config->dte_buffer_size), cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)), - mode(modem_mode::DUAL_MODE) {} + mode(modem_mode::UNDEF) +{ + set_command_callbacks(); +} DTE::DTE(std::unique_ptr t, std::unique_ptr s): buffer(dte_default_buffer_size), cmux_term(nullptr), primary_term(std::move(t)), secondary_term(std::move(s)), - mode(modem_mode::DUAL_MODE) {} + mode(modem_mode::UNDEF) +{ + set_command_callbacks(); +} -command_result DTE::command(const std::string &command, got_line_cb got_line, uint32_t time_ms, const char separator) +void DTE::set_command_callbacks() { - Scoped l(internal_lock); - result = command_result::TIMEOUT; - signal.clear(GOT_LINE); - primary_term->set_read_cb([this, got_line, separator](uint8_t *data, size_t len) { - if (!data) { + primary_term->set_read_cb([this](uint8_t *data, size_t len) { + Scoped l(command_cb.line_lock); + if (command_cb.got_line == nullptr) { + return false; + } + if (data) { + // For terminals which post data directly with the callback (CMUX) + // we cannot defragment unless we allocate, but + // we'll try to process the data on the actual buffer +#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + if (inflatable.consumed != 0) { + inflatable.grow(inflatable.consumed + len); + std::memcpy(inflatable.current(), data, len); + data = inflatable.begin(); + } + if (command_cb.process_line(data, inflatable.consumed, len)) { + return true; + } + // at this point we're sure that the data processing hasn't finished, + // and we have to grow the inflatable buffer (if enabled) or give up + if (inflatable.consumed == 0) { + inflatable.grow(len); + std::memcpy(inflatable.begin(), data, len); + } + inflatable.consumed += len; + return false; +#else + if (command_cb.process_line(data, 0, len)) { + return true; + } + // cannot inflate and the processing hasn't finishes in the first iteration -> report a failure + command_cb.give_up(); + return true; +#endif + } + // data == nullptr: Terminals which request users to read current data + // we're able to use DTE's buffer to defragment it; as long as we consume less that the buffer size + if (buffer.size > buffer.consumed) { data = buffer.get(); len = primary_term->read(data + buffer.consumed, buffer.size - buffer.consumed); - } else { - buffer.consumed = 0; // if the underlying terminal contains data, we cannot fragment - } - if (memchr(data + buffer.consumed, separator, len)) { - result = got_line(data, buffer.consumed + len); - if (result == command_result::OK || result == command_result::FAIL) { - signal.set(GOT_LINE); + if (command_cb.process_line(data, buffer.consumed, len)) { return true; } + buffer.consumed += len; + return false; + } + // we have used the entire DTE's buffer, need to use the inflatable buffer to continue +#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + if (inflatable.consumed == 0) { + inflatable.grow(buffer.size + len); + std::memcpy(inflatable.begin(), buffer.get(), buffer.size); + inflatable.consumed = buffer.size; + } else { + inflatable.grow(inflatable.consumed + len); + } + len = primary_term->read(inflatable.current(), len); + if (command_cb.process_line(inflatable.begin(), inflatable.consumed, len)) { + return true; } - buffer.consumed += len; + inflatable.consumed += len; return false; +#else + // cannot inflate -> report a failure + command_cb.give_up(); + return true; +#endif }); +} + +command_result DTE::command(const std::string &command, got_line_cb got_line, uint32_t time_ms, const char separator) +{ + Scoped l1(internal_lock); + command_cb.set(got_line, separator); primary_term->write((uint8_t *)command.c_str(), command.length()); - auto got_lf = signal.wait(GOT_LINE, time_ms); - if (got_lf && result == command_result::TIMEOUT) { - ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE); - } + command_cb.wait_for_line(time_ms); + command_cb.set(nullptr); buffer.consumed = 0; - primary_term->set_read_cb(nullptr); - return result; +#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED + inflatable.deflate(); +#endif + return command_cb.result; } command_result DTE::command(const std::string &cmd, got_line_cb got_line, uint32_t time_ms) @@ -81,6 +146,7 @@ bool DTE::exit_cmux() primary_term = std::move(ejected.first); buffer = std::move(ejected.second); secondary_term = primary_term; + set_command_callbacks(); return true; } @@ -98,6 +164,7 @@ bool DTE::setup_cmux() return false; } secondary_term = std::make_unique(cmux_term, 1); + set_command_callbacks(); return true; } @@ -169,6 +236,10 @@ bool DTE::set_mode(modem_mode m) void DTE::set_read_cb(std::function f) { + if (f == nullptr) { + set_command_callbacks(); + return; + } on_data = std::move(f); secondary_term->set_read_cb([this](uint8_t *data, size_t len) { if (!data) { // if no data available from terminal callback -> need to explicitly read some @@ -230,6 +301,41 @@ void DTE::on_read(got_line_cb on_read_cb) }); } +bool DTE::command_cb::process_line(uint8_t *data, size_t consumed, size_t len) +{ + if (memchr(data + consumed, separator, len)) { + result = got_line(data, consumed + len); + if (result == command_result::OK || result == command_result::FAIL) { + signal.set(GOT_LINE); + return true; + } + } + return false; +} + +bool DTE::command_cb::wait_for_line(uint32_t time_ms) +{ + auto got_lf = signal.wait(command_cb::GOT_LINE, time_ms); + if (got_lf && result == command_result::TIMEOUT) { + ESP_MODEM_THROW_IF_ERROR(ESP_ERR_INVALID_STATE); + } + return got_lf; +} + +#ifdef CONFIG_ESP_MODEM_USE_INFLATABLE_BUFFER_IF_NEEDED +void DTE::extra_buffer::grow(size_t need_size) +{ + if (need_size == 0) { + delete buffer; + buffer = nullptr; + } else if (buffer == nullptr) { + buffer = new std::vector(need_size); + } else { + buffer->resize(need_size); + } +} +#endif + /** * Implemented here to keep all headers C++11 compliant */ diff --git a/components/esp_modem/src/esp_modem_uart.cpp b/components/esp_modem/src/esp_modem_uart.cpp index 13f0e83e7f..deb84ddf2a 100644 --- a/components/esp_modem/src/esp_modem_uart.cpp +++ b/components/esp_modem/src/esp_modem_uart.cpp @@ -64,7 +64,6 @@ class UartTerminal : public Terminal { void set_read_cb(std::function f) override { - ESP_MODEM_THROW_IF_FALSE(signal.wait(TASK_PARAMS, 1000), "Failed to set UART task params"); on_read = std::move(f); } @@ -91,7 +90,6 @@ class UartTerminal : public Terminal { static const size_t TASK_INIT = BIT0; static const size_t TASK_START = BIT1; static const size_t TASK_STOP = BIT2; - static const size_t TASK_PARAMS = BIT3; QueueHandle_t event_queue; uart_resource uart; @@ -118,9 +116,7 @@ void UartTerminal::task() return; // exits to the static method where the task gets deleted } while (signal.is_any(TASK_START)) { - signal.set(TASK_PARAMS); if (get_event(event, 100)) { - signal.clear(TASK_PARAMS); switch (event.type) { case UART_DATA: uart_get_buffered_data_len(uart.port, &len);