Skip to content

Pytest embedded idf

pytest_embedded_idf.app

IdfApp

Bases: App

Idf App class

Attributes:

Name Type Description
elf_file str

elf file path

flash_args dict[str, Any]

dict of flasher_args.json

flash_files list[FlashFile]

list of (offset, file path, encrypted) entries for the files that need to be flashed

flash_settings dict[str, Any]

dict of flash settings

Source code in pytest_embedded_idf/app.py
class IdfApp(App):
    """
    Idf App class

    The attributes below are only assigned by `__init__` when `binary_path` is set.

    Attributes:
        elf_file (Optional[str]): elf file path, `None` if no `.elf` file is found under the binary path
        bin_file (Optional[str]): bin file path, `None` for loadable elf apps
        is_loadable_elf (bool): True if the sdkconfig item `APP_BUILD_TYPE_ELF_RAM` is truthy
        flash_args (dict[str, Any]): dict of flasher_args.json
        flash_files (list[FlashFile]): list of (offset, file path, encrypted) of the files that need to be flashed
        flash_settings (dict[str, Any]): dict of flash settings
    """

    FLASH_ARGS_FILENAME = 'flash_args'
    FLASH_PROJECT_ARGS_FILENAME = 'flash_project_args'
    FLASH_ARGS_JSON_FILENAME = 'flasher_args.json'

    def __init__(
        self,
        part_tool: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            part_tool: partition tool path, falls back to the default one under `$IDF_PATH` if not set
            **kwargs: forwarded to the parent class constructor
        """
        super().__init__(**kwargs)

        # Optional info
        self._sdkconfig = None
        self._target = None
        # the partition table is used for nvs
        self._parttool = part_tool
        self._partition_table = None

        if not self.binary_path:
            logging.debug('Binary path not specified, skipping parsing app...')
            return

        # Required if binary path exists
        self.elf_file = self._get_elf_file()

        # loadable elf file skip the rest of these
        self.is_loadable_elf = bool(self.sdkconfig.get('APP_BUILD_TYPE_ELF_RAM'))

        self.bin_file = None
        self.flash_args, self.flash_files, self.flash_settings = {}, [], {}

        if not self.is_loadable_elf:
            self.bin_file = self._get_bin_file()
            self.flash_args, self.flash_files, self.flash_settings = self._parse_flash_args_json()

    @property
    def parttool_path(self) -> str:
        """
        Returns:
            Partition tool path

        Raises:
            ValueError: if neither the given partition tool nor the default one is an existing file
        """
        parttool_filepath = self._parttool or os.path.join(
            os.getenv('IDF_PATH', ''),
            'components',
            'partition_table',
            'gen_esp32part.py',
        )
        if os.path.isfile(parttool_filepath):
            return os.path.realpath(parttool_filepath)
        raise ValueError('Partition Tool not found. (Default: $IDF_PATH/components/partition_table/gen_esp32part.py)')

    @property
    def sdkconfig(self) -> Dict[str, Any]:
        """
        Returns:
            dict contains all k-v pairs from the sdkconfig file,
            empty dict if `config/sdkconfig.json` does not exist under the binary path
        """
        if self._sdkconfig is not None:
            return self._sdkconfig

        sdkconfig_json_path = os.path.join(self.binary_path, 'config', 'sdkconfig.json')
        if not os.path.isfile(sdkconfig_json_path):
            logging.warning(f'{sdkconfig_json_path} doesn\'t exist. Skipping...')
            self._sdkconfig = {}
        else:
            # use a context manager so the file handle is closed right after parsing
            with open(sdkconfig_json_path) as fr:
                self._sdkconfig = json.load(fr)
        return self._sdkconfig

    @property
    def target(self) -> str:
        """
        Returns:
            target chip type, read from sdkconfig `IDF_TARGET` when sdkconfig is not empty,
            otherwise from the `chip` item of `extra_esptool_args` in the flash args.
            Falls back to `esp32` in both cases.
        """
        if self.sdkconfig:
            return self.sdkconfig.get('IDF_TARGET', 'esp32')
        else:
            return self.flash_args.get('extra_esptool_args', {}).get('chip', 'esp32')

    @property
    def partition_table(self) -> Dict[str, Any]:
        """
        Returns:
            partition table dict generated by the partition tool. Keys are the partition names, values are
            dicts with keys `type`, `subtype`, `offset`, `size` and `flags`. The result is cached.
        """
        if self._partition_table is not None:
            return self._partition_table

        partition_file = os.path.join(
            self.binary_path,
            self.flash_args.get('partition_table', self.flash_args.get('partition-table', {})).get('file', ''),
        )
        process = subprocess.Popen(
            [sys.executable, self.parttool_path, partition_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, _ = process.communicate()
        raw_data = stdout.decode() if isinstance(stdout, bytes) else stdout

        partition_table = {}
        for line in raw_data.splitlines():
            # blank lines would raise IndexError on `line[0]`, skip them together with the comments
            if not line or line.startswith('#'):
                continue

            try:
                _name, _type, _subtype, _offset, _size, _flags = line.split(',')
                # `endswith` is safe for an empty size field, which then fails in `int()` and is skipped
                if _size.endswith('K'):
                    _size = int(_size[:-1]) * 1024
                elif _size.endswith('M'):
                    _size = int(_size[:-1]) * 1024 * 1024
                else:
                    _size = int(_size)
                _offset = int(_offset, 0)
            except ValueError:
                # not a well-formed partition line
                continue

            partition_table[_name] = {
                'type': _type,
                'subtype': _subtype,
                'offset': _offset,
                'size': _size,
                'flags': _flags,
            }
        self._partition_table = partition_table
        return self._partition_table

    def _get_elf_file(self) -> Optional[str]:
        """
        Returns:
            real path of the first `.elf` file listed under the binary path, `None` if there is none
        """
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.elf':
                return os.path.realpath(os.path.join(self.binary_path, fn))

        return None

    def _get_bin_file(self) -> str:
        """
        Returns:
            real path of the first `.bin` file listed under the binary path

        Raises:
            ValueError: if there is no `.bin` file under the binary path
        """
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.bin':
                return os.path.realpath(os.path.join(self.binary_path, fn))
        raise ValueError(f'Bin file under {self.binary_path} not found')

    def _parse_flash_args(self) -> List[str]:
        """
        Returns:
            shell-like split content of the first flash args file found under the binary path

        Raises:
            ValueError: if neither `flash_project_args` nor `flash_args` exists under the binary path
        """
        flash_args_filepath = None
        for fn in os.listdir(self.binary_path):
            if fn in [self.FLASH_PROJECT_ARGS_FILENAME, self.FLASH_ARGS_FILENAME]:
                flash_args_filepath = os.path.realpath(os.path.join(self.binary_path, fn))
                break

        if not flash_args_filepath:
            raise ValueError(
                f'{self.FLASH_PROJECT_ARGS_FILENAME} or {self.FLASH_ARGS_FILENAME} '
                f'is not found under {self.binary_path}'
            )

        with open(flash_args_filepath) as fr:
            return shlex.split(fr.read().strip())

    def _parse_flash_args_json(
        self,
    ) -> Tuple[Dict[str, Any], List[FlashFile], Dict[str, Any]]:
        """
        Parse `flasher_args.json` under the binary path

        Returns:
            tuple of (flash args dict, sorted flash files list, flash settings dict).
            The flash settings dict gets an extra key `encrypt`, which is True if any flash file is encrypted.

        Raises:
            ValueError: if `flasher_args.json` does not exist under the binary path
        """
        flash_args_json_filepath = None
        for fn in os.listdir(self.binary_path):
            if fn == self.FLASH_ARGS_JSON_FILENAME:
                flash_args_json_filepath = os.path.realpath(os.path.join(self.binary_path, fn))
                break

        if not flash_args_json_filepath:
            raise ValueError(f'{self.FLASH_ARGS_JSON_FILENAME} not found')

        with open(flash_args_json_filepath) as fr:
            flash_args = json.load(fr)

        def _is_encrypted(_flash_args: Dict[str, Any], _offset: str, _file_path: str) -> bool:
            # `_offset` is the raw key of `flash_files`, compared as-is with the `offset` item of each entry
            for entry in _flash_args.values():
                try:
                    if (entry['offset'], entry['file']) == (_offset, _file_path):
                        return entry['encrypted'] == 'true'
                except (TypeError, KeyError):
                    # the value is not a file entry, or has no `encrypted` item
                    continue

            return False

        flash_files = []
        for (offset, file_path) in flash_args['flash_files'].items():
            flash_files.append(
                FlashFile(
                    int(offset, 0),
                    os.path.join(self.binary_path, file_path),
                    _is_encrypted(flash_args, offset, file_path),
                )
            )

        flash_files.sort()
        flash_settings = flash_args['flash_settings']
        flash_settings['encrypt'] = any(file.encrypted for file in flash_files)

        return flash_args, flash_files, flash_settings

    def get_sha256(self, filepath: str) -> Optional[str]:
        """
        Get the sha256 of the file

        Args:
            filepath: path to the file

        Returns:
            sha256 value appended to app, `None` if the image has no appended digest
        """
        from esptool.bin_image import LoadFirmwareImage
        from esptool.util import hexify

        image = LoadFirmwareImage(self.target, filepath)
        if image.append_digest:
            return hexify(image.stored_digest).lower()
        return None

get_sha256(filepath)

Get the sha256 of the file

Parameters:

Name Type Description Default
filepath str

path to the file

required

Returns:

Type Description
Optional[str]

sha256 value appended to app

Source code in pytest_embedded_idf/app.py
def get_sha256(self, filepath: str) -> Optional[str]:
    """
    Read the sha256 digest stored in a firmware image

    Args:
        filepath: path to the file

    Returns:
        lower-cased hex string of the stored digest, `None` if the image has no appended digest
    """
    from esptool.bin_image import LoadFirmwareImage
    from esptool.util import hexify

    firmware = LoadFirmwareImage(self.target, filepath)
    if not firmware.append_digest:
        return None

    digest_hex = hexify(firmware.stored_digest)
    return digest_hex.lower()

partition_table() property

Returns:

Type Description
Dict[str, Any]

partition table dict generated by the partition tool

Source code in pytest_embedded_idf/app.py
@property
def partition_table(self) -> Dict[str, Any]:
    """
    Returns:
        partition table dict generated by the partition tool. Keys are the partition names, values are
        dicts with keys `type`, `subtype`, `offset`, `size` and `flags`. The result is cached.
    """
    if self._partition_table is not None:
        return self._partition_table

    partition_file = os.path.join(
        self.binary_path,
        self.flash_args.get('partition_table', self.flash_args.get('partition-table', {})).get('file', ''),
    )
    process = subprocess.Popen(
        [sys.executable, self.parttool_path, partition_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, _ = process.communicate()
    raw_data = stdout.decode() if isinstance(stdout, bytes) else stdout

    partition_table = {}
    for line in raw_data.splitlines():
        # blank lines would raise IndexError on `line[0]`, skip them together with the comments
        if not line or line.startswith('#'):
            continue

        try:
            _name, _type, _subtype, _offset, _size, _flags = line.split(',')
            # `endswith` is safe for an empty size field, which then fails in `int()` and is skipped
            if _size.endswith('K'):
                _size = int(_size[:-1]) * 1024
            elif _size.endswith('M'):
                _size = int(_size[:-1]) * 1024 * 1024
            else:
                _size = int(_size)
            _offset = int(_offset, 0)
        except ValueError:
            # not a well-formed partition line
            continue

        partition_table[_name] = {
            'type': _type,
            'subtype': _subtype,
            'offset': _offset,
            'size': _size,
            'flags': _flags,
        }
    self._partition_table = partition_table
    return self._partition_table

parttool_path() property

Returns:

Type Description
str

Partition tool path

Source code in pytest_embedded_idf/app.py
@property
def parttool_path(self) -> str:
    """
    Returns:
        Real path of the partition tool: the user-provided one if set,
        otherwise `gen_esp32part.py` under `$IDF_PATH/components/partition_table`

    Raises:
        ValueError: if the chosen path is not an existing file
    """
    default_tool = os.path.join(os.getenv('IDF_PATH', ''), 'components', 'partition_table', 'gen_esp32part.py')
    candidate = self._parttool if self._parttool else default_tool

    if not os.path.isfile(candidate):
        raise ValueError('Partition Tool not found. (Default: $IDF_PATH/components/partition_table/gen_esp32part.py)')

    return os.path.realpath(candidate)

sdkconfig() property

Returns:

Type Description
Dict[str, Any]

dict containing all key-value pairs from the sdkconfig file

Source code in pytest_embedded_idf/app.py
@property
def sdkconfig(self) -> Dict[str, Any]:
    """
    Returns:
        dict contains all k-v pairs from the sdkconfig file,
        empty dict if `config/sdkconfig.json` does not exist under the binary path.
        The result is cached.
    """
    if self._sdkconfig is not None:
        return self._sdkconfig

    sdkconfig_json_path = os.path.join(self.binary_path, 'config', 'sdkconfig.json')
    if not os.path.isfile(sdkconfig_json_path):
        logging.warning(f'{sdkconfig_json_path} doesn\'t exist. Skipping...')
        self._sdkconfig = {}
    else:
        # use a context manager so the file handle is closed right after parsing
        with open(sdkconfig_json_path) as fr:
            self._sdkconfig = json.load(fr)
    return self._sdkconfig

target() property

Returns:

Type Description
str

target chip type

Source code in pytest_embedded_idf/app.py
@property
def target(self) -> str:
    """
    Returns:
        target chip type: sdkconfig `IDF_TARGET` when sdkconfig is not empty, otherwise the `chip` item
        of `extra_esptool_args` in the flash args. Falls back to `esp32` in both cases.
    """
    config = self.sdkconfig
    if not config:
        esptool_extras = self.flash_args.get('extra_esptool_args', {})
        return esptool_extras.get('chip', 'esp32')

    return config.get('IDF_TARGET', 'esp32')

pytest_embedded_idf.serial

IdfSerial

Bases: EspSerial

IDF serial Dut class

Automatically flashes the app when the test starts.

Source code in pytest_embedded_idf/serial.py
class IdfSerial(EspSerial):
    """
    IDF serial Dut class

    Auto flash the app while starting test.
    """

    SUGGEST_FLASH_BAUDRATE = 921600
    # offset added to the app bin flash offset when reading the flashed elf sha256
    DEFAULT_SHA256_OFFSET = 0xB0

    def __init__(
        self,
        app: IdfApp,
        target: Optional[str] = None,
        confirm_target_elf_sha256: bool = False,
        erase_nvs: bool = False,
        **kwargs,
    ) -> None:
        """
        Args:
            app: `IdfApp` instance
            target: target chip type, falls back to `app.target` if not set
            confirm_target_elf_sha256: compare the flashed elf sha256 with the local one
                when the session cache reports the same app
            erase_nvs: overwrite the `nvs` partition with `0xff` bytes while flashing
            **kwargs: forwarded to the parent class constructor

        Raises:
            ValueError: if the app has no `target`, or `target` differs from `app.target`
        """
        self.app = app
        self.confirm_target_elf_sha256 = confirm_target_elf_sha256
        self.erase_nvs = erase_nvs

        if not hasattr(self.app, 'target'):
            raise ValueError(f'Idf app not parsable. Please check if it\'s valid: {self.app.binary_path}')

        if target and self.app.target and self.app.target != target:
            raise ValueError(f'Targets do not match. App target: {self.app.target}, Cmd target: {target}.')

        super().__init__(
            target=target or app.target,
            **kwargs,
        )

    def _post_init(self):
        """
        Decide whether the auto flash should be skipped, then run the parent `_post_init`
        """
        if self.erase_all:
            self.skip_autoflash = False
        elif self._meta and self._meta.hit_port_app_cache(self.port, self.app):
            if self.confirm_target_elf_sha256:
                if self.is_target_flashed_same_elf():
                    logging.info('Confirmed target elf file sha256 the same as your local one.')
                    self.skip_autoflash = True
                else:
                    logging.info('target elf file is different from your local one. Flash the binary again.')
                    self.skip_autoflash = False
            else:
                logging.info(
                    'App is the same according to the session cache. '
                    'you can use flag "--confirm-target-elf-sha256" to make sure '
                    'that the target elf file is the same as your local one.'
                )
                self.skip_autoflash = True

        super()._post_init()

    def _start(self):
        """
        Run the parent `_start` if auto flash is skipped, otherwise load the elf to ram or flash the app
        """
        if self.skip_autoflash:
            logging.info('Skipping auto flash...')
            super()._start()
        else:
            if self.app.is_loadable_elf:
                self.load_ram()
            else:
                self.flash()

    def load_ram(self) -> None:
        """
        Convert the loadable elf file with `esptool.py elf2image` and load the result with `esptool.py load_ram`

        Raises:
            ValueError: if the app is not a loadable elf
        """
        if not self.app.is_loadable_elf:
            raise ValueError('elf should be loadable elf')

        live_print_call(
            [
                'esptool.py',
                '--chip',
                self.app.target,
                'elf2image',
                self.app.elf_file,
                *self.app._parse_flash_args(),
            ],
            msg_queue=self._q,
        )
        live_print_call(
            [
                'esptool.py',
                '--chip',
                self.app.target,
                '--no-stub',
                'load_ram',
                self.app.elf_file.replace('.elf', '.bin'),
            ],
            msg_queue=self._q,
        )

    @EspSerial.use_esptool()
    def flash(self) -> None:
        """
        Flash the `app.flash_files` to the dut
        """
        if not self.app.flash_files:
            logging.error('No flash files detected. Skipping auto flash...')
            return

        if not self.app.flash_settings:
            logging.error('No flash settings detected. Skipping auto flash...')
            return

        flash_files = []
        encrypt_files = []
        nvs_file = None
        try:
            # open the files inside the `try` block, so the handles opened so far
            # are closed in `finally` even if a later `open` fails
            for file in self.app.flash_files:
                opened = encrypt_files if file.encrypted else flash_files
                opened.append((file.offset, open(file.file_path, 'rb')))

            if self.erase_nvs:
                address = self.app.partition_table['nvs']['offset']
                size = self.app.partition_table['nvs']['size']
                nvs_file = tempfile.NamedTemporaryFile(delete=False)
                nvs_file.write(b'\xff' * size)
                # the file is re-opened by name below, make sure the buffered content is written out
                nvs_file.flush()
                if not isinstance(address, int):
                    address = int(address, 0)

                if self.app.flash_settings['encrypt']:
                    encrypt_files.append((address, open(nvs_file.name, 'rb')))
                else:
                    flash_files.append((address, open(nvs_file.name, 'rb')))

            # write_flash expects the parameter encrypt_files to be None and not
            # an empty list, so perform the check here
            default_kwargs = {
                'addr_filename': flash_files,
                'encrypt_files': encrypt_files or None,
                'no_stub': False,
                'compress': True,
                'verify': False,
                'ignore_flash_encryption_efuse_setting': False,
                'erase_all': False,
                'force': False,
            }

            default_kwargs.update(self.app.flash_settings)
            default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
            args = EsptoolArgs(**default_kwargs)

            self.stub.change_baud(self.esptool_baud)
            esptool.detect_flash_size(self.stub, args)
            esptool.write_flash(self.stub, args)
            self.stub.change_baud(self.baud)

            if self._meta:
                self._meta.set_port_app_cache(self.port, self.app)
        finally:
            # close every read handle first, the temp file may not be removable while it is still open
            for (_, f) in flash_files:
                f.close()
            for (_, f) in encrypt_files:
                f.close()
            if nvs_file:
                nvs_file.close()
                try:
                    os.remove(nvs_file.name)
                except OSError:
                    pass

    @EspSerial.use_esptool()
    def dump_flash(
        self,
        partition: Optional[str] = None,
        address: Optional[str] = None,
        size: Optional[str] = None,
        output: Union[str, TextIO, None] = None,
    ) -> Optional[bytes]:
        """
        Dump the flash bytes into the output file by partition name or by start address and size.

        Args:
            output: file path or file stream to write to. File stream should be opened with bytes mode.
            partition: partition name
            address: address that start reading from. Strings are parsed with `int(address, 0)`.
            size: read size. Strings are parsed with `int(size, 0)`.

        Returns:
            None if `output` is `str` or file stream.
            bytes if `output` is None.

        Raises:
            ValueError: if neither `partition` nor both `address` and `size` are specified
        """
        if partition:
            partition = self.app.partition_table[partition]
            _addr = partition['offset']
            _size = partition['size']
        elif address and size:
            # the values taken from the partition table are ints, convert the string inputs as well
            _addr = int(address, 0) if isinstance(address, str) else address
            _size = int(size, 0) if isinstance(size, str) else size
        else:
            raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

        content = self.stub.read_flash(_addr, _size)
        if output:
            if isinstance(output, str):
                # `os.makedirs('')` raises, so skip it when `output` has no directory part
                output_dir = os.path.dirname(output)
                if output_dir:
                    os.makedirs(output_dir, exist_ok=True)
                with open(output, 'wb') as f:
                    f.write(content)
            else:
                output.write(content)
        else:
            return content

    @EspSerial.use_esptool()
    def erase_partition(self, partition_name: str) -> None:
        """
        Erase the partition provided

        Args:
            partition_name: partition name

        Raises:
            ValueError: if the partition table is empty, or the partition name is not in it
        """
        if not self.app.partition_table:
            raise ValueError('Partition table not parsed.')

        if partition_name in self.app.partition_table:
            address = self.app.partition_table[partition_name]['offset']
            size = self.app.partition_table[partition_name]['size']
            logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
            self.stub.erase_region(address, size)
        else:
            raise ValueError(f'partition name "{partition_name}" not found in app partition table')

    @EspSerial.use_esptool()
    def read_flash_elf_sha256(self) -> bytes:
        """
        Read the sha256 digest of the flashed elf file

        Returns:
            bytes of sha256

        Raises:
            ValueError: if `app.bin_file` is not one of the `app.flash_files`
        """
        bin_offset = None
        for offset, filepath, _ in self.app.flash_files:
            if self.app.bin_file == filepath:
                bin_offset = offset
                break

        # compare with None explicitly, offset 0 is a valid value
        if bin_offset is None:
            raise ValueError('.bin file not found in flash files')

        return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)

    def is_target_flashed_same_elf(self) -> bool:
        """
        Check if the sha256 values are matched between the flashed target and the `self.app.elf_file`

        Returns:
            True if the sha256 values are matched
        """
        if not self.app.elf_file:
            logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
            return False

        flash_elf_sha256 = self.read_flash_elf_sha256()
        elf_sha256 = hashlib.sha256()
        with open(self.app.elf_file, 'rb') as fr:
            elf_sha256.update(fr.read())

        return flash_elf_sha256 == elf_sha256.digest()

dump_flash(partition=None, address=None, size=None, output=None)

Dump the flash bytes into the output file by partition name or by start address and size.

Parameters:

Name Type Description Default
output Union[str, TextIO, None]

file path or file stream to write to. File stream should be opened with bytes mode.

None
partition Optional[str]

partition name

None
address Optional[str]

address to start reading from

None
size Optional[str]

read size

None

Returns:

Type Description
Optional[bytes]

None if output is str or file stream.

Optional[bytes]

bytes if output is None.

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def dump_flash(
    self,
    partition: Optional[str] = None,
    address: Optional[str] = None,
    size: Optional[str] = None,
    output: Union[str, TextIO, None] = None,
) -> Optional[bytes]:
    """
    Dump the flash bytes into the output file by partition name or by start address and size.

    Args:
        output: file path or file stream to write to. File stream should be opened with bytes mode.
        partition: partition name
        address: address that start reading from. Strings are parsed with `int(address, 0)`.
        size: read size. Strings are parsed with `int(size, 0)`.

    Returns:
        None if `output` is `str` or file stream.
        bytes if `output` is None.

    Raises:
        ValueError: if neither `partition` nor both `address` and `size` are specified
    """
    if partition:
        partition = self.app.partition_table[partition]
        _addr = partition['offset']
        _size = partition['size']
    elif address and size:
        # the values taken from the partition table are ints, convert the string inputs as well
        _addr = int(address, 0) if isinstance(address, str) else address
        _size = int(size, 0) if isinstance(size, str) else size
    else:
        raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

    content = self.stub.read_flash(_addr, _size)
    if output:
        if isinstance(output, str):
            # `os.makedirs('')` raises, so skip it when `output` has no directory part
            output_dir = os.path.dirname(output)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            with open(output, 'wb') as f:
                f.write(content)
        else:
            output.write(content)
    else:
        return content

erase_partition(partition_name)

Erase the partition provided

Parameters:

Name Type Description Default
partition_name str

partition name

required
Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def erase_partition(self, partition_name: str) -> None:
    """
    Erase the flash region that belongs to the given partition

    Args:
        partition_name: partition name

    Raises:
        ValueError: if the partition table is empty, or the partition name is not in it
    """
    if not self.app.partition_table:
        raise ValueError('Partition table not parsed.')

    if partition_name not in self.app.partition_table:
        raise ValueError(f'partition name "{partition_name}" not found in app partition table')

    entry = self.app.partition_table[partition_name]
    address, size = entry['offset'], entry['size']
    logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
    self.stub.erase_region(address, size)

flash()

Flash the app.flash_files to the dut

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def flash(self) -> None:
    """
    Flash the `app.flash_files` to the dut
    """
    if not self.app.flash_files:
        logging.error('No flash files detected. Skipping auto flash...')
        return

    if not self.app.flash_settings:
        logging.error('No flash settings detected. Skipping auto flash...')
        return

    flash_files = []
    encrypt_files = []
    nvs_file = None
    try:
        # open the files inside the `try` block, so the handles opened so far
        # are closed in `finally` even if a later `open` fails
        for file in self.app.flash_files:
            opened = encrypt_files if file.encrypted else flash_files
            opened.append((file.offset, open(file.file_path, 'rb')))

        if self.erase_nvs:
            address = self.app.partition_table['nvs']['offset']
            size = self.app.partition_table['nvs']['size']
            nvs_file = tempfile.NamedTemporaryFile(delete=False)
            nvs_file.write(b'\xff' * size)
            # the file is re-opened by name below, make sure the buffered content is written out
            nvs_file.flush()
            if not isinstance(address, int):
                address = int(address, 0)

            if self.app.flash_settings['encrypt']:
                encrypt_files.append((address, open(nvs_file.name, 'rb')))
            else:
                flash_files.append((address, open(nvs_file.name, 'rb')))

        # write_flash expects the parameter encrypt_files to be None and not
        # an empty list, so perform the check here
        default_kwargs = {
            'addr_filename': flash_files,
            'encrypt_files': encrypt_files or None,
            'no_stub': False,
            'compress': True,
            'verify': False,
            'ignore_flash_encryption_efuse_setting': False,
            'erase_all': False,
            'force': False,
        }

        default_kwargs.update(self.app.flash_settings)
        default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
        args = EsptoolArgs(**default_kwargs)

        self.stub.change_baud(self.esptool_baud)
        esptool.detect_flash_size(self.stub, args)
        esptool.write_flash(self.stub, args)
        self.stub.change_baud(self.baud)

        if self._meta:
            self._meta.set_port_app_cache(self.port, self.app)
    finally:
        # close every read handle first, the temp file may not be removable while it is still open
        for (_, f) in flash_files:
            f.close()
        for (_, f) in encrypt_files:
            f.close()
        if nvs_file:
            nvs_file.close()
            try:
                os.remove(nvs_file.name)
            except OSError:
                pass

is_target_flashed_same_elf()

Check if the sha256 values are matched between the flashed target and the self.app.elf_file

Returns:

Type Description
bool

True if the sha256 values are matched

Source code in pytest_embedded_idf/serial.py
def is_target_flashed_same_elf(self) -> bool:
    """
    Compare the sha256 read from the flashed target with the sha256 of the local `self.app.elf_file`

    Returns:
        True if both digests are equal, False if they differ or if the app has no elf file
    """
    local_elf = self.app.elf_file
    if not local_elf:
        logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
        return False

    # read from the target first, then hash the local file
    flashed_digest = self.read_flash_elf_sha256()
    with open(local_elf, 'rb') as elf_stream:
        local_digest = hashlib.sha256(elf_stream.read()).digest()

    return flashed_digest == local_digest

read_flash_elf_sha256()

Read the sha256 digest of the flashed elf file

Returns:

Type Description
bytes

bytes of sha256

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def read_flash_elf_sha256(self) -> bytes:
    """
    Read the sha256 digest of the flashed elf file

    Returns:
        bytes of sha256

    Raises:
        ValueError: if `app.bin_file` is not one of the `app.flash_files`
    """
    bin_offset = None
    for offset, filepath, _ in self.app.flash_files:
        if self.app.bin_file == filepath:
            bin_offset = offset
            break

    # compare with None explicitly, offset 0 is a valid value
    if bin_offset is None:
        raise ValueError('.bin file not found in flash files')

    return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)

pytest_embedded_idf.dut

IdfDut

Bases: SerialDut

DUT class for serial ports connected to Espressif boards that are flashed with ESP-IDF apps

Attributes:

Name Type Description
target str

target chip type

skip_check_coredump bool

if set to True, skip checking whether a core dump occurred during DUT teardown

Source code in pytest_embedded_idf/dut.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
class IdfDut(SerialDut):
    """
    Dut class for serial ports connected to Espressif boards which are flashed with ESP-IDF apps

    Attributes:
        target (str): target chip type
        skip_check_coredump (bool): skip the core dump check during dut teardown if set to True
    """

    # chip targets grouped by CPU architecture; `toolchain_prefix` selects the toolchain from these lists
    XTENSA_TARGETS = ['esp32', 'esp32s2', 'esp32s3']
    RISCV32_TARGETS = ['esp32c3', 'esp32h2', 'esp32c2']

    # markers that delimit a core dump printed over UART
    COREDUMP_UART_START = b'================= CORE DUMP START ================='
    COREDUMP_UART_END = b'================= CORE DUMP END ================='
    # non-greedy group + DOTALL: every START/END pair is captured as its own (multi-line) match
    COREDUMP_UART_REGEX = re.compile(COREDUMP_UART_START + b'(.+?)' + COREDUMP_UART_END, re.DOTALL)

    # panic handler related messages
    PANIC_START = b'register dump:'
    PANIC_END = b'ELF file SHA256:'

    app: IdfApp

    def __init__(
        self,
        app: IdfApp,
        skip_check_coredump: bool = False,
        panic_output_decode_script: t.Optional[str] = None,
        **kwargs,
    ) -> None:
        """
        Args:
            app: the `IdfApp` under test; its `target` is copied onto the dut
            skip_check_coredump: skip the core dump check in `close()` if set to True
            panic_output_decode_script: path of the panic output decode script. When omitted, the
                `panic_output_decode_script` property falls back to `$IDF_PATH/tools/gdb_panic_server.py`
            **kwargs: forwarded unchanged to the parent class `__init__`
        """
        self.target = app.target
        self.skip_check_coredump = skip_check_coredump
        self._panic_output_decode_script = panic_output_decode_script
        # stays None until the `test_menu` property parses the menu on first access
        self._test_menu: t.Optional[t.List[UnittestMenuCase]] = None

        super().__init__(app=app, **kwargs)

    @property
    def toolchain_prefix(self) -> str:
        """
        Returns:
            Toolchain prefix according to the `self.target`
        """
        if self.target in self.XTENSA_TARGETS:
            return f'xtensa-{self.target}-elf-'
        elif self.target in self.RISCV32_TARGETS:
            return f'riscv32-esp-elf-'
        else:
            raise ValueError(f'Unknown target: {self.target}')

    @property
    def panic_output_decode_script(self) -> t.Optional[str]:
        """
        Returns:
            Panic output decode script path
        """
        script_filepath = self._panic_output_decode_script or os.path.join(
            os.getenv('IDF_PATH', 'IDF_PATH'),
            'tools',
            'gdb_panic_server.py',
        )
        if not os.path.isfile(script_filepath):
            raise ValueError(
                'Panic output decode script not found. Please use --panic-output-decode-script flag '
                'to provide script or set IDF_PATH (Default: $IDF_PATH/tools/gdb_panic_server.py)'
            )
        return os.path.realpath(script_filepath)

    def _check_panic_decode_trigger(self):  # type: () -> None
        with open(self.logfile, 'rb') as output_file:
            output = output_file.read()
        # get the panic output by looking for the indexes
        # of the first occurrences of PANIC_START and PANIC_END patterns
        panic_output_idx_start = output.find(self.PANIC_START) - 10
        panic_output_idx_end = output.find(self.PANIC_END, output.find(self.PANIC_START) + 1) + 15
        panic_output_res = output[panic_output_idx_start:panic_output_idx_end]
        panic_output = panic_output_res if panic_output_res else None
        if panic_output is None:
            return
        with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file:
            panic_output_file.write(panic_output)
            panic_output_file.flush()
        try:
            cmd = [
                f'{self.toolchain_prefix}-gdb',
                '--command',
                f'{self.app.app_path}/build/prefix_map_gdbinit',
                '--batch',
                '-n',
                self.app.elf_file,
                '-ex',
                "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\"".format(
                    python=sys.executable,
                    script=self.panic_output_decode_script,
                    target=self.target,
                    output_file=panic_output_file.name,
                ),
                '-ex',
                'bt',
            ]
            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            logging.info('\n\nBacktrace:\n')
            logging.info(output.decode())  # noqa: E999
        except subprocess.CalledProcessError as e:
            logging.debug(f'Failed to run gdb_panic_server.py script: {e}\n{e.output}\n\n')
            logging.info(panic_output.decode())
        finally:
            if panic_output_file is not None:
                try:
                    os.unlink(panic_output_file.name)
                except OSError as e:
                    logging.debug(f'Couldn\'t remove temporary panic output file ({e})')

    def _check_coredump(self) -> None:
        """
        Handle errors by panic_handler_script or check core dumps via UART or partition table.
        Write the decoded or read core dumps into separated files.

        For UART and panic output, would read the `_pexpect_logfile` file.
        For partition, would read the flash according to the partition table. needs a valid `parttool_path`.

        Notes:
            - May include multiple core dumps, since each test case may include several unity test cases.
            - May have duplicated core dumps, since after the core dump happened, the target chip would reboot
            automatically.

        Returns:
            None
        """
        if self.target in self.RISCV32_TARGETS:
            self._check_panic_decode_trigger()  # need IDF_PATH
        if self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_UART', False):
            self._dump_b64_coredumps()
        elif self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_FLASH', False):
            self._dump_flash_coredump()
        else:
            logging.debug('core dump disabled')

    def _dump_b64_coredumps(self) -> None:
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        with open(self.logfile, 'rb') as fr:
            s = fr.read()

            for i, coredump in enumerate(set(self.COREDUMP_UART_REGEX.findall(s))):  # may duplicate
                coredump_file = None
                try:
                    with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
                        coredump_file.write(coredump.strip().replace(b'\r', b''))
                        coredump_file.flush()

                    coredump = CoreDump(
                        chip=self.target,
                        core=coredump_file.name,
                        core_format='b64',
                        prog=self.app.elf_file,
                    )
                    with open(os.path.join(self._meta.logdir, f'coredump_output_{i}'), 'w') as fw:
                        with redirect_stdout(fw):
                            coredump.info_corefile()
                finally:
                    if coredump_file:
                        os.remove(coredump_file.name)

    def _dump_flash_coredump(self) -> None:
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        if self.app.sdkconfig['ESP_COREDUMP_DATA_FORMAT_ELF']:
            core_format = 'elf'
        elif self.app.sdkconfig['ESP_COREDUMP_DATA_FORMAT_BIN']:
            core_format = 'raw'
        else:
            raise ValueError('Invalid coredump format. Use _parse_b64_coredump for UART')

        with self.serial.disable_redirect_thread():
            coredump = CoreDump(
                chip=self.target,
                core_format=core_format,
                port=self.serial.port,
                prog=self.app.elf_file,
            )
            with open(os.path.join(self._meta.logdir, 'coredump_output'), 'w') as fw:
                with redirect_stdout(fw):
                    coredump.info_corefile()

    def close(self) -> None:
        """
        Check for core dumps (unless `skip_check_coredump` is set), then close the dut.

        A `ValueError` raised by the core dump check is only logged at debug level. Any other
        exception propagates, but the parent `close()` is still executed first so the
        resources held by the dut are released in every case.
        """
        try:
            if not self.skip_check_coredump:
                try:
                    self._check_coredump()
                except ValueError as e:
                    logging.debug(e)
        finally:
            super().close()

    #####################
    # IDF-unity related #
    #####################
    def _parse_test_menu(
        self,
        ready_line: str = 'Press ENTER to see the list of tests',
        pattern="Here's the test menu, pick your combo:(.+)Enter test for running.",
        trigger: str = '',
    ) -> t.List[UnittestMenuCase]:
        """
        Get test case list from test menu via UART print.

        Args:
            ready_line: Prompt to indicate that device is ready to print test menu.
            pattern: Pattern to match the output from device, menu block should be in the first group.
                     This will be directly passed to `pexpect.expect()`.
            trigger: Keys to trigger device to print test menu by UART.

        Returns:
            A `list` of `UnittestMenuCase`, which includes info for each test case.
        """
        self.expect_exact(ready_line)
        self.write(trigger)
        menu_block = self.expect(pattern).group(1)
        s = str(menu_block, encoding='UTF-8')
        return self._parse_unity_menu_from_str(s)

    def parse_test_menu(
        self,
        ready_line: str = 'Press ENTER to see the list of tests',
        pattern="Here's the test menu, pick your combo:(.+)Enter test for running.",
        trigger: str = '',
    ) -> t.List[UnittestMenuCase]:
        warnings.warn(
            'Use `dut.test_menu` property directly, '
            'will rename this function to `_parse_test_menu` in release 2.0.0',
            DeprecationWarning,
        )

        return self._parse_test_menu(ready_line, pattern, trigger)

    @staticmethod
    def parse_unity_menu_from_str(s: str) -> t.List[UnittestMenuCase]:
        warnings.warn(
            'Please use `dut.test_menu` property directly, '
            'will rename this function to `_parse_unity_menu_from_str` in release 2.0.0',
            DeprecationWarning,
        )

        return IdfDut._parse_unity_menu_from_str(s)

    @staticmethod
    def _parse_unity_menu_from_str(s: str) -> t.List[UnittestMenuCase]:
        """
        Parse test case menu from string to list of `UnittestMenuCase`.

        Args:
            s: string include test case menu.

        Returns:
            A `list` of `UnittestMenuCase`, which includes info for each test case.
        """
        cases = s.splitlines()

        case_regex = re.compile(r'\((\d+)\)\s\"(.+)\"\s(\[.+\])+')
        subcase_regex = re.compile(r'\t\((\d+)\)\s\"(.+)\"')

        test_menu = []
        for case in cases:
            case_match = case_regex.match(case)
            if case_match is not None:
                index, name, tag_block = case_match.groups()
                tags = re.findall(r'\[(.+?)\]', tag_block)

                if 'multi_stage' in tags:
                    _type = 'multi_stage'
                    tags.remove('multi_stage')
                elif 'multi_device' in tags:
                    _type = 'multi_device'
                    tags.remove('multi_device')
                else:
                    _type = 'normal'

                keyword = []
                if 'ignore' in tags:
                    keyword.append('ignore')
                    tags.remove('ignore')
                elif 'disable' in tags:
                    keyword = 'disable'
                    tags.remove('disable')

                attributes = {}
                group = []
                for tag in tags:
                    if '=' in tag:
                        k, v = tag.replace(' ', '').split('=')
                        attributes[k] = v
                    else:
                        group.append(tag)

                test_menu.append(
                    UnittestMenuCase(
                        index=int(index),
                        name=name,
                        type=_type,
                        keywords=keyword,
                        groups=group,
                        attributes=attributes,
                        subcases=[],
                    )
                )
                continue
            subcase_match = subcase_regex.match(case)
            if subcase_match is not None:
                index, name = subcase_match.groups()
                test_menu[-1].subcases.append({'index': int(index), 'name': name})
                continue

            if case != '':
                raise NotImplementedError('Unrecognized test case:', case)

        return test_menu

    @property
    def test_menu(self) -> t.List[UnittestMenuCase]:
        if self._test_menu is None:
            self._test_menu = self._parse_test_menu()
            logging.debug('Successfully parsed unity test menu')
            self.serial.hard_reset()

        return self._test_menu

    def _record_single_unity_test_case(func):
        """
        Decorator which records the result of a single unity test case, even if the case crashed.

        The decorated method must receive the test case as its first positional argument
        (it is read from `args[0]`). Two keyword arguments are interpreted by the wrapper:

        - `reset`: removed from the keyword arguments before the decorated method is called;
          if truthy, the device is hard-reset first. The reset is excluded from the duration.
        - `timeout`: only read when passed by keyword (default 30). Whatever is left of it
          after the decorated method finished is used to wait for the unity summary line.

        The wrapper discards the return value of the decorated method. Exceptions raised by
        the decorated method propagate after the test case has been recorded.

        Notes:
            This function is better than `dut.expect_unity_output()` since it records the test case even if it
                core dumped while running, or if the final result block is missing for any other reason.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            _start_at = time.perf_counter()  # declare here in case hard reset failed
            _timeout = kwargs.get('timeout', 30)
            _case = args[0]

            try:
                # do it here since the first hard reset before test case shouldn't be counted in duration time
                if 'reset' in kwargs:
                    if kwargs.pop('reset'):
                        self.serial.hard_reset()

                # restart the clock so that the duration covers the test case only
                _start_at = time.perf_counter()
                func(self, *args, **kwargs)
            finally:
                # runs no matter whether `func` succeeded: the case is recorded in both situations
                _timestamp = time.perf_counter()
                _log = ''
                try:
                    # time budget left after running `func`
                    _timeout = _timeout - _timestamp + _start_at
                    if _timeout < 0:  # pexpect process would expect 30s if < 0
                        _timeout = 0
                    self.expect(UNITY_SUMMARY_LINE_REGEX, timeout=_timeout)
                except Exception:  # result block missing # noqa
                    # `_log` stays empty, so the case gets recorded without any parsed unity output
                    pass
                else:  # result block exists
                    _log = remove_asci_color_code(self.pexpect_proc.before)
                finally:
                    _end_at = time.perf_counter()
                    self._add_single_unity_test_case(
                        _case, _log, additional_attrs={'time': round(_end_at - _start_at, 3)}
                    )

        return wrapper

    def _add_single_unity_test_case(
        self, case: UnittestMenuCase, log: t.Optional[t.AnyStr], additional_attrs: t.Optional[t.Dict[str, t.Any]] = None
    ):
        """
        Build one `TestCase` out of the unity output in `log` and add it to the test suite.

        Args:
            case: the menu case that was run; its name is used when `log` holds no unity result
            log: output captured while the case ran, may be empty or None
            additional_attrs: extra attributes, they override the parsed ones

        Notes:
            Without any unity result in `log` the case is recorded as `FAIL`, with the debug
            string of the pexpect buffer as message. With several results a warning is issued
            and the last one is used.
        """
        matches = []
        if log:
            # check format: the fixture format wins over the basic one whenever it shows up in the log
            regex = UNITY_FIXTURE_REGEX if UNITY_FIXTURE_REGEX.search(log) else UNITY_BASIC_REGEX
            matches = list(regex.finditer(log))

        # real parsing
        if not matches:
            logging.warning(f'unity test case not found, use case {case.name} instead')
            attrs = {'name': case.name, 'result': 'FAIL', 'message': self.pexpect_proc.buffer_debug_str}
        else:
            if len(matches) > 1:
                warnings.warn('This function is for recording single unity test case only. Use the last matched one')
            attrs = {k: v for k, v in matches[-1].groupdict().items() if v is not None}

        if additional_attrs:
            attrs.update(additional_attrs)

        testcase = TestCase(**attrs)
        self.testsuite.testcases.append(testcase)

        # bump exactly one counter of the suite, selected by the result of the case
        counter_by_result = {'FAIL': 'failures', 'IGNORE': 'skipped'}
        counter = counter_by_result.get(testcase.result, 'tests')
        self.testsuite.attrs[counter] += 1

    @_record_single_unity_test_case
    def _run_normal_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: float = 30,
    ) -> None:
        """
        Select a single "normal" case in the unity menu and wait until it starts running.

        Notes:
            A case whose type is not "normal" is skipped with a warning.

        Args:
            case: the specific case that parsed in test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
        """
        if case.type == 'normal':
            self.expect_exact(READY_PATTERN_LIST, timeout=timeout)
            self.write(str(case.index))
            self.expect_exact(f'Running {case.name}...', timeout=1)
        else:
            logging.warning('case %s is not a normal case', case.name)

    @_record_single_unity_test_case
    def _run_multi_stage_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: float = 30,
    ) -> None:
        """
        Run every stage (sub-case) of a single "multi_stage" case, one after another.

        Notes:
            A case whose type is not "multi_stage" is skipped with a warning.

        Args:
            case: the specific case that parsed in test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
        """
        if case.type != 'multi_stage':
            logging.warning('case %s is not a multi stage case', case.name)
            return

        began_at = time.perf_counter()
        last_stage_at = began_at
        for stage in case.subcases:
            # all stages share one time budget; pexpect process would expect 30s if < 0
            remaining = max(timeout - last_stage_at + began_at, 0)

            self.expect_exact(READY_PATTERN_LIST, timeout=remaining)
            self.write(str(case.index))
            self.expect_exact(case.name, timeout=1)
            self.write(str(stage['index']))

            last_stage_at = time.perf_counter()

    def run_all_single_board_cases(
        self,
        group: t.Optional[str] = None,
        reset: bool = False,
        timeout: float = 30,
        run_ignore_cases: bool = False,
    ):
        """
        Run all multi_stage cases

        Args:
            group: test case group
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
            run_ignore_cases: run ignored test cases or not
        """
        for case in self.test_menu:
            if not group or group in case.groups:
                if not case.is_ignored or run_ignore_cases:
                    if case.type == 'normal':
                        self._run_normal_case(case, reset=reset, timeout=timeout)
                    elif case.type == 'multi_stage':
                        self._run_multi_stage_case(case, reset=reset, timeout=timeout)

    def write(self, data: t.AnyStr) -> None:
        """
        Write `data` to the dut.

        Writing `*` or `[<group>]` is the manual way of starting unity test cases from the
        test menu. For these two inputs a `UserHint` warning is issued which recommends
        `run_all_single_board_cases()`. The data itself is always passed on unchanged.
        """

        def _hint(written: str, suggested_args: str) -> str:
            # builds the hint text for what was written and for the call suggested instead
            return (
                'if you\'re using `dut.expect_exact("Press ENTER to see the list of tests"); '
                f'dut.write("{written}"); dut.expect_unity_test_output()` to run esp-idf unity tests, '
                f'please consider using `dut.run_all_single_board_cases({suggested_args})` instead. '
                'It could help record the duration time and the error messages even for crashed test cases.'
            )

        data_str = to_str(data).strip('\n') or ''

        if data_str == '*':
            warnings.warn(_hint('*', ''), UserHint)

        if data_str.startswith('[') and data_str.endswith(']'):
            group_name = data_str[1:-1]
            warnings.warn(_hint(data_str, f'group="{group_name}"'), UserHint)

        super().write(data)

    ################
    # JTAG related #
    ################
    def setup_jtag(self):
        """
        Set up JTAG, load the elf file into gdb (if gdb is available) and flash the app via JTAG.

        Flashing is skipped when the port/app cache reports that this app is already on the port.
        """
        super().setup_jtag()

        if self.gdb:
            self.gdb.write(f'file {self.app.elf_file}')

        already_flashed = self._meta and self._meta.hit_port_app_cache(self.serial.port, self.app)
        if not already_flashed:
            self.flash_via_jtag()

    def flash_via_jtag(self):
        if not self.openocd:
            logging.warning('no openocd instance created. can\'t flash via openocd `program_esp`')
            return

        if self.app.is_loadable_elf:
            # loadable elf flash to ram. no cache.
            # load via test script.
            # For example:
            # self.gdb.write('mon reset halt')
            # self.gdb.write('thb *0x40007d54')
            # self.gdb.write('c')
            # self.gdb.write('load')
            return

        for _f in self.app.flash_files:
            if _f.encrypted:
                raise ValueError('Encrypted files can\'t be flashed in via JTAG')
            self.openocd.write(f'program_esp {_f.file_path} {hex(_f.offset)} verify')

        if self._meta:
            self._meta.set_port_app_cache(self.serial.port, self.app)

