Skip to content

Pytest embedded idf

pytest_embedded_idf.app

IdfApp

Bases: App

Idf App class

Attributes:

Name Type Description
app_path str

App path

binary_path str

binary file path

elf_file str

elf file path

flash_args dict[str, Any]

dict of flasher_args.json

flash_files list[FlashFile]

list of (offset, file path, encrypted) tuples for the files that need to be flashed

flash_settings dict[str, Any]

dict of flash settings

Source code in pytest_embedded_idf/app.py
class IdfApp(App):
    """
    Idf App class

    Attributes:
        app_path (str): App path
        binary_path (str): binary file path
        elf_file (str): elf file path
        bin_file (str): path of the `.bin` file found directly under `binary_path`
        flash_args (dict[str, Any]): dict of flasher_args.json
        flash_files (list[FlashFile]): list of (offset, file path, encrypted) of the files that need to be flashed
        flash_settings (dict[str, Any]): dict of flash settings
    """

    FLASH_ARGS_FILENAME = 'flasher_args.json'

    def __init__(
        self,
        app_path: Optional[str] = None,
        build_dir: Optional[str] = None,
        part_tool: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            app_path: App path
            build_dir: Build directory
            part_tool: Partition tool path
        """
        super().__init__(app_path, build_dir, **kwargs)
        if not self.binary_path:
            # NOTE: returning here leaves every attribute below unset on purpose;
            # `IdfSerial.__init__` probes `hasattr(app, 'target')` to detect an
            # app that could not be parsed.
            logging.debug('Binary path not specified, skipping parsing app...')
            return

        # Required if binary path exists
        self.elf_file = self._get_elf_file()
        self.bin_file = self._get_bin_file()

        self.flash_args, self.flash_files, self.flash_settings = self._parse_flash_args()

        # Optional info
        self._sdkconfig = None
        self._target = None

        # the partition table is used for nvs
        self._parttool = part_tool
        self._partition_table = None

    @property
    def parttool_path(self) -> str:
        """
        Returns:
            Partition tool path

        Raises:
            ValueError: if neither the configured tool nor the default one under `$IDF_PATH` is a file
        """
        parttool_filepath = self._parttool or os.path.join(
            os.getenv('IDF_PATH', ''),
            'components',
            'partition_table',
            'gen_esp32part.py',
        )
        if os.path.isfile(parttool_filepath):
            return os.path.realpath(parttool_filepath)
        raise ValueError('Partition Tool not found. (Default: $IDF_PATH/components/partition_table/gen_esp32part.py)')

    @property
    def sdkconfig(self) -> Dict[str, Any]:
        """
        Returns:
            dict contains all k-v pairs from the sdkconfig file
        """
        if self._sdkconfig is not None:
            return self._sdkconfig

        sdkconfig_json_path = os.path.join(self.binary_path, 'config', 'sdkconfig.json')
        if not os.path.isfile(sdkconfig_json_path):
            logging.warning(f'{sdkconfig_json_path} doesn\'t exist. Skipping...')
            self._sdkconfig = {}
        else:
            # use a context manager so the file handle is closed deterministically
            with open(sdkconfig_json_path) as fr:
                self._sdkconfig = json.load(fr)
        return self._sdkconfig

    @property
    def target(self) -> str:
        """
        Returns:
            target chip type
        """
        if self.sdkconfig:
            return self.sdkconfig.get('IDF_TARGET', 'esp32')
        else:
            return self.flash_args.get('extra_esptool_args', {}).get('chip', 'esp32')

    @property
    def partition_table(self) -> Dict[str, Any]:
        """
        Run the partition tool on the partition table file and parse its CSV output.

        The result is cached after the first successful call.

        Returns:
            dict generated by partition tool, keyed by partition name. Each value holds
            `type`, `subtype`, `offset` (int), `size` (int, in bytes) and `flags`.

        Raises:
            ValueError: if the partition tool can't be found (see `parttool_path`)
        """
        if self._partition_table is not None:
            return self._partition_table

        partition_file = os.path.join(
            self.binary_path,
            self.flash_args.get('partition_table', self.flash_args.get('partition-table', {})).get('file', ''),
        )
        process = subprocess.Popen(
            [sys.executable, self.parttool_path, partition_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate()
        raw_data = stdout.decode() if isinstance(stdout, bytes) else stdout
        if process.returncode != 0:
            # surface the tool's error output instead of silently returning an empty table
            err_msg = stderr.decode(errors='replace') if isinstance(stderr, bytes) else stderr
            logging.warning('Partition tool exited with code %s: %s', process.returncode, err_msg)

        partition_table = {}
        for line in raw_data.splitlines():
            # blank lines and comment lines carry no partition entry
            if not line or line.startswith('#'):
                continue

            try:
                _name, _type, _subtype, _offset, _size, _flags = line.split(',')
                # `endswith` is safe on an empty size field; int('') then raises ValueError
                if _size.endswith('K'):
                    _size = int(_size[:-1]) * 1024
                elif _size.endswith('M'):
                    _size = int(_size[:-1]) * 1024 * 1024
                else:
                    _size = int(_size)
                _offset = int(_offset, 0)
            except ValueError:
                # not a well-formed partition line
                continue

            partition_table[_name] = {
                'type': _type,
                'subtype': _subtype,
                'offset': _offset,
                'size': _size,
                'flags': _flags,
            }
        self._partition_table = partition_table
        return self._partition_table

    def _get_elf_file(self) -> Optional[str]:
        """Return the real path of the first `.elf` file listed under `binary_path`, or None."""
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.elf':
                return os.path.realpath(os.path.join(self.binary_path, fn))

        return None

    def _get_bin_file(self) -> str:
        """
        Return the real path of the first `.bin` file listed under `binary_path`.

        Raises:
            ValueError: if no `.bin` file exists there
        """
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.bin':
                return os.path.realpath(os.path.join(self.binary_path, fn))
        raise ValueError(f'Bin file under {self.binary_path} not found')

    def _parse_flash_args(
        self,
    ) -> Tuple[Dict[str, Any], List[FlashFile], Dict[str, str]]:
        """
        Load `flasher_args.json` from `binary_path`.

        Returns:
            (flash_args, flash_files, flash_settings). `flash_files` is sorted, and
            `flash_settings` gains an `encrypt` key that is True when any file is encrypted.

        Raises:
            ValueError: if `flasher_args.json` is not found under `binary_path`
        """
        flash_args_filepath = None
        for fn in os.listdir(self.binary_path):
            if fn == self.FLASH_ARGS_FILENAME:
                flash_args_filepath = os.path.realpath(os.path.join(self.binary_path, fn))
                break

        if not flash_args_filepath:
            raise ValueError(f'{self.FLASH_ARGS_FILENAME} not found')

        with open(flash_args_filepath) as fr:
            flash_args = json.load(fr)

        def _is_encrypted(_flash_args: Dict[str, Any], _offset: str, _file_path: str) -> bool:
            # `_offset` is the raw key of `flash_files`, compared as-is against
            # the `offset` value of every dict-like entry in the json
            for entry in _flash_args.values():
                try:
                    if (entry['offset'], entry['file']) == (_offset, _file_path):
                        return entry['encrypted'] == 'true'
                except (TypeError, KeyError):
                    # entry is not a dict with offset/file/encrypted keys
                    continue

            return False

        flash_files = []
        for (offset, file_path) in flash_args['flash_files'].items():
            flash_files.append(
                FlashFile(
                    int(offset, 0),
                    os.path.join(self.binary_path, file_path),
                    _is_encrypted(flash_args, offset, file_path),
                )
            )

        flash_files.sort()
        # NOTE: this is the same dict object as flash_args['flash_settings']
        flash_settings = flash_args['flash_settings']
        flash_settings['encrypt'] = any(file.encrypted for file in flash_files)

        return flash_args, flash_files, flash_settings

    def get_sha256(self, filepath: str) -> Optional[str]:
        """
        Get the sha256 of the file

        Args:
            filepath: path to the file

        Returns:
            sha256 appended to app, or None if the image has no appended digest
        """
        from pytest_embedded_serial_esp.serial import EspSerial, EsptoolVersion

        # esptool moved these helpers into submodules after v3
        if EspSerial.ESPTOOL_VERSION == EsptoolVersion.V3:
            from esptool import LoadFirmwareImage, hexify
        else:
            from esptool.bin_image import LoadFirmwareImage
            from esptool.util import hexify

        image = LoadFirmwareImage(self.target, filepath)
        if image.append_digest:
            return hexify(image.stored_digest).lower()
        return None

__init__(app_path=None, build_dir=None, part_tool=None, **kwargs)

Parameters:

Name Type Description Default
app_path Optional[str]

App path

None
build_dir Optional[str]

Build directory

None
part_tool Optional[str]

Partition tool path

None
Source code in pytest_embedded_idf/app.py
def __init__(
    self,
    app_path: Optional[str] = None,
    build_dir: Optional[str] = None,
    part_tool: Optional[str] = None,
    **kwargs,
):
    """
    Initialise the app and, when a binary path is available, parse its build artifacts.

    Args:
        app_path: App path
        build_dir: Build directory
        part_tool: Partition tool path
        **kwargs: forwarded unchanged to the parent initializer
    """
    super().__init__(app_path, build_dir, **kwargs)
    if not self.binary_path:
        # NOTE: returning here leaves elf_file, bin_file, flash_* and the private
        # caches below unset on the instance.
        logging.debug('Binary path not specified, skipping parsing app...')
        return

    # Required if binary path exists
    self.elf_file = self._get_elf_file()
    self.bin_file = self._get_bin_file()

    self.flash_args, self.flash_files, self.flash_settings = self._parse_flash_args()

    # Optional info
    # (caches start as None and are filled lazily by the matching properties)
    self._sdkconfig = None
    self._target = None

    # the partition table is used for nvs
    self._parttool = part_tool
    self._partition_table = None

parttool_path() property

Returns:

Type Description
str

Partition tool path

Source code in pytest_embedded_idf/app.py
@property
def parttool_path(self) -> str:
    """
    Resolve the partition tool: the explicitly configured one wins, otherwise
    fall back to `gen_esp32part.py` under `$IDF_PATH`.

    Returns:
        Partition tool path

    Raises:
        ValueError: if the resolved path is not an existing file
    """
    candidate = self._parttool
    if not candidate:
        idf_path = os.getenv('IDF_PATH', '')
        candidate = os.path.join(idf_path, 'components', 'partition_table', 'gen_esp32part.py')

    if not os.path.isfile(candidate):
        raise ValueError('Partition Tool not found. (Default: $IDF_PATH/components/partition_table/gen_esp32part.py)')

    return os.path.realpath(candidate)

sdkconfig() property

Returns:

Type Description
Dict[str, Any]

dict contains all k-v pairs from the sdkconfig file

Source code in pytest_embedded_idf/app.py
@property
def sdkconfig(self) -> Dict[str, Any]:
    """
    Lazily load `<binary_path>/config/sdkconfig.json` and cache the result.

    Returns:
        dict contains all k-v pairs from the sdkconfig file, or an empty dict
        (after logging a warning) if the file doesn't exist
    """
    if self._sdkconfig is not None:
        return self._sdkconfig

    sdkconfig_json_path = os.path.join(self.binary_path, 'config', 'sdkconfig.json')
    if not os.path.isfile(sdkconfig_json_path):
        logging.warning(f'{sdkconfig_json_path} doesn\'t exist. Skipping...')
        self._sdkconfig = {}
    else:
        # use a context manager so the file handle is closed deterministically
        with open(sdkconfig_json_path) as fr:
            self._sdkconfig = json.load(fr)
    return self._sdkconfig

target() property

Returns:

Type Description
str

target chip type

Source code in pytest_embedded_idf/app.py
@property
def target(self) -> str:
    """
    Pick the chip type from the sdkconfig when one was loaded, otherwise from
    the extra esptool args of the flasher args. Falls back to 'esp32'.

    Returns:
        target chip type
    """
    config = self.sdkconfig
    if not config:
        esptool_args = self.flash_args.get('extra_esptool_args', {})
        return esptool_args.get('chip', 'esp32')

    return config.get('IDF_TARGET', 'esp32')

partition_table() property

Returns:

Type Description
Dict[str, Any]

dict generated by partition tool

Source code in pytest_embedded_idf/app.py
@property
def partition_table(self) -> Dict[str, Any]:
    """
    Run the partition tool on the partition table file and parse its CSV output.

    The result is cached after the first successful call.

    Returns:
        dict generated by partition tool, keyed by partition name. Each value holds
        `type`, `subtype`, `offset` (int), `size` (int, in bytes) and `flags`.
    """
    if self._partition_table is not None:
        return self._partition_table

    partition_file = os.path.join(
        self.binary_path,
        self.flash_args.get('partition_table', self.flash_args.get('partition-table', {})).get('file', ''),
    )
    process = subprocess.Popen(
        [sys.executable, self.parttool_path, partition_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = process.communicate()
    raw_data = stdout.decode() if isinstance(stdout, bytes) else stdout
    if process.returncode != 0:
        # surface the tool's error output instead of silently returning an empty table
        err_msg = stderr.decode(errors='replace') if isinstance(stderr, bytes) else stderr
        logging.warning('Partition tool exited with code %s: %s', process.returncode, err_msg)

    partition_table = {}
    for line in raw_data.splitlines():
        # blank lines and comment lines carry no partition entry
        if not line or line.startswith('#'):
            continue

        try:
            _name, _type, _subtype, _offset, _size, _flags = line.split(',')
            # `endswith` is safe on an empty size field; int('') then raises ValueError
            if _size.endswith('K'):
                _size = int(_size[:-1]) * 1024
            elif _size.endswith('M'):
                _size = int(_size[:-1]) * 1024 * 1024
            else:
                _size = int(_size)
            _offset = int(_offset, 0)
        except ValueError:
            # not a well-formed partition line
            continue

        partition_table[_name] = {
            'type': _type,
            'subtype': _subtype,
            'offset': _offset,
            'size': _size,
            'flags': _flags,
        }
    self._partition_table = partition_table
    return self._partition_table

get_sha256(filepath)

Get the sha256 of the file

Parameters:

Name Type Description Default
filepath str

path to the file

required

Returns:

Type Description
Optional[str]

sha256 appended to app

Source code in pytest_embedded_idf/app.py
def get_sha256(self, filepath: str) -> Optional[str]:
    """
    Get the sha256 of the file

    Args:
        filepath: path to the file

    Returns:
        lower-case hex sha256 appended to app, None when the image carries no appended digest
    """
    from pytest_embedded_serial_esp.serial import EspSerial, EsptoolVersion

    # the helpers live in different modules depending on the esptool major version
    if EspSerial.ESPTOOL_VERSION != EsptoolVersion.V3:
        from esptool.bin_image import LoadFirmwareImage
        from esptool.util import hexify
    else:
        from esptool import LoadFirmwareImage, hexify

    firmware = LoadFirmwareImage(self.target, filepath)
    if not firmware.append_digest:
        return None

    return hexify(firmware.stored_digest).lower()

pytest_embedded_idf.serial

IdfSerial

Bases: EspSerial

IDF serial Dut class

Auto flash the app while starting test.

Source code in pytest_embedded_idf/serial.py
class IdfSerial(EspSerial):
    """
    IDF serial Dut class

    Auto flash the app while starting test.
    """

    SUGGEST_FLASH_BAUDRATE = 921600
    # added to the app binary's flash offset when reading the 32-byte elf sha256
    DEFAULT_SHA256_OFFSET = 0xB0

    def __init__(
        self,
        pexpect_proc: PexpectProcess,
        app: IdfApp,
        target: Optional[str] = None,
        port: Optional[str] = None,
        baud: int = EspSerial.DEFAULT_BAUDRATE,
        skip_autoflash: bool = False,
        port_app_cache: Optional[Dict[str, str]] = None,
        confirm_target_elf_sha256: bool = False,
        erase_nvs: bool = False,
        **kwargs,
    ) -> None:
        """
        Args:
            pexpect_proc: `PexpectProcess` instance
            app: `IdfApp` instance
            target: target chip type, must match the app's target when both are set
            port: serial port
            baud: baud rate
            skip_autoflash: skip flashing the app on start
            port_app_cache: mapping of port to the binary path recorded for it
            confirm_target_elf_sha256: on a cache hit, compare the flashed elf sha256 with the local one
            erase_nvs: overwrite the `nvs` partition with 0xff while flashing

        Raises:
            ValueError: if the app has no `target`, or the targets do not match
        """
        self._port_app_cache: Dict[str, str] = port_app_cache if port_app_cache is not None else {}
        self.app = app
        self.confirm_target_elf_sha256 = confirm_target_elf_sha256
        self.erase_nvs = erase_nvs

        if not hasattr(self.app, 'target'):
            raise ValueError(f'Idf app not parsable. Please check if it\'s valid: {self.app.binary_path}')

        if target and self.app.target and self.app.target != target:
            raise ValueError(f'Targets do not match. App target: {self.app.target}, Cmd target: {target}.')

        super().__init__(pexpect_proc, target or app.target, port, baud, skip_autoflash, **kwargs)

    def _post_init(self):
        """Decide whether auto flash can be skipped based on the port-app cache, then update the cache."""
        if self.esp.serial_port in self._port_app_cache:
            if self.app.binary_path == self._port_app_cache[self.esp.serial_port]:  # hit the cache
                logging.debug('hit port-app cache: %s - %s', self.port, self.app.binary_path)
                if self.confirm_target_elf_sha256:
                    if self.is_target_flashed_same_elf():
                        logging.info('Confirmed target elf file sha256 the same as your local one.')
                        self.skip_autoflash = True
                    else:
                        logging.info('target elf file is different from your local one. Flash the binary again.')
                        self.skip_autoflash = False
                else:
                    logging.info(
                        'App is the same according to the session cache. '
                        'you can use flag "--confirm-target-elf-sha256" to make sure '
                        'that the target elf file is the same as your local one.'
                    )
                    self.skip_autoflash = True

        logging.debug('set port-app cache: %s - %s', self.port, self.app.binary_path)
        self._port_app_cache[self.port] = self.app.binary_path
        super()._post_init()

    def _start(self):
        """Flash the app unless auto flash is skipped."""
        if self.skip_autoflash:
            logging.info('Skipping auto flash...')
            super()._start()
        else:
            self.flash()

    @EspSerial.use_esptool
    def flash(self) -> None:
        """
        Flash the `app.flash_files` to the dut
        """
        if not self.app.flash_files:
            logging.error('No flash files detected. Skipping auto flash...')
            return

        if not self.app.flash_settings:
            logging.error('No flash settings detected. Skipping auto flash...')
            return

        flash_files = []
        encrypt_files = []
        nvs_file = None
        try:
            # open inside the `try` so that a failing `open` can't leak the handles opened before it
            for file in self.app.flash_files:
                bucket = encrypt_files if file.encrypted else flash_files
                bucket.append((file.offset, open(file.file_path, 'rb')))

            if self.erase_nvs:
                address = self.app.partition_table['nvs']['offset']
                size = self.app.partition_table['nvs']['size']
                nvs_file = tempfile.NamedTemporaryFile(delete=False)
                nvs_file.write(b'\xff' * size)
                # close to flush the buffered bytes to disk before the file is re-opened for reading
                nvs_file.close()
                if not isinstance(address, int):
                    address = int(address, 0)

                if self.app.flash_settings['encrypt']:
                    encrypt_files.append((address, open(nvs_file.name, 'rb')))
                else:
                    flash_files.append((address, open(nvs_file.name, 'rb')))

            # fake flasher args object, this is a hack until
            # esptool Python API is improved
            class FlashArgs(object):
                def __init__(self, attributes):
                    for key, value in attributes.items():
                        self.__setattr__(key, value)

            # write_flash expects the parameter encrypt_files to be None and not
            # an empty list, so perform the check here
            default_kwargs = {
                'addr_filename': flash_files,
                'encrypt_files': encrypt_files or None,
                'no_stub': False,
                'compress': True,
                'verify': False,
                'ignore_flash_encryption_efuse_setting': False,
                'erase_all': False,
            }

            if self.ESPTOOL_VERSION == EsptoolVersion.V4:
                default_kwargs['force'] = False

            default_kwargs.update(self.app.flash_settings)
            default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
            args = FlashArgs(default_kwargs)

            if self.proc.baudrate < self.SUGGEST_FLASH_BAUDRATE:
                self.stub.change_baud(self.SUGGEST_FLASH_BAUDRATE)

            esptool.detect_flash_size(self.stub, args)
            esptool.write_flash(self.stub, args)

            if self.proc.baudrate > self.DEFAULT_BAUDRATE:
                self.stub.change_baud(self.DEFAULT_BAUDRATE)  # set to the default one to get the serial output
        finally:
            # close every read handle first, a file that is still open can't be removed on all platforms
            for (_, f) in flash_files:
                f.close()
            for (_, f) in encrypt_files:
                f.close()
            if nvs_file:
                nvs_file.close()
                try:
                    os.remove(nvs_file.name)
                except OSError:
                    pass

    @EspSerial.use_esptool
    def dump_flash(
        self,
        partition: Optional[str] = None,
        address: Union[int, str, None] = None,
        size: Union[int, str, None] = None,
        output: Union[str, TextIO, None] = None,
    ) -> Optional[bytes]:
        """
        Dump the flash bytes into the output file by partition name or by start address and size.

        Args:
            output: file path or file stream to write to. File stream should be opened with bytes mode.
            partition: partition name
            address: address to start reading from. int, or str parsed with `int(address, 0)`
            size: read size. int, or str parsed with `int(size, 0)`

        Returns:
            None if `output` is `str` or file stream.
            bytes if `output` is None.

        Raises:
            ValueError: if neither `partition` nor both `address` and `size` are given
        """
        if partition:
            partition = self.app.partition_table[partition]
            _addr = partition['offset']
            _size = partition['size']
        elif address is not None and size:
            # `is not None` keeps address 0 valid; str values are converted to the ints used for reading
            _addr = int(address, 0) if isinstance(address, str) else address
            _size = int(size, 0) if isinstance(size, str) else size
        else:
            raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

        content = self.stub.read_flash(_addr, _size)
        if output:
            if isinstance(output, str):
                # a bare file name has no directory part, and os.makedirs('') would raise
                parent_dir = os.path.dirname(output)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                with open(output, 'wb') as f:
                    f.write(content)
            else:
                output.write(content)
            return None

        return content

    @EspSerial.use_esptool
    def erase_partition(self, partition_name: str) -> None:
        """
        Erase the partition provided

        Args:
            partition_name: partition name

        Raises:
            ValueError: if the partition table is empty or has no such partition
        """
        if not self.app.partition_table:
            raise ValueError('Partition table not parsed.')

        if partition_name in self.app.partition_table:
            address = self.app.partition_table[partition_name]['offset']
            size = self.app.partition_table[partition_name]['size']
            logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
            self.stub.erase_region(address, size)
        else:
            raise ValueError(f'partition name "{partition_name}" not found in app partition table')

    @EspSerial.use_esptool
    def erase_flash(self) -> None:
        """
        Erase the complete flash
        """
        logging.info('Erasing the flash')
        self.stub.erase_flash()

    @EspSerial.use_esptool
    def read_flash_elf_sha256(self) -> bytes:
        """
        Read the sha256 digest of the flashed elf file

        Returns:
            bytes of sha256

        Raises:
            ValueError: if `app.bin_file` is not one of the flash files
        """
        bin_offset = None
        for offset, filepath, _ in self.app.flash_files:
            if self.app.bin_file == filepath:
                bin_offset = offset
                break

        # compare with None, offset 0 is a valid flash offset
        if bin_offset is None:
            raise ValueError('.bin file not found in flash files')

        return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)

    def is_target_flashed_same_elf(self) -> bool:
        """
        Check if the sha256 values are matched between the flashed target and the `self.app.elf_file`

        Returns:
            True if the sha256 values are matched
        """
        if not self.app.elf_file:
            logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
            return False

        flash_elf_sha256 = self.read_flash_elf_sha256()
        elf_sha256 = hashlib.sha256()
        with open(self.app.elf_file, 'rb') as fr:
            elf_sha256.update(fr.read())

        return flash_elf_sha256 == elf_sha256.digest()

flash()

Flash the app.flash_files to the dut

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool
def flash(self) -> None:
    """
    Flash the `app.flash_files` to the dut

    Plain and encrypted files are passed to esptool separately. When
    `self.erase_nvs` is set, a temp file filled with 0xff and sized like the
    `nvs` partition is flashed at the partition's offset as well.
    """
    if not self.app.flash_files:
        logging.error('No flash files detected. Skipping auto flash...')
        return

    if not self.app.flash_settings:
        logging.error('No flash settings detected. Skipping auto flash...')
        return

    flash_files = []
    encrypt_files = []
    nvs_file = None
    try:
        # open inside the `try` so that a failing `open` can't leak the handles opened before it
        for file in self.app.flash_files:
            bucket = encrypt_files if file.encrypted else flash_files
            bucket.append((file.offset, open(file.file_path, 'rb')))

        if self.erase_nvs:
            address = self.app.partition_table['nvs']['offset']
            size = self.app.partition_table['nvs']['size']
            nvs_file = tempfile.NamedTemporaryFile(delete=False)
            nvs_file.write(b'\xff' * size)
            # close to flush the buffered bytes to disk before the file is re-opened for reading
            nvs_file.close()
            if not isinstance(address, int):
                address = int(address, 0)

            if self.app.flash_settings['encrypt']:
                encrypt_files.append((address, open(nvs_file.name, 'rb')))
            else:
                flash_files.append((address, open(nvs_file.name, 'rb')))

        # fake flasher args object, this is a hack until
        # esptool Python API is improved
        class FlashArgs(object):
            def __init__(self, attributes):
                for key, value in attributes.items():
                    self.__setattr__(key, value)

        # write_flash expects the parameter encrypt_files to be None and not
        # an empty list, so perform the check here
        default_kwargs = {
            'addr_filename': flash_files,
            'encrypt_files': encrypt_files or None,
            'no_stub': False,
            'compress': True,
            'verify': False,
            'ignore_flash_encryption_efuse_setting': False,
            'erase_all': False,
        }

        if self.ESPTOOL_VERSION == EsptoolVersion.V4:
            default_kwargs['force'] = False

        default_kwargs.update(self.app.flash_settings)
        default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
        args = FlashArgs(default_kwargs)

        if self.proc.baudrate < self.SUGGEST_FLASH_BAUDRATE:
            self.stub.change_baud(self.SUGGEST_FLASH_BAUDRATE)

        esptool.detect_flash_size(self.stub, args)
        esptool.write_flash(self.stub, args)

        if self.proc.baudrate > self.DEFAULT_BAUDRATE:
            self.stub.change_baud(self.DEFAULT_BAUDRATE)  # set to the default one to get the serial output
    finally:
        # close every read handle first, a file that is still open can't be removed on all platforms
        for (_, f) in flash_files:
            f.close()
        for (_, f) in encrypt_files:
            f.close()
        if nvs_file:
            nvs_file.close()
            try:
                os.remove(nvs_file.name)
            except OSError:
                pass

dump_flash(partition=None, address=None, size=None, output=None)

Dump the flash bytes into the output file by partition name or by start address and size.

Parameters:

Name Type Description Default
output Union[str, TextIO, None]

file path or file stream to write to. File stream should be opened with bytes mode.

None
partition Optional[str]

partition name

None
address Optional[str]

address to start reading from

None
size Optional[str]

read size

None

Returns:

Type Description
Optional[bytes]

None if output is str or file stream.

Optional[bytes]

bytes if output is None.

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool
def dump_flash(
    self,
    partition: Optional[str] = None,
    address: Union[int, str, None] = None,
    size: Union[int, str, None] = None,
    output: Union[str, TextIO, None] = None,
) -> Optional[bytes]:
    """
    Dump the flash bytes into the output file by partition name or by start address and size.

    Args:
        output: file path or file stream to write to. File stream should be opened with bytes mode.
        partition: partition name
        address: address to start reading from. int, or str parsed with `int(address, 0)`
        size: read size. int, or str parsed with `int(size, 0)`

    Returns:
        None if `output` is `str` or file stream.
        bytes if `output` is None.

    Raises:
        ValueError: if neither `partition` nor both `address` and `size` are given
    """
    if partition:
        partition = self.app.partition_table[partition]
        _addr = partition['offset']
        _size = partition['size']
    elif address is not None and size:
        # `is not None` keeps address 0 valid; str values are converted to the ints used for reading
        _addr = int(address, 0) if isinstance(address, str) else address
        _size = int(size, 0) if isinstance(size, str) else size
    else:
        raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

    content = self.stub.read_flash(_addr, _size)
    if output:
        if isinstance(output, str):
            # a bare file name has no directory part, and os.makedirs('') would raise
            parent_dir = os.path.dirname(output)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(output, 'wb') as f:
                f.write(content)
        else:
            output.write(content)
        return None

    return content

erase_partition(partition_name)

Erase the partition provided

Parameters:

Name Type Description Default
partition_name str

partition name

required
Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool
def erase_partition(self, partition_name: str) -> None:
    """
    Erase the flash region that belongs to the given partition.

    Args:
        partition_name: partition name

    Raises:
        ValueError: if the partition table is empty or doesn't contain the partition
    """
    if not self.app.partition_table:
        raise ValueError('Partition table not parsed.')

    if partition_name not in self.app.partition_table:
        raise ValueError(f'partition name "{partition_name}" not found in app partition table')

    entry = self.app.partition_table[partition_name]
    address, size = entry['offset'], entry['size']
    logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
    self.stub.erase_region(address, size)

erase_flash()

Erase the complete flash

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool
def erase_flash(self) -> None:
    """
    Erase the complete flash

    Logs the action at info level, then delegates to `self.stub.erase_flash()`.
    """
    logging.info('Erasing the flash')
    self.stub.erase_flash()

read_flash_elf_sha256()

Read the sha256 digest of the flashed elf file

Returns:

Type Description
bytes

bytes of sha256

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool
def read_flash_elf_sha256(self) -> bytes:
    """
    Read the sha256 digest of the flashed elf file

    The digest is read as 32 bytes at `DEFAULT_SHA256_OFFSET` past the flash
    offset of `app.bin_file`.

    Returns:
        bytes of sha256

    Raises:
        ValueError: if `app.bin_file` is not one of the flash files
    """
    bin_offset = None
    for offset, filepath, _ in self.app.flash_files:
        if self.app.bin_file == filepath:
            bin_offset = offset
            break

    # compare with None, offset 0 is a valid flash offset
    if bin_offset is None:
        raise ValueError('.bin file not found in flash files')

    return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)

is_target_flashed_same_elf()

Check if the sha256 values are matched between the flashed target and the self.app.elf_file

Returns:

Type Description
bool

True if the sha256 values are matched

Source code in pytest_embedded_idf/serial.py
def is_target_flashed_same_elf(self) -> bool:
    """
    Compare the sha256 read back from the target with the sha256 of the local `self.app.elf_file`.

    Returns:
        True if the sha256 values are matched, False if they differ or there is no local elf file
    """
    elf_path = self.app.elf_file
    if not elf_path:
        logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
        return False

    # read from the target first, then hash the local file
    flashed_digest = self.read_flash_elf_sha256()
    with open(elf_path, 'rb') as elf:
        local_digest = hashlib.sha256(elf.read()).digest()

    return flashed_digest == local_digest

pytest_embedded_idf.dut

IdfDut

Bases: SerialDut

Source code in pytest_embedded_idf/dut.py
class IdfDut(SerialDut):
    """
    IDF Dut class

    On `close()`, the core dumps are collected unless `skip_check_coredump` is set.
    """

    XTENSA_TARGETS = ['esp32', 'esp32s2', 'esp32s3']
    RISCV32_TARGETS = ['esp32c3', 'esp32h2', 'esp32c2']

    # markers that wrap a core dump printed to UART
    COREDUMP_UART_START = b'================= CORE DUMP START ================='
    COREDUMP_UART_END = b'================= CORE DUMP END ================='
    COREDUMP_UART_REGEX = re.compile(COREDUMP_UART_START + b'(.+?)' + COREDUMP_UART_END, re.DOTALL)

    app: IdfApp
    serial: IdfSerial

    def __init__(
        self, pexpect_proc: PexpectProcess, app: IdfApp, serial: IdfSerial, skip_check_coredump: bool = False, **kwargs
    ) -> None:
        """
        Args:
            pexpect_proc: `PexpectProcess` instance
            app: `IdfApp` instance
            serial: `IdfSerial` instance
            skip_check_coredump: when True, `close()` doesn't check the core dumps
        """
        super().__init__(pexpect_proc, app, serial, **kwargs)

        self.target = serial.target
        self.skip_check_coredump = skip_check_coredump

    @property
    def toolchain_prefix(self) -> str:
        """
        Returns:
            Toolchain prefix according to the `self.target`

        Raises:
            ValueError: if the target is neither a known xtensa nor a known riscv32 target
        """
        if self.target in self.XTENSA_TARGETS:
            return f'xtensa-{self.target}-elf-'
        elif self.target in self.RISCV32_TARGETS:
            # all RISC-V chips share one toolchain, its prefix doesn't contain the chip name
            return 'riscv32-esp-elf-'
        else:
            raise ValueError(f'Unknown target: {self.target}')

    def _check_coredump(self) -> None:
        """
        Check core dumps via UART or partition table. Write the decoded or read core dumps into separated files.

        For UART, would read the `_pexpect_logfile` file.
        For partition, would read the flash according to the partition table. needs a valid `parttool_path`.

        Notes:
            - May include multiple core dumps, since each test case may include several unity test cases.
            - May have duplicated core dumps, since after the core dump happened, the target chip would reboot
            automatically.

        Returns:
            None
        """
        if self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_UART', False):
            self._dump_b64_coredumps()
        elif self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_FLASH', False):
            self._dump_flash_coredump()
        else:
            logging.debug('core dump disabled')

    def _dump_b64_coredumps(self) -> None:
        """Decode every distinct base64 core dump found in the pexpect log into `coredump_output_<i>` files."""
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        with open(self.pexpect_proc._fr.name, 'rb') as fr:
            s = fr.read()

        # may duplicate. `dict.fromkeys` drops the duplicates but keeps the order of
        # appearance, so the index in the file name is stable between runs (a set isn't ordered)
        unique_coredumps = dict.fromkeys(self.COREDUMP_UART_REGEX.findall(s))
        for i, raw_coredump in enumerate(unique_coredumps):
            coredump_file = None
            try:
                with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
                    coredump_file.write(raw_coredump.strip().replace(b'\r', b''))
                    coredump_file.flush()

                coredump = CoreDump(
                    chip=self.target, core=coredump_file.name, core_format='b64', prog=self.app.elf_file
                )
                with open(os.path.join(self.logdir, f'coredump_output_{i}'), 'w') as fw:
                    with redirect_stdout(fw):
                        coredump.info_corefile()
            finally:
                if coredump_file:
                    os.remove(coredump_file.name)

    def _dump_flash_coredump(self) -> None:
        """
        Read the core dump from flash via the serial port and write the decoded output to `coredump_output`.

        Raises:
            ValueError: if the sdkconfig enables neither the ELF nor the BIN core dump data format
        """
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        # `.get` so that a missing key ends up in the ValueError below instead of a bare KeyError
        if self.app.sdkconfig.get('ESP_COREDUMP_DATA_FORMAT_ELF', False):
            core_format = 'elf'
        elif self.app.sdkconfig.get('ESP_COREDUMP_DATA_FORMAT_BIN', False):
            core_format = 'raw'
        else:
            raise ValueError('Invalid coredump format. Use _dump_b64_coredumps for UART')

        with self.serial.disable_redirect_thread():
            coredump = CoreDump(
                chip=self.target,
                core_format=core_format,
                port=self.serial.port,
                prog=self.app.elf_file,
            )
            with open(os.path.join(self.logdir, 'coredump_output'), 'w') as fw:
                with redirect_stdout(fw):
                    coredump.info_corefile()

    def close(self) -> None:
        """Check the core dumps (unless skipped), then close the dut."""
        if not self.skip_check_coredump:
            self._check_coredump()
        super().close()

__init__(pexpect_proc, app, serial, skip_check_coredump=False, **kwargs)

Parameters:

Name Type Description Default
pexpect_proc PexpectProcess

PexpectProcess instance

required
app IdfApp

IdfApp instance

required
serial IdfSerial

IdfSerial instance

required
Source code in pytest_embedded_idf/dut.py
def __init__(
    self, pexpect_proc: PexpectProcess, app: IdfApp, serial: IdfSerial, skip_check_coredump: bool = False, **kwargs
) -> None:
    """
    Initialise the parent dut, then record the coredump flag and the serial's target.

    Args:
        pexpect_proc: `PexpectProcess` instance
        app: `IdfApp` instance
        serial: `IdfSerial` instance
        skip_check_coredump: stored as `self.skip_check_coredump`
        **kwargs: forwarded unchanged to the parent initializer
    """
    super().__init__(pexpect_proc, app, serial, **kwargs)

    self.skip_check_coredump = skip_check_coredump
    self.target = serial.target

toolchain_prefix() property

Returns:

Type Description
str

Toolchain prefix according to the self.target

Source code in pytest_embedded_idf/dut.py
@property
def toolchain_prefix(self) -> str:
    """
    Returns:
        Toolchain prefix according to the `self.target`

    Raises:
        ValueError: if the target is neither a known xtensa nor a known riscv32 target
    """
    if self.target in self.XTENSA_TARGETS:
        return f'xtensa-{self.target}-elf-'
    elif self.target in self.RISCV32_TARGETS:
        # all RISC-V chips share one toolchain, its prefix doesn't contain the chip name
        return 'riscv32-esp-elf-'
    else:
        raise ValueError(f'Unknown target: {self.target}')