Skip to content

Commit

Permalink
feat(esp): support flash with a different port
Browse files Browse the repository at this point in the history
  • Loading branch information
hfudev committed May 28, 2024
1 parent 81158ea commit ac8e2cc
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 39 deletions.
121 changes: 121 additions & 0 deletions docs/concepts/serial.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
###################################################################
Understand :class:`~pytest_embedded_serial.serial.Serial` Objects
###################################################################

The ``Serial`` object is the main object that you will use for testing. This chapter will explain the basic concepts of the ``Serial`` object.

.. note::

This chapter is mainly for developers who want to understand the internal structure of the ``Serial`` object. If you are a user who just wants to use the ``Serial`` object, you can skip this chapter.

************************************************
:class:`~pytest_embedded_serial.serial.Serial`
************************************************

- :func:`__init__`

- decide port

Support auto-detecting port by port location

- :func:`_post_init`

occupy ports globally. Used for preventing other tests from using the same port while auto-detecting ports.

- :func:`_start`

doing nothing

- :func:`_finalize_init`

doing nothing

- :func:`start_redirect_thread`

Start the redirect thread. Read data from the serial port and write it to the log file, optionally echoing it to the console.

- :func:`close`:

- :func:`stop_redirect_thread`

Stop the redirect thread.

- close serial connection

- release the occupied port globally

***********************************************************************************************************************
:class:`~pytest_embedded_serial_esp.serial.EspSerial` (Inherited from :class:`~pytest_embedded_serial.serial.Serial`)
***********************************************************************************************************************

- :func:`__init__`

- :func:`_before_init_port` (newly added method before deciding port)

- parent class :func:`_post_init`

- decide port

Support auto-detecting port by device MAC, or device target. (Espressif-chips only)

- :func:`_post_init`

- Call :func:`set_port_target_cache`, speed up auto-detection next time

- erase flash if set :attr:`erase_all`, and not set :attr:`flash_port`

since if :attr:`flash_port` is set, the "erase" and "flash"" process will be done earlier already.

- parent class :func:`_post_init`

- :func:`_start`

Run :func:`esptool.hard_reset`

*******************************************************************************************************************************
:class:`~pytest_embedded_arduino.serial.ArduinoSerial` (Inherited from :class:`~pytest_embedded_serial_esp.serial.EspSerial`)
*******************************************************************************************************************************

- :func:`__init__`

- :func:`_start`

Auto-flash the app if not :attr:`skip_autoflash`

***********************************************************************************************************************
:class:`~pytest_embedded_idf.serial.IdfSerial` (Inherited from :class:`~pytest_embedded_serial_esp.serial.EspSerial`)
***********************************************************************************************************************

- :func:`__init__`

- :func:`_before_init_port`

If :attr:`flash_port` is set differently from the :attr:`port`, the target chip will always be flashed with the given port(without the port-app cache)

- Occupying the :attr:`flash_port` globally
- erase flash if set :attr:`erase_all`
- Flash the app if not set :attr:`skip_autoflash`
- :func:`set_port_target_cache` for the flash port
- :func:`set_port_app_cache` for the flash port

- :func:`_post_init`

- if set :attr:`flash_port`, do nothing
- otherwise, check port-app cache, if the app has been flashed, skip the auto-flash process
- Run parent :func:`_post_init`

- :func:`_start`

- if the target has been flashed while :func:`_before_init_port`, set the port-app cache with the :attr:`port` and :attr:`app` and do nothing
- otherwise, run :func:`flash` automatically the app if not set :attr:`skip_autoflash`
- Run parent :func:`_start`

- :func:`flash`

- flash the app
- :func:`set_port_app_cache` for the flash port

- :func:`close`

- release the occupied flash port globally
- Run parent :func:`close`
2 changes: 1 addition & 1 deletion pytest-embedded-idf/pytest_embedded_idf/dut.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def setup_jtag(self):

def flash_via_jtag(self):
if not self.openocd:
logging.warning("no openocd instance created. can't flash via openocd `program_esp`")
logging.debug('no openocd instance created. NOT flashing via openocd `program_esp`')
return

if self.app.is_loadable_elf:
Expand Down
132 changes: 113 additions & 19 deletions pytest-embedded-idf/pytest_embedded_idf/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import logging
import os
import tempfile
import time
from typing import Optional, TextIO, Union

import esptool
from pytest_embedded.log import MessageQueue
from pytest_embedded_serial_esp.serial import EspSerial

from .app import IdfApp
Expand Down Expand Up @@ -44,9 +46,56 @@ def __init__(
**kwargs,
)

def _post_init(self):
def _before_init_port(self, q: MessageQueue):
if not self.flash_port:
return

self.occupied_ports[self.flash_port] = None
logging.debug(f'occupied {self.flash_port}')