panic_output_decode_script() property

Returns:

Type Description
t.Optional[str]

Panic output decode script path

Source code in pytest_embedded_idf/dut.py
@property
def panic_output_decode_script(self) -> t.Optional[str]:
    """
    Resolve the script that decodes the panic output.

    The path given at construction time wins; otherwise
    `$IDF_PATH/tools/gdb_panic_server.py` is used.

    Returns:
        Real path of the panic output decode script

    Raises:
        ValueError: if the resolved path is not an existing file
    """
    candidate = self._panic_output_decode_script
    if not candidate:
        idf_path = os.getenv('IDF_PATH', 'IDF_PATH')
        candidate = os.path.join(idf_path, 'tools', 'gdb_panic_server.py')

    if os.path.isfile(candidate):
        return os.path.realpath(candidate)

    raise ValueError(
        'Panic output decode script not found. Please use --panic-output-decode-script flag '
        'to provide script or set IDF_PATH (Default: $IDF_PATH/tools/gdb_panic_server.py)'
    )

run_all_single_board_cases(group=None, reset=False, timeout=30, run_ignore_cases=False)

Run all single-board cases of the test menu (both `normal` and `multi_stage` cases)

Parameters:

Name Type Description Default
group t.Optional[str]

test case group

None
reset bool

whether to perform a hardware reset before running a case

