Skip to content

Pytest embedded serial esp

pytest_embedded_serial_esp.serial

EspSerial

Bases: Serial

Serial class for ports connected to Espressif products.

Attributes:

Name Type Description
esp esptool.ESPLoader

The esptool.ESPLoader connected to the chip; the flasher stub is uploaded automatically during initialization.

stub esptool.ESPLoader

The stubbed loader (esptool.ESPStubLoader) returned by esp.run_stub().

Source code in pytest_embedded_serial_esp/serial.py
class EspSerial(Serial):
    """
    Serial class for ports connected to Espressif products.

    On construction the serial port is auto-detected (unless one is given),
    esptool connects to the chip, and the flasher stub is uploaded.

    Attributes:
        esp: esptool.ESPLoader, will auto upload stub.
        stub: esptool.ESPStubLoader, stubbed loader.
        target: chip name reported by esptool, lower-cased with dashes removed.
        skip_autoflash: value passed to the constructor, stored as-is.
        erase_all: value passed to the constructor, stored as-is.
        esptool_baud: value passed to the constructor, stored as-is.
    """

    ESPTOOL_DEFAULT_BAUDRATE = 921600

    # The import runs inside the class body, so both `ESPLoader` and
    # `ESPTOOL_VERSION` end up as class attributes.
    try:
        # esptool>=4.0
        from esptool.loader import ESPLoader

        ESPTOOL_VERSION = EsptoolVersion.V4
    except (AttributeError, ModuleNotFoundError):
        # esptool<4.0
        from esptool import ESPLoader

        ESPTOOL_VERSION = EsptoolVersion.V3

    def __init__(
        self,
        pexpect_proc: PexpectProcess,
        target: Optional[str] = None,
        port: Optional[str] = None,
        baud: int = Serial.DEFAULT_BAUDRATE,
        esptool_baud: int = ESPTOOL_DEFAULT_BAUDRATE,
        skip_autoflash: bool = False,
        erase_all: bool = False,
        port_target_cache: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> None:
        """
        Connect to an Espressif chip and upload the flasher stub.

        Args:
            pexpect_proc: process object that receives the duplicated stdout and
                is forwarded to the parent ``Serial`` constructor.
            target: chip name handed to esptool as ``chip``; also used to look up
                a previously cached port for that target.
            port: serial port to use. When ``None``, candidates are taken from
                ``esptool.get_port_list()`` minus ``self.occupied_ports``.
            baud: initial baud rate for the esptool connection, also forwarded
                to the parent ``Serial`` constructor.
            esptool_baud: stored on ``self.esptool_baud``.
            skip_autoflash: stored on ``self.skip_autoflash``.
            erase_all: stored on ``self.erase_all``.
            port_target_cache: mapping of port to target; a fresh dict is used
                when ``None``.
            **kwargs: forwarded to the parent ``Serial`` constructor.

        Raises:
            ValueError: if esptool could not connect to any candidate port.
        """
        self._port_target_cache: Dict[str, str] = port_target_cache if port_target_cache is not None else {}

        if port is None:
            available_ports = esptool.get_port_list()
            ports = list(set(available_ports) - set(self.occupied_ports.keys()))

            # sort to make /dev/ttyS* ports before /dev/ttyUSB* ports
            # esptool will reverse the list
            ports.sort()

            # prioritize the cache recorded target port
            if target:
                for _port, _target in self._port_target_cache.items():
                    if _target == target and _port in ports:
                        # False sorts before True, so the cached port moves to the
                        # end of the list, which esptool (iterating in reverse)
                        # tries first. The sort is stable for the other ports.
                        ports.sort(key=lambda x: x == _port)
                        logging.debug('hit port-target cache: %s - %s', _port, _target)

            logging.debug('Detecting ports from %s', ', '.join(ports))
        else:
            ports = [port]

        with DuplicateStdout(pexpect_proc):
            # normal loader
            # NOTE(review): `target` may be None here; confirm the installed
            # esptool accepts `chip=None`.
            self.esp: esptool.ESPLoader = esptool.get_default_connected_device(
                ports, port=port, connect_attempts=3, initial_baud=baud, chip=target
            )
            if not self.esp:
                raise ValueError('Couldn\'t auto detect chip. Please manually specify with "--port"')

            # stub loader has more functionalities, need to run after calling `run_stub()`
            self.stub: esptool.ESPLoader = self.esp.run_stub()

        # From here on use the chip name esptool reports, not the requested one.
        target = self.esp.CHIP_NAME.lower().replace('-', '')
        logging.info('Target: %s, Port: %s', target, self.esp.serial_port)

        self.target = target

        self.skip_autoflash = skip_autoflash
        self.erase_all = erase_all
        self.esptool_baud = esptool_baud
        super().__init__(pexpect_proc, port=self.esp._port, baud=baud, **kwargs)

    def _post_init(self):
        """Record the port-to-target pair in the cache, then run the parent hook."""
        logging.debug('set port-target cache: %s - %s', self.port, self.target)
        self._port_target_cache[self.port] = self.target
        super()._post_init()

    def use_esptool(func):
        """
        Decorator for methods that need esptool access to the port.

        1. close the port and open the port to kill the `self._forward_io` thread
        2. call `run_stub()`
        3. call the decorated function, which can use `self.stub` as the stubbed loader
        4. call `hard_reset()`
        5. create the `self._forward_io` thread again.

        The port settings read before the call are re-applied afterwards,
        even if one of the steps raises.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self.disable_redirect_thread() as thread_killed:
                saved_settings = self.proc.get_settings()
                try:
                    with DuplicateStdout(self.pexpect_proc):
                        if thread_killed:
                            # Reconnect and refresh the stub loader before use.
                            self.esp.connect('hard_reset')
                            self.stub = self.esp.run_stub()
                        result = func(self, *args, **kwargs)
                        self.stub.hard_reset()
                finally:
                    self.proc.apply_settings(saved_settings)

            return result

        return wrapper

    def _start(self):
        """Start by hard resetting the device."""
        self.hard_reset()

    @use_esptool
    def hard_reset(self):
        """
        Hard reset your Espressif device.

        The body is intentionally empty: the `use_esptool` wrapper calls
        `self.stub.hard_reset()` right after the decorated function returns.
        """

use_esptool(func)

  1. close the port and open the port to kill the self._forward_io thread
  2. call run_stub()
  3. call the decorated function, which can use self.stub as the stubbed loader
  4. call hard_reset()
  5. create the self._forward_io thread again.
Source code in pytest_embedded_serial_esp/serial.py
def use_esptool(func):
    """
    Decorator for methods that need esptool access to the port.

    Each call of the decorated method goes through these steps:

    1. enter `self.disable_redirect_thread()` and remember the current port settings
    2. if that context manager yields a truthy value, reconnect with
       `self.esp.connect('hard_reset')` and refresh `self.stub` via `run_stub()`
    3. call the decorated function, which can use `self.stub` as the stubbed loader
    4. call `self.stub.hard_reset()`
    5. re-apply the remembered port settings, even if an earlier step raised
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        with self.disable_redirect_thread() as thread_killed:
            saved_settings = self.proc.get_settings()
            try:
                with DuplicateStdout(self.pexpect_proc):
                    if thread_killed:
                        # Reconnect and refresh the stub loader before use.
                        self.esp.connect('hard_reset')
                        self.stub = self.esp.run_stub()
                    result = func(self, *args, **kwargs)
                    self.stub.hard_reset()
            finally:
                self.proc.apply_settings(saved_settings)

        return result

    return wrapper

hard_reset()

Hard reset your Espressif device.

Source code in pytest_embedded_serial_esp/serial.py
@use_esptool
def hard_reset(self):
    """
    Hard reset your Espressif device.

    The body is intentionally empty: the `use_esptool` wrapper calls
    `self.stub.hard_reset()` right after the decorated function returns.
    """