if self.erase_all:
self.skip_autoflash = False
self.skip_autoflash = False # do we really need it? may be a breaking change if removed

cmd = ['--port', self.flash_port, 'erase_flash']
if self._force_flag():
cmd.append('--force')

with contextlib.redirect_stdout(q):
esptool.main(cmd)

if self._meta:
self._meta.drop_port_app_cache(self.flash_port)

if self.skip_autoflash:
return

if self.erase_nvs:
_args, _kwargs = self._get_erase_nvs_cli_args(port=self.flash_port)
with contextlib.redirect_stdout(q):
esptool.main(_args, **_kwargs)

# this is required to wait for the reset happened after erase the nvs partition
time.sleep(1)

_args, _kwargs = self._get_flash_cli_args(port=self.flash_port)
with contextlib.redirect_stdout(q):
esptool.main(_args, **_kwargs)

# after flash caches
# here instead of ``_finalize_init`` to avoid occupying the port wrongly while multi-DUT test case
self._flashed_with_different_port = True
if self._meta:
self._meta.set_port_target_cache(self.flash_port, self.app.target)
self._meta.set_port_app_cache(self.flash_port, self.app)

# this is required to wait for the reset happened after flashing
time.sleep(1)

def _post_init(self):
if self._flashed_with_different_port:
pass
elif self.erase_all:
self.skip_autoflash = False # do we really need it? may be a breaking change if removed
elif self._meta and self._meta.hit_port_app_cache(self.port, self.app):
if self.confirm_target_elf_sha256:
if self.is_target_flashed_same_elf():
Expand All @@ -66,6 +115,13 @@ def _post_init(self):
super()._post_init()

def _start(self):
if self._flashed_with_different_port:
if self._meta:
self._meta.set_port_app_cache(self.port, self.app)

super()._start()
return

if self.skip_autoflash:
logging.info('Skipping auto flash...')
super()._start()
Expand All @@ -75,6 +131,13 @@ def _start(self):
else:
self.flash()

def close(self):
if self._flashed_with_different_port:
self.occupied_ports.pop(self.flash_port, None)
logging.debug(f'released {self.flash_port}')

super().close()

def load_ram(self) -> None:
if not self.app.is_loadable_elf:
raise ValueError('elf should be loadable elf')
Expand Down Expand Up @@ -131,11 +194,12 @@ def erase_flash(self, force: bool = False):
else:
super().erase_flash()

@EspSerial.use_esptool()
def flash(self, app: Optional[IdfApp] = None) -> None:
"""
Flash the `app.flash_files` to the dut
"""
def _get_flash_cli_args(
self,
*,
app: Optional[IdfApp] = None,
port: Optional[str] = None,
):
if not app:
app = self.app

Expand All @@ -148,6 +212,8 @@ def flash(self, app: Optional[IdfApp] = None) -> None:
return

_args = []
_kwargs = {}

for k, v in app.flash_args['extra_esptool_args'].items():
if isinstance(v, bool):
if k == 'stub':
Expand All @@ -166,17 +232,6 @@ def flash(self, app: Optional[IdfApp] = None) -> None:
_args.extend(['--baud', os.getenv('ESPBAUD', '921600')])
_args.append('write_flash')

if self.erase_nvs:
esptool.main(
[
'erase_region',
str(app.partition_table['nvs']['offset']),
str(app.partition_table['nvs']['size']),
],
esp=self.esp,
)
self.esp.connect()

encrypt_files = []
flash_files = []
for file in app.flash_files:
Expand All @@ -195,7 +250,46 @@ def flash(self, app: Optional[IdfApp] = None) -> None:

_args.extend([*app.flash_args['write_flash_args'], *self._force_flag(app)])

esptool.main(_args, esp=self.esp)
if port:
_args = ['--port', port, *_args]
else:
_kwargs.update({'esp': self.esp})

return _args, _kwargs

def _get_erase_nvs_cli_args(self, app: Optional[IdfApp] = None, port: Optional[str] = None):
if not app:
app = self.app

_args = [
'erase_region',
str(app.partition_table['nvs']['offset']),
str(app.partition_table['nvs']['size']),
]
_kwargs = {}

if port:
_args = ['--port', port, *_args]
else:
_kwargs.update({'esp': self.esp})

return _args, _kwargs

@EspSerial.use_esptool()
def flash(self, app: Optional[IdfApp] = None) -> None:
"""
Flash the app to the target device, or the app provided.
"""
if not app:
app = self.app

if self.erase_nvs:
_args, _kwargs = self._get_erase_nvs_cli_args(app=app)
esptool.main(_args, **_kwargs)
self.esp.connect()

_args, _kwargs = self._get_flash_cli_args(app=app)
esptool.main(_args, **_kwargs)

if self._meta:
self._meta.set_port_app_cache(self.port, app)
Expand Down
Loading

0 comments on commit ac8e2cc

Please sign in to comment.