False
timeout float

timeout. (Default: 30 seconds)

30
run_ignore_cases bool

run ignored test cases or not

False
Source code in pytest_embedded_idf/dut.py
def run_all_single_board_cases(
    self,
    group: t.Optional[str] = None,
    reset: bool = False,
    timeout: float = 30,
    run_ignore_cases: bool = False,
):
    """
    Run all single-board cases of the test menu, i.e. the "normal" and the "multi_stage" ones.

    Cases of any other type are skipped silently.

    Args:
        group: test case group; cases of every group are run if empty
        reset: whether to perform a hardware reset before running a case
        timeout: timeout. (Default: 30 seconds)
        run_ignore_cases: run ignored test cases or not
    """
    for case in self.test_menu:
        if group and group not in case.groups:
            continue

        if case.is_ignored and not run_ignore_cases:
            continue

        if case.type == 'normal':
            self._run_normal_case(case, reset=reset, timeout=timeout)
        elif case.type == 'multi_stage':
            self._run_multi_stage_case(case, reset=reset, timeout=timeout)

toolchain_prefix() property

Returns:

Type Description
str

Toolchain prefix according to the self.target

Source code in pytest_embedded_idf/dut.py
@property
def toolchain_prefix(self) -> str:
    """
    Toolchain prefix that matches the architecture of `self.target`.

    Returns:
        `xtensa-<target>-elf-` for Xtensa targets, `riscv32-esp-elf-` for RISC-V targets.
        Both values already end with a dash.

    Raises:
        ValueError: if `self.target` is listed in neither `XTENSA_TARGETS` nor `RISCV32_TARGETS`
    """
    chip = self.target

    if chip in self.XTENSA_TARGETS:
        return f'xtensa-{chip}-elf-'

    if chip in self.RISCV32_TARGETS:
        return 'riscv32-esp-elf-'

    raise ValueError(f'Unknown target: {chip}')