
Pytest embedded idf

pytest_embedded_idf.app

IdfApp

Bases: App

Idf App class

Attributes:

    elf_file (str): elf file path
    flash_args (dict[str, Any]): dict parsed from flasher_args.json
    flash_files (list[FlashFile]): list of (offset, file path, encrypted) tuples for the files that need to be flashed
    flash_settings (dict[str, Any]): dict of flash settings
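As a rough usage sketch (not part of this page's source), these attributes can be inspected directly from a test. The `app` fixture name is an assumption here; it is expected to resolve to an `IdfApp` instance when the idf service is enabled and the app has already been built.

def test_inspect_app(app):
    # `app` is assumed to be an IdfApp parsed from a prebuilt binary directory
    print(app.elf_file)                                # path to the built .elf file
    for flash_file in app.flash_files:                 # FlashFile(offset, file_path, encrypted)
        print(hex(flash_file.offset), flash_file.file_path, flash_file.encrypted)
    print(app.flash_settings)                          # flash settings plus the derived 'encrypt' flag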

Source code in pytest_embedded_idf/app.py
class IdfApp(App):
    """
    Idf App class

    Attributes:
        elf_file (str): elf file path
        flash_args (dict[str, Any]): dict of flasher_args.json
        flash_files (list[FlashFile]): list of (offset, file path, encrypted) of files need to be flashed in
        flash_settings (dict[str, Any]): dict of flash settings
    """

    FLASH_ARGS_FILENAME = 'flash_args'
    FLASH_PROJECT_ARGS_FILENAME = 'flash_project_args'
    FLASH_ARGS_JSON_FILENAME = 'flasher_args.json'

    def __init__(
        self,
        part_tool: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Optional info
        self._sdkconfig = None
        self._target = None
        # the partition table is used for nvs
        self._parttool = part_tool
        self._partition_table = None

        if not self.binary_path:
            logging.debug('Binary path not specified, skipping parsing app...')
            return

        # Required if binary path exists
        self.elf_file = self._get_elf_file()

        # loadable elf file skip the rest of these
        # TODO to be improved in #186
        # 5.1 changed from APP_BUILD_TYPE_ELF_RAM to APP_BUILD_TYPE_RAM
        # keep backward compatibility
        if self.sdkconfig.get('APP_BUILD_TYPE_ELF_RAM') or self.sdkconfig.get('APP_BUILD_TYPE_RAM'):
            self.is_loadable_elf = True
        else:
            self.is_loadable_elf = False

        self.bin_file = self._get_bin_file()
        self.flash_args, self.flash_files, self.flash_settings = {}, [], {}

        if not self.is_loadable_elf:
            self.flash_args, self.flash_files, self.flash_settings = self._parse_flash_args_json()

    @property
    def parttool_path(self) -> str:
        """
        Returns:
            Partition tool path
        """
        parttool_filepath = self._parttool or os.path.join(
            os.getenv('IDF_PATH', ''),
            'components',
            'partition_table',
            'gen_esp32part.py',
        )
        if os.path.isfile(parttool_filepath):
            return os.path.realpath(parttool_filepath)
        raise ValueError('Partition Tool not found. (Default: $IDF_PATH/components/partition_table/gen_esp32part.py)')

    @property
    def sdkconfig(self) -> Dict[str, Any]:
        """
        Returns:
            dict contains all k-v pairs from the sdkconfig file
        """
        if self._sdkconfig is not None:
            return self._sdkconfig

        sdkconfig_json_path = os.path.join(self.binary_path, 'config', 'sdkconfig.json')
        if not os.path.isfile(sdkconfig_json_path):
            logging.warning(f'{sdkconfig_json_path} doesn\'t exist. Skipping...')
            self._sdkconfig = {}
        else:
            self._sdkconfig = json.load(open(sdkconfig_json_path))
        return self._sdkconfig

    @property
    def target(self) -> str:
        """
        Returns:
            target chip type
        """
        if self.sdkconfig:
            return self.sdkconfig.get('IDF_TARGET', 'esp32')
        else:
            return self.flash_args.get('extra_esptool_args', {}).get('chip', 'esp32')

    @property
    def partition_table(self) -> Dict[str, Any]:
        """
        Returns:
            partition table dict generated by the partition tool
        """
        if self._partition_table is not None:
            return self._partition_table

        partition_file = os.path.join(
            self.binary_path,
            self.flash_args.get('partition_table', self.flash_args.get('partition-table', {})).get('file', ''),
        )
        process = subprocess.Popen(
            [sys.executable, self.parttool_path, partition_file],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate()
        raw_data = stdout.decode() if isinstance(stdout, bytes) else stdout

        partition_table = {}
        for line in raw_data.splitlines():
            if line[0] != '#':
                try:
                    _name, _type, _subtype, _offset, _size, _flags = line.split(',')
                    if _size[-1] == 'K':
                        _size = int(_size[:-1]) * 1024
                    elif _size[-1] == 'M':
                        _size = int(_size[:-1]) * 1024 * 1024
                    else:
                        _size = int(_size)
                    _offset = int(_offset, 0)
                except ValueError:
                    continue
                partition_table[_name] = {
                    'type': _type,
                    'subtype': _subtype,
                    'offset': _offset,
                    'size': _size,
                    'flags': _flags,
                }
        self._partition_table = partition_table
        return self._partition_table

    def _get_elf_file(self) -> Optional[str]:
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.elf':
                return os.path.realpath(os.path.join(self.binary_path, fn))

        return None

    def _get_bin_file(self) -> Optional[str]:
        for fn in os.listdir(self.binary_path):
            if os.path.splitext(fn)[-1] == '.bin':
                return os.path.realpath(os.path.join(self.binary_path, fn))

        return None

    def _parse_flash_args(self) -> List[str]:
        flash_args_filepath = None
        for fn in os.listdir(self.binary_path):
            if fn in [self.FLASH_PROJECT_ARGS_FILENAME, self.FLASH_ARGS_FILENAME]:
                flash_args_filepath = os.path.realpath(os.path.join(self.binary_path, fn))
                break

        if not flash_args_filepath:
            raise ValueError(
                f'{self.FLASH_PROJECT_ARGS_FILENAME} or {self.FLASH_ARGS_FILENAME} '
                f'is not found under {self.binary_path}'
            )

        with open(flash_args_filepath) as fr:
            return shlex.split(fr.read().strip())

    def _parse_flash_args_json(
        self,
    ) -> Tuple[Dict[str, Any], List[FlashFile], Dict[str, str]]:
        flash_args_json_filepath = None
        for fn in os.listdir(self.binary_path):
            if fn == self.FLASH_ARGS_JSON_FILENAME:
                flash_args_json_filepath = os.path.realpath(os.path.join(self.binary_path, fn))
                break

        if not flash_args_json_filepath:
            raise ValueError(f'{self.FLASH_ARGS_JSON_FILENAME} not found')

        with open(flash_args_json_filepath) as fr:
            flash_args = json.load(fr)

        def _is_encrypted(_flash_args: Dict[str, Any], _offset: int, _file_path: str):
            for entry in _flash_args.values():
                try:
                    if (entry['offset'], entry['file']) == (_offset, _file_path):
                        return entry['encrypted'] == 'true'
                except (TypeError, KeyError):
                    continue

            return False

        flash_files = []
        for offset, file_path in flash_args['flash_files'].items():
            flash_files.append(
                FlashFile(
                    int(offset, 0),
                    os.path.join(self.binary_path, file_path),
                    _is_encrypted(flash_args, offset, file_path),
                )
            )

        flash_files.sort()
        flash_settings = flash_args['flash_settings']
        flash_settings['encrypt'] = any([file.encrypted for file in flash_files])

        return flash_args, flash_files, flash_settings

    def get_sha256(self, filepath: str) -> Optional[str]:
        """
        Get the sha256 of the file

        Args:
            filepath: path to the file

        Returns:
            sha256 value appended to app
        """
        from esptool.bin_image import LoadFirmwareImage
        from esptool.util import hexify

        image = LoadFirmwareImage(self.target, filepath)
        if image.append_digest:
            return hexify(image.stored_digest).lower()
        return None

partition_table: Dict[str, Any] property

Returns:
    Dict[str, Any]: partition table dict generated by the partition tool

parttool_path: str property

Returns:
    str: partition tool path

sdkconfig: Dict[str, Any] property

Returns:
    Dict[str, Any]: dict containing all key-value pairs from the sdkconfig file

target: str property

Returns:
    str: target chip type

get_sha256(filepath)

Get the sha256 of the file

Parameters:
    filepath (str): path to the file (required)

Returns:
    Optional[str]: sha256 value appended to the app

Source code in pytest_embedded_idf/app.py
def get_sha256(self, filepath: str) -> Optional[str]:
    """
    Get the sha256 of the file

    Args:
        filepath: path to the file

    Returns:
        sha256 value appended to app
    """
    from esptool.bin_image import LoadFirmwareImage
    from esptool.util import hexify

    image = LoadFirmwareImage(self.target, filepath)
    if image.append_digest:
        return hexify(image.stored_digest).lower()
    return None
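A short, hedged sketch (not from the source) of how get_sha256 and partition_table might be used together in a test. The `app` fixture name is an assumption; partition_table additionally needs a valid parttool_path, e.g. IDF_PATH being set.

def test_app_metadata(app):
    # sha256 digest appended to the app image, or None if the image has no appended digest
    # (app.bin_file may be None for RAM-loadable apps)
    digest = app.get_sha256(app.bin_file)
    print('appended sha256:', digest)

    # partition table parsed by gen_esp32part.py
    nvs = app.partition_table.get('nvs')
    if nvs:
        print('nvs partition at', hex(nvs['offset']), 'size', nvs['size'])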

pytest_embedded_idf.serial

IdfSerial

Bases: EspSerial

IDF serial Dut class

Automatically flashes the app when the test starts.

Source code in pytest_embedded_idf/serial.py
class IdfSerial(EspSerial):
    """
    IDF serial Dut class

    Auto flash the app while starting test.
    """

    SUGGEST_FLASH_BAUDRATE = 921600
    DEFAULT_SHA256_OFFSET = 0xB0

    def __init__(
        self,
        app: IdfApp,
        target: Optional[str] = None,
        confirm_target_elf_sha256: bool = False,
        erase_nvs: bool = False,
        **kwargs,
    ) -> None:
        self.app = app
        self.confirm_target_elf_sha256 = confirm_target_elf_sha256
        self.erase_nvs = erase_nvs

        if not hasattr(self.app, 'target'):
            raise ValueError(f'Idf app not parsable. Please check if it\'s valid: {self.app.binary_path}')

        if target and self.app.target and self.app.target != target:
            raise ValueError(f'Targets do not match. App target: {self.app.target}, Cmd target: {target}.')

        super().__init__(
            target=target or app.target,
            **kwargs,
        )

    def _post_init(self):
        if self.erase_all:
            self.skip_autoflash = False
        elif self._meta and self._meta.hit_port_app_cache(self.port, self.app):
            if self.confirm_target_elf_sha256:
                if self.is_target_flashed_same_elf():
                    logging.info('Confirmed target elf file sha256 the same as your local one.')
                    self.skip_autoflash = True
                else:
                    logging.info('target elf file is different from your local one. Flash the binary again.')
                    self.skip_autoflash = False
            else:
                logging.info(
                    'App is the same according to the session cache. '
                    'you can use flag "--confirm-target-elf-sha256" to make sure '
                    'that the target elf file is the same as your local one.'
                )
                self.skip_autoflash = True

        super()._post_init()

    def _start(self):
        if self.skip_autoflash:
            logging.info('Skipping auto flash...')
            super()._start()
        else:
            if self.app.is_loadable_elf:
                self.load_ram()
            else:
                self.flash()

    def load_ram(self) -> None:
        if not self.app.is_loadable_elf:
            raise ValueError('elf should be loadable elf')

        # 5.1 or earlier with sdkconfig APP_BUILD_TYPE_ELF_RAM, would build elf file only
        # 5.1 or later with sdkconfig renamed APP_BUILD_TYPE_RAM, would build bin file only
        if self.app.bin_file:
            bin_file = self.app.bin_file
        else:
            live_print_call(
                [
                    'esptool.py',
                    '--chip',
                    self.app.target,
                    'elf2image',
                    self.app.elf_file,
                    *self.app._parse_flash_args(),
                ],
                msg_queue=self._q,
            )
            bin_file = self.app.elf_file.replace('.elf', '.bin')

        live_print_call(
            [
                'esptool.py',
                '--chip',
                self.app.target,
                '--no-stub',
                'load_ram',
                bin_file,
            ],
            msg_queue=self._q,
        )

    @EspSerial.use_esptool()
    def flash(self) -> None:
        """
        Flash the `app.flash_files` to the dut
        """
        if not self.app.flash_files:
            logging.error('No flash files detected. Skipping auto flash...')
            return

        if not self.app.flash_settings:
            logging.error('No flash settings detected. Skipping auto flash...')
            return

        flash_files = [(file.offset, open(file.file_path, 'rb')) for file in self.app.flash_files if not file.encrypted]
        encrypt_files = [(file.offset, open(file.file_path, 'rb')) for file in self.app.flash_files if file.encrypted]

        nvs_file = None
        try:
            if self.erase_nvs:
                address = self.app.partition_table['nvs']['offset']
                size = self.app.partition_table['nvs']['size']
                nvs_file = tempfile.NamedTemporaryFile(delete=False)
                nvs_file.write(b'\xff' * size)
                if not isinstance(address, int):
                    address = int(address, 0)

                if self.app.flash_settings['encrypt']:
                    encrypt_files.append((address, open(nvs_file.name, 'rb')))
                else:
                    flash_files.append((address, open(nvs_file.name, 'rb')))

            # write_flash expects the parameter encrypt_files to be None and not
            # an empty list, so perform the check here
            default_kwargs = {
                'addr_filename': flash_files,
                'encrypt_files': encrypt_files or None,
                'no_stub': False,
                'compress': True,
                'verify': False,
                'ignore_flash_encryption_efuse_setting': False,
                'erase_all': False,
                'force': False,
            }

            default_kwargs.update(self.app.flash_settings)
            default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
            args = EsptoolArgs(**default_kwargs)

            self.stub.change_baud(self.esptool_baud)
            esptool.detect_flash_size(self.stub, args)
            esptool.write_flash(self.stub, args)
            self.stub.change_baud(self.baud)

            if self._meta:
                self._meta.set_port_app_cache(self.port, self.app)
        finally:
            if nvs_file:
                nvs_file.close()
                try:
                    os.remove(nvs_file.name)
                except OSError:
                    pass
            for _, f in flash_files:
                f.close()
            for _, f in encrypt_files:
                f.close()

    @EspSerial.use_esptool()
    def dump_flash(
        self,
        partition: Optional[str] = None,
        address: Optional[str] = None,
        size: Optional[str] = None,
        output: Union[str, TextIO, None] = None,
    ) -> Optional[bytes]:
        """
        Dump the flash bytes into the output file by partition name or by start address and size.

        Args:
            output: file path or file stream to write to. File stream should be opened with bytes mode.
            partition: partition name
            address: address that start reading from
            size: read size

        Returns:
            None if `output` is `str` or file stream.
            bytes if `output` is None.
        """
        if partition:
            partition = self.app.partition_table[partition]
            _addr = partition['offset']
            _size = partition['size']
        elif address and size:
            _addr = address
            _size = size
        else:
            raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

        content = self.stub.read_flash(_addr, _size)
        if output:
            if isinstance(output, str):
                os.makedirs(os.path.dirname(output), exist_ok=True)
                with open(output, 'wb') as f:
                    f.write(content)
            else:
                output.write(content)
        else:
            return content

    @EspSerial.use_esptool()
    def erase_partition(self, partition_name: str) -> None:
        """
        Erase the partition provided

        Args:
            partition_name: partition name
        """
        if not self.app.partition_table:
            raise ValueError('Partition table not parsed.')

        if partition_name in self.app.partition_table:
            address = self.app.partition_table[partition_name]['offset']
            size = self.app.partition_table[partition_name]['size']
            logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
            self.stub.erase_region(address, size)
        else:
            raise ValueError(f'partition name "{partition_name}" not found in app partition table')

    @EspSerial.use_esptool()
    def read_flash_elf_sha256(self) -> bytes:
        """
        Read the sha256 digest of the flashed elf file

        Returns:
            bytes of sha256
        """
        bin_offset = None
        for offset, filepath, _ in self.app.flash_files:
            if self.app.bin_file == filepath:
                bin_offset = offset
                break

        if not bin_offset:
            raise ValueError('.bin file not found in flash files')

        return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)

    def is_target_flashed_same_elf(self) -> bool:
        """
        Check if the sha256 values are matched between the flashed target and the `self.app.elf_file`

        Returns:
            True if the sha256 values are matched
        """
        if not self.app.elf_file:
            logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
            return False

        flash_elf_sha256 = self.read_flash_elf_sha256()
        elf_sha256 = hashlib.sha256()
        with open(self.app.elf_file, 'rb') as fr:
            elf_sha256.update(fr.read())

        return flash_elf_sha256 == elf_sha256.digest()

dump_flash(partition=None, address=None, size=None, output=None)

Dump the flash bytes into the output file by partition name or by start address and size.

Parameters:
    output (Union[str, TextIO, None]): file path or file stream to write to; a file stream should be opened in binary mode (default: None)
    partition (Optional[str]): partition name (default: None)
    address (Optional[str]): address to start reading from (default: None)
    size (Optional[str]): read size (default: None)

Returns:
    Optional[bytes]: None if output is a str or a file stream; the raw bytes if output is None.

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def dump_flash(
    self,
    partition: Optional[str] = None,
    address: Optional[str] = None,
    size: Optional[str] = None,
    output: Union[str, TextIO, None] = None,
) -> Optional[bytes]:
    """
    Dump the flash bytes into the output file by partition name or by start address and size.

    Args:
        output: file path or file stream to write to. File stream should be opened with bytes mode.
        partition: partition name
        address: address that start reading from
        size: read size

    Returns:
        None if `output` is `str` or file stream.
        bytes if `output` is None.
    """
    if partition:
        partition = self.app.partition_table[partition]
        _addr = partition['offset']
        _size = partition['size']
    elif address and size:
        _addr = address
        _size = size
    else:
        raise ValueError('You must specify "partition" or ("address" and "size") to dump flash')

    content = self.stub.read_flash(_addr, _size)
    if output:
        if isinstance(output, str):
            os.makedirs(os.path.dirname(output), exist_ok=True)
            with open(output, 'wb') as f:
                f.write(content)
        else:
            output.write(content)
    else:
        return content
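A minimal sketch (not from the source) of calling dump_flash from a test. The `dut` fixture with the esp/idf services enabled and pytest's built-in `tmp_path` fixture are assumptions here.

def test_dump_nvs_partition(dut, tmp_path):
    # write the partition content to a file (parent directories are created automatically)
    dut.serial.dump_flash(partition='nvs', output=str(tmp_path / 'dumps' / 'nvs.bin'))

    # or omit `output` to get the raw bytes back
    raw = dut.serial.dump_flash(partition='nvs')
    assert raw is not None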

erase_partition(partition_name)

Erase the partition provided

Parameters:
    partition_name (str): partition name (required)
Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def erase_partition(self, partition_name: str) -> None:
    """
    Erase the partition provided

    Args:
        partition_name: partition name
    """
    if not self.app.partition_table:
        raise ValueError('Partition table not parsed.')

    if partition_name in self.app.partition_table:
        address = self.app.partition_table[partition_name]['offset']
        size = self.app.partition_table[partition_name]['size']
        logging.info(f'Erasing the partition "{partition_name}" of size {size} at {address}')
        self.stub.erase_region(address, size)
    else:
        raise ValueError(f'partition name "{partition_name}" not found in app partition table')
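As a hedged example (assuming the `dut` fixture and an app whose partition table contains an "nvs" entry), erasing a partition and resetting the target might look like this; the expected log line is purely illustrative.

def test_fresh_nvs(dut):
    dut.serial.erase_partition('nvs')            # wipe the partition listed in the app's partition table
    dut.serial.hard_reset()                      # restart the app so it re-initializes the partition
    dut.expect_exact('app_main()', timeout=30)   # hypothetical boot marker, adjust to your app's output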

flash()

Flash the app.flash_files to the dut

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def flash(self) -> None:
    """
    Flash the `app.flash_files` to the dut
    """
    if not self.app.flash_files:
        logging.error('No flash files detected. Skipping auto flash...')
        return

    if not self.app.flash_settings:
        logging.error('No flash settings detected. Skipping auto flash...')
        return

    flash_files = [(file.offset, open(file.file_path, 'rb')) for file in self.app.flash_files if not file.encrypted]
    encrypt_files = [(file.offset, open(file.file_path, 'rb')) for file in self.app.flash_files if file.encrypted]

    nvs_file = None
    try:
        if self.erase_nvs:
            address = self.app.partition_table['nvs']['offset']
            size = self.app.partition_table['nvs']['size']
            nvs_file = tempfile.NamedTemporaryFile(delete=False)
            nvs_file.write(b'\xff' * size)
            if not isinstance(address, int):
                address = int(address, 0)

            if self.app.flash_settings['encrypt']:
                encrypt_files.append((address, open(nvs_file.name, 'rb')))
            else:
                flash_files.append((address, open(nvs_file.name, 'rb')))

        # write_flash expects the parameter encrypt_files to be None and not
        # an empty list, so perform the check here
        default_kwargs = {
            'addr_filename': flash_files,
            'encrypt_files': encrypt_files or None,
            'no_stub': False,
            'compress': True,
            'verify': False,
            'ignore_flash_encryption_efuse_setting': False,
            'erase_all': False,
            'force': False,
        }

        default_kwargs.update(self.app.flash_settings)
        default_kwargs.update(self.app.flash_args.get('extra_esptool_args', {}))
        args = EsptoolArgs(**default_kwargs)

        self.stub.change_baud(self.esptool_baud)
        esptool.detect_flash_size(self.stub, args)
        esptool.write_flash(self.stub, args)
        self.stub.change_baud(self.baud)

        if self._meta:
            self._meta.set_port_app_cache(self.port, self.app)
    finally:
        if nvs_file:
            nvs_file.close()
            try:
                os.remove(nvs_file.name)
            except OSError:
                pass
        for _, f in flash_files:
            f.close()
        for _, f in encrypt_files:
            f.close()

is_target_flashed_same_elf()

Check whether the sha256 value of the flashed app matches that of `self.app.elf_file`

Returns:
    bool: True if the sha256 values match

Source code in pytest_embedded_idf/serial.py
def is_target_flashed_same_elf(self) -> bool:
    """
    Check if the sha256 values are matched between the flashed target and the `self.app.elf_file`

    Returns:
        True if the sha256 values are matched
    """
    if not self.app.elf_file:
        logging.info('no elf file. Can\'t tell if the target flashed the same elf file or not. Assume as False')
        return False

    flash_elf_sha256 = self.read_flash_elf_sha256()
    elf_sha256 = hashlib.sha256()
    with open(self.app.elf_file, 'rb') as fr:
        elf_sha256.update(fr.read())

    return flash_elf_sha256 == elf_sha256.digest()

read_flash_elf_sha256()

Read the sha256 digest of the flashed elf file

Returns:
    bytes: the raw sha256 digest bytes

Source code in pytest_embedded_idf/serial.py
@EspSerial.use_esptool()
def read_flash_elf_sha256(self) -> bytes:
    """
    Read the sha256 digest of the flashed elf file

    Returns:
        bytes of sha256
    """
    bin_offset = None
    for offset, filepath, _ in self.app.flash_files:
        if self.app.bin_file == filepath:
            bin_offset = offset
            break

    if not bin_offset:
        raise ValueError('.bin file not found in flash files')

    return self.stub.read_flash(bin_offset + self.DEFAULT_SHA256_OFFSET, 32)
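A small sketch (not from the source) combining these helpers to verify the on-target image before trusting a cached flash; the `dut` fixture name is an assumption.

def test_target_matches_local_elf(dut):
    if not dut.serial.is_target_flashed_same_elf():
        dut.serial.flash()                       # re-flash when the digests differ
    digest = dut.serial.read_flash_elf_sha256()
    assert len(digest) == 32                     # a raw sha256 digest is 32 bytes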

pytest_embedded_idf.dut

IdfDut

Bases: SerialDut

Dut class for serial ports connected to Espressif boards flashed with ESP-IDF apps

Attributes:

    target (str): target chip type
    skip_check_coredump (bool): skip the core dump check during dut teardown if set to True

Source code in pytest_embedded_idf/dut.py
class IdfDut(SerialDut):
    """
    Dut class for serial ports connect to Espressif boards which are flashed with ESP-IDF apps

    Attributes:
        target (str): target chip type
        skip_check_coredump (bool): skip check core dumped or not while dut teardown if set to True
    """

    XTENSA_TARGETS = ['esp32', 'esp32s2', 'esp32s3']
    RISCV32_TARGETS = ['esp32c3', 'esp32h2', 'esp32c2']

    COREDUMP_UART_START = b'================= CORE DUMP START ================='
    COREDUMP_UART_END = b'================= CORE DUMP END ================='
    COREDUMP_UART_REGEX = re.compile(COREDUMP_UART_START + b'(.+?)' + COREDUMP_UART_END, re.DOTALL)

    # panic handler related messages
    PANIC_START = b'register dump:'
    PANIC_END = b'ELF file SHA256:'

    app: IdfApp

    def __init__(
        self,
        app: IdfApp,
        skip_check_coredump: bool = False,
        panic_output_decode_script: str = None,
        **kwargs,
    ) -> None:
        self.target = app.target
        self.skip_check_coredump = skip_check_coredump
        self._panic_output_decode_script = panic_output_decode_script
        self._test_menu: t.List[UnittestMenuCase] = None  # type: ignore

        super().__init__(app=app, **kwargs)

    @property
    def toolchain_prefix(self) -> str:
        """
        Returns:
            Toolchain prefix according to the `self.target`
        """
        if self.target in self.XTENSA_TARGETS:
            return f'xtensa-{self.target}-elf-'
        elif self.target in self.RISCV32_TARGETS:
            return 'riscv32-esp-elf-'
        else:
            raise ValueError(f'Unknown target: {self.target}')

    @property
    def panic_output_decode_script(self) -> t.Optional[str]:
        """
        Returns:
            Panic output decode script path
        """
        script_filepath = self._panic_output_decode_script or os.path.join(
            os.getenv('IDF_PATH', 'IDF_PATH'),
            'tools',
            'gdb_panic_server.py',
        )
        if not os.path.isfile(script_filepath):
            raise ValueError(
                'Panic output decode script not found. Please use --panic-output-decode-script flag '
                'to provide script or set IDF_PATH (Default: $IDF_PATH/tools/gdb_panic_server.py)'
            )
        return os.path.realpath(script_filepath)

    def _check_panic_decode_trigger(self):  # type: () -> None
        if not self.app.elf_file:
            logging.warning('No elf file found. Skipping decode panic output...')
            return

        with open(self.logfile, 'rb') as output_file:
            output = output_file.read()
        # get the panic output by looking for the indexes
        # of the first occurrences of PANIC_START and PANIC_END patterns
        panic_output_idx_start = output.find(self.PANIC_START) - 10
        panic_output_idx_end = output.find(self.PANIC_END, output.find(self.PANIC_START) + 1) + 15
        panic_output_res = output[panic_output_idx_start:panic_output_idx_end]
        panic_output = panic_output_res if panic_output_res else None
        if panic_output is None:
            return

        with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file:
            panic_output_file.write(panic_output)
            panic_output_file.flush()
        try:
            cmd = [
                f'{self.toolchain_prefix}-gdb',
                '--command',
                f'{self.app.app_path}/build/prefix_map_gdbinit',
                '--batch',
                '-n',
                self.app.elf_file,
                '-ex',
                "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\"".format(
                    python=sys.executable,
                    script=self.panic_output_decode_script,
                    target=self.target,
                    output_file=panic_output_file.name,
                ),
                '-ex',
                'bt',
            ]
            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            logging.info('\n\nBacktrace:\n')
            logging.info(output.decode())  # noqa: E999
        except subprocess.CalledProcessError as e:
            logging.debug(f'Failed to run gdb_panic_server.py script: {e}\n{e.output}\n\n')
            logging.info(panic_output.decode())
        finally:
            if panic_output_file is not None:
                try:
                    os.unlink(panic_output_file.name)
                except OSError as e:
                    logging.debug(f'Couldn\'t remove temporary panic output file ({e})')

    def _check_coredump(self) -> None:
        """
        Handle errors via the panic handler script, or check core dumps via UART or the partition table.
        Write the decoded or read core dumps into separate files.

        For UART and panic output, reads the `_pexpect_logfile` file.
        For partition, reads the flash according to the partition table. Needs a valid `parttool_path`.

        Notes:
            - May include multiple core dumps, since each test case may include several unity test cases.
            - May have duplicated core dumps, since after the core dump happened, the target chip would reboot
            automatically.

        Returns:
            None
        """
        if self.target in self.RISCV32_TARGETS:
            self._check_panic_decode_trigger()  # need IDF_PATH
        if self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_UART', False):
            self._dump_b64_coredumps()
        elif self.app.sdkconfig.get('ESP_COREDUMP_ENABLE_TO_FLASH', False):
            self._dump_flash_coredump()
        else:
            logging.debug('core dump disabled')

    def _dump_b64_coredumps(self) -> None:
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        with open(self.logfile, 'rb') as fr:
            s = fr.read()

            for i, coredump in enumerate(set(self.COREDUMP_UART_REGEX.findall(s))):  # may duplicate
                coredump_file = None
                try:
                    with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
                        coredump_file.write(coredump.strip().replace(b'\r', b''))
                        coredump_file.flush()

                    coredump = CoreDump(
                        chip=self.target,
                        core=coredump_file.name,
                        core_format='b64',
                        prog=self.app.elf_file,
                    )
                    with open(os.path.join(self._meta.logdir, f'coredump_output_{i}'), 'w') as fw:
                        with redirect_stdout(fw):
                            coredump.info_corefile()
                finally:
                    if coredump_file:
                        os.remove(coredump_file.name)

    def _dump_flash_coredump(self) -> None:
        if not self.app.elf_file:
            logging.debug('no elf file. skipping dumping core dumps')
            return

        from esp_coredump import CoreDump  # need IDF_PATH

        if self.app.sdkconfig['ESP_COREDUMP_DATA_FORMAT_ELF']:
            core_format = 'elf'
        elif self.app.sdkconfig['ESP_COREDUMP_DATA_FORMAT_BIN']:
            core_format = 'raw'
        else:
            raise ValueError('Invalid coredump format. Use _parse_b64_coredump for UART')

        with self.serial.disable_redirect_thread():
            coredump = CoreDump(
                chip=self.target,
                core_format=core_format,
                port=self.serial.port,
                prog=self.app.elf_file,
            )
            with open(os.path.join(self._meta.logdir, 'coredump_output'), 'w') as fw:
                with redirect_stdout(fw):
                    coredump.info_corefile()

    def close(self) -> None:
        if not self.skip_check_coredump:
            try:
                self._check_coredump()
            except Exception as e:
                logging.debug(e)
        super().close()

    #####################
    # IDF-unity related #
    #####################
    def _parse_test_menu(
        self,
        ready_line: str = 'Press ENTER to see the list of tests',
        pattern="Here's the test menu, pick your combo:(.+)Enter test for running.",
        trigger: str = '',
    ) -> t.List[UnittestMenuCase]:
        """
        Get test case list from test menu via UART print.

        Args:
            ready_line: Prompt to indicate that device is ready to print test menu.
            pattern: Pattern to match the output from device, menu block should be in the first group.
                     This will be directly passed to `pexpect.expect()`.
            trigger: Keys to trigger device to print test menu by UART.

        Returns:
            A `list` of `UnittestMenuCase`, which includes info for each test case.
        """
        self.expect_exact(ready_line)
        self.write(trigger)
        menu_block = self.expect(pattern).group(1)
        s = str(menu_block, encoding='UTF-8')
        return self._parse_unity_menu_from_str(s)

    def parse_test_menu(
        self,
        ready_line: str = 'Press ENTER to see the list of tests',
        pattern="Here's the test menu, pick your combo:(.+)Enter test for running.",
        trigger: str = '',
    ) -> t.List[UnittestMenuCase]:
        warnings.warn(
            'Please use `dut.test_menu` property directly, '
            'will rename this function to `_parse_test_menu` in release 2.0.0',
            DeprecationWarning,
        )

        return self._parse_test_menu(ready_line, pattern, trigger)

    @staticmethod
    def parse_unity_menu_from_str(s: str) -> t.List[UnittestMenuCase]:
        warnings.warn(
            'Please use `dut.test_menu` property directly, '
            'will rename this function to `_parse_unity_menu_from_str` in release 2.0.0',
            DeprecationWarning,
        )

        return IdfDut._parse_unity_menu_from_str(s)

    @staticmethod
    def _parse_unity_menu_from_str(s: str) -> t.List[UnittestMenuCase]:
        """
        Parse test case menu from string to list of `UnittestMenuCase`.

        Args:
            s: string include test case menu.

        Returns:
            A `list` of `UnittestMenuCase`, which includes info for each test case.
        """
        cases = s.splitlines()

        case_regex = re.compile(r'\((\d+)\)\s\"(.+)\"\s(\[.+\])+')
        subcase_regex = re.compile(r'\t\((\d+)\)\s\"(.+)\"')

        test_menu = []
        for case in cases:
            case_match = case_regex.match(case)
            if case_match is not None:
                index, name, tag_block = case_match.groups()
                tags = re.findall(r'\[(.+?)\]', tag_block)

                if 'multi_stage' in tags:
                    _type = 'multi_stage'
                    tags.remove('multi_stage')
                elif 'multi_device' in tags:
                    _type = 'multi_device'
                    tags.remove('multi_device')
                else:
                    _type = 'normal'

                keyword = []
                if 'ignore' in tags:
                    keyword.append('ignore')
                    tags.remove('ignore')
                elif 'disable' in tags:
                    keyword = 'disable'
                    tags.remove('disable')

                attributes = {}
                group = []
                for tag in tags:
                    if '=' in tag:
                        k, v = tag.replace(' ', '').split('=')
                        attributes[k] = v
                    else:
                        group.append(tag)

                test_menu.append(
                    UnittestMenuCase(
                        index=int(index),
                        name=name,
                        type=_type,
                        keywords=keyword,
                        groups=group,
                        attributes=attributes,
                        subcases=[],
                    )
                )
                continue
            subcase_match = subcase_regex.match(case)
            if subcase_match is not None:
                index, name = subcase_match.groups()
                test_menu[-1].subcases.append({'index': int(index), 'name': name})
                continue

            if case != '':
                raise NotImplementedError('Unrecognized test case:', case)

        return test_menu

    @property
    def test_menu(self) -> t.List[UnittestMenuCase]:
        if self._test_menu is None:
            self._test_menu = self._parse_test_menu()
            logging.debug('Successfully parsed unity test menu')
            self.serial.hard_reset()

        return self._test_menu

    def _record_single_unity_test_case(func):
        """
        The first positional argument of the decorated function must be `case`, passed via args.

        Notes:
            This function is better than `dut.expect_unity_output()` since it will record the test case even if the
            target core dumped while running it, or the final result block was otherwise not caught.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            _start_at = time.perf_counter()  # declare here in case hard reset failed
            _timeout = kwargs.get('timeout', 30)
            _case = args[0]

            try:
                # do it here since the first hard reset before test case shouldn't be counted in duration time
                if 'reset' in kwargs:
                    if kwargs.pop('reset'):
                        self.serial.hard_reset()

                _start_at = time.perf_counter()
                func(self, *args, **kwargs)
            finally:
                _timestamp = time.perf_counter()
                _log = ''
                try:
                    _timeout = _timeout - _timestamp + _start_at
                    if _timeout < 0:  # pexpect process would expect 30s if < 0
                        _timeout = 0
                    self.expect(UNITY_SUMMARY_LINE_REGEX, timeout=_timeout)
                except Exception:  # result block missing # noqa
                    pass
                else:  # result block exists
                    _log = remove_asci_color_code(self.pexpect_proc.before)
                finally:
                    _end_at = time.perf_counter()
                    self._add_single_unity_test_case(
                        _case, _log, additional_attrs={'time': round(_end_at - _start_at, 3)}
                    )

        return wrapper

    def _add_single_unity_test_case(
        self, case: UnittestMenuCase, log: t.Optional[t.AnyStr], additional_attrs: t.Optional[t.Dict[str, t.Any]] = None
    ):
        if log:
            # check format
            check = UNITY_FIXTURE_REGEX.search(log)
            if check:
                regex = UNITY_FIXTURE_REGEX
            else:
                regex = UNITY_BASIC_REGEX

            res = list(regex.finditer(log))
        else:
            res = []

        # real parsing
        if len(res) == 0:
            logging.warning(f'unity test case not found, use case {case.name} instead')
            attrs = {'name': case.name, 'result': 'FAIL', 'message': self.pexpect_proc.buffer_debug_str}
        elif len(res) == 1:
            attrs = {k: v for k, v in res[0].groupdict().items() if v is not None}
        else:
            warnings.warn('This function is for recording single unity test case only. Use the last matched one')
            attrs = {k: v for k, v in res[-1].groupdict().items() if v is not None}

        if additional_attrs:
            attrs.update(additional_attrs)

        testcase = TestCase(**attrs)
        self.testsuite.testcases.append(testcase)
        if testcase.result == 'FAIL':
            self.testsuite.attrs['failures'] += 1
        elif testcase.result == 'IGNORE':
            self.testsuite.attrs['skipped'] += 1
        else:
            self.testsuite.attrs['tests'] += 1

    @_record_single_unity_test_case
    def _run_normal_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: float = 30,
    ) -> None:
        """
        Run a specific normal case

        Notes:
            Will skip with a warning if the case type is not "normal"

        Args:
            case: the specific case that parsed in test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
        """
        if case.type != 'normal':
            logging.warning('case %s is not a normal case', case.name)
            return

        self.expect_exact(READY_PATTERN_LIST, timeout=timeout)
        self.write(str(case.index))
        self.expect_exact(f'Running {case.name}...', timeout=1)

    @_record_single_unity_test_case
    def _run_multi_stage_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: float = 30,
    ) -> None:
        """
        Run a specific multi_stage case

        Notes:
            Will skip with a warning if the case type is not "multi_stage"

        Args:
            case: the specific case that parsed in test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
        """
        if case.type != 'multi_stage':
            logging.warning('case %s is not a multi stage case', case.name)
            return

        _start_at = time.perf_counter()
        _timestamp = _start_at
        for sub_case in case.subcases:
            _timeout = timeout - _timestamp + _start_at
            if _timeout < 0:  # pexpect process would expect 30s if < 0
                _timeout = 0
            self.expect_exact(READY_PATTERN_LIST, timeout=_timeout)
            self.write(str(case.index))
            self.expect_exact(case.name, timeout=1)
            self.write(str(sub_case['index']))
            _timestamp = time.perf_counter()

    def run_all_single_board_cases(
        self,
        group: t.Optional[str] = None,
        reset: bool = False,
        timeout: float = 30,
        run_ignore_cases: bool = False,
    ):
        """
        Run all single-board cases, including both normal and multi_stage cases

        Args:
            group: test case group
            reset: whether to perform a hardware reset before running a case
            timeout: timeout. (Default: 30 seconds)
            run_ignore_cases: run ignored test cases or not
        """
        for case in self.test_menu:
            if not group or group in case.groups:
                if not case.is_ignored or run_ignore_cases:
                    if case.type == 'normal':
                        self._run_normal_case(case, reset=reset, timeout=timeout)
                    elif case.type == 'multi_stage':
                        self._run_multi_stage_case(case, reset=reset, timeout=timeout)

    def write(self, data: t.AnyStr) -> None:
        data_str = to_str(data).strip('\n') or ''
        if data_str == '*':
            warnings.warn(
                'if you\'re using `dut.expect_exact("Press ENTER to see the list of tests"); '
                'dut.write("*"); dut.expect_unity_test_output()` to run esp-idf unity tests, '
                'please consider using `dut.run_all_single_board_cases()` instead. '
                'It could help record the duration time and the error messages even for crashed test cases.',
                UserHint,
            )

        if data_str and data_str[0] == '[' and data_str[-1] == ']':
            group_name = data_str[1:-1]
            warnings.warn(
                f'if you\'re using `dut.expect_exact("Press ENTER to see the list of tests"); '
                f'dut.write("{data_str}"); dut.expect_unity_test_output()` to run esp-idf unity tests, '
                f'please consider using `dut.run_all_single_board_cases(group="{group_name}")` instead. '
                f'It could help record the duration time and the error messages even for crashed test cases.',
                UserHint,
            )

        super().write(data)

    ################
    # JTAG related #
    ################
    def setup_jtag(self):
        super().setup_jtag()
        if self.gdb:
            self.gdb.write(f'file {self.app.elf_file}')

        run_flash = True
        if self._meta and self._meta.hit_port_app_cache(self.serial.port, self.app):
            run_flash = False

        if run_flash:
            self.flash_via_jtag()

    def flash_via_jtag(self):
        if not self.openocd:
            logging.warning('no openocd instance created. can\'t flash via openocd `program_esp`')
            return

        if self.app.is_loadable_elf:
            # loadable elf flash to ram. no cache.
            # load via test script.
            # For example:
            # self.gdb.write('mon reset halt')
            # self.gdb.write('thb *0x40007d54')
            # self.gdb.write('c')
            # self.gdb.write('load')
            return

        for _f in self.app.flash_files:
            if _f.encrypted:
                raise ValueError('Encrypted files can\'t be flashed in via JTAG')
            self.openocd.write(f'program_esp {_f.file_path} {hex(_f.offset)} verify')

        if self._meta:
            self._meta.set_port_app_cache(self.serial.port, self.app)

panic_output_decode_script: t.Optional[str] property

Returns:
    t.Optional[str]: Panic output decode script path

toolchain_prefix: str property

Returns:
    str: Toolchain prefix according to self.target

run_all_single_board_cases(group=None, reset=False, timeout=30, run_ignore_cases=False)

Run all single-board cases, including both normal and multi_stage cases

Parameters:
    group (t.Optional[str]): test case group (default: None)
    reset (bool): whether to perform a hardware reset before running a case (default: False)
    timeout (float): timeout in seconds (default: 30)
    run_ignore_cases (bool): whether to run ignored test cases (default: False)
Source code in pytest_embedded_idf/dut.py
def run_all_single_board_cases(
    self,
    group: t.Optional[str] = None,
    reset: bool = False,
    timeout: float = 30,
    run_ignore_cases: bool = False,
):
    """
    Run all single-board cases, including both normal and multi_stage cases

    Args:
        group: test case group
        reset: whether to perform a hardware reset before running a case
        timeout: timeout. (Default: 30 seconds)
        run_ignore_cases: run ignored test cases or not
    """
    for case in self.test_menu:
        if not group or group in case.groups:
            if not case.is_ignored or run_ignore_cases:
                if case.type == 'normal':
                    self._run_normal_case(case, reset=reset, timeout=timeout)
                elif case.type == 'multi_stage':
                    self._run_multi_stage_case(case, reset=reset, timeout=timeout)
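A minimal sketch (not from the source) of driving the ESP-IDF unity test menu from one pytest test; the `dut` fixture and the group name "psram" are assumptions for illustration.

def test_run_unity_cases(dut):
    # parses the test menu once, then runs each normal / multi_stage case,
    # recording one test case result per unity case even if the target crashes
    dut.run_all_single_board_cases(group='psram', reset=True, timeout=120)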

pytest_embedded_idf.unity_tester

CaseTester

The generic tester for all case types

Attributes:

    group (t.List[MultiDevResource]): The group of the devices' resources
    dut (IdfDut): The first dut if there is more than one
    test_menu (t.List[UnittestMenuCase]): The list of the cases
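A hedged usage sketch (not from the source): with pytest-embedded's multi-DUT support (for example a DUT count of 2), the `dut` fixture yields a sequence of IdfDut objects that can be handed to CaseTester directly.

from pytest_embedded_idf.unity_tester import CaseTester


def test_multi_device_cases(dut):
    tester = CaseTester(dut)          # accepts a single IdfDut or an iterable of them
    tester.run_all_multi_dev_cases(reset=True, timeout=180)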

Source code in pytest_embedded_idf/unity_tester.py
class CaseTester:
    """
    The Generic tester of all the types

    Attributes:
        group (t.List[MultiDevResource]): The group of the devices' resources
        dut (IdfDut): The first dut if there is more than one
        test_menu (t.List[UnittestMenuCase]): The list of the cases
    """

    # The signal patterns come from 'test_utils.c'
    SEND_SIGNAL_PREFIX = 'Send signal: '
    WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
    UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
    UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'

    def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None:  # type: ignore
        """
        Create the object for every dut and put them into the group
        """
        if isinstance(dut, Iterable):
            self.is_multi_dut = True
            self.dut = list(dut)
            self.first_dut = self.dut[0]
            self.test_menu = self.first_dut.test_menu
        else:
            self.is_multi_dut = False
            self.dut = dut
            self.first_dut = dut
            self.test_menu = self.dut.test_menu

        if self.is_multi_dut:
            self.group: t.List[MultiDevResource] = []
            if isinstance(dut, list):
                for item in dut:
                    dev_res = MultiDevResource(item)
                    self.group.append(dev_res)

    def _wait_multi_dev_case_finish(self, timeout: float = DEFAULT_TIMEOUT) -> None:
        """
        Wait until all the sub-cases of this multi_device case have finished
        """
        for d in self.group:
            if d.sem.acquire(timeout=timeout):
                d.sem.release()
            else:
                raise TimeoutError('Wait case to finish timeout')

    def _start_sub_case_thread(
        self,
        dev_res: MultiDevResource,
        case: UnittestMenuCase,
        sub_case_index: int,
        case_start_time: float,
        start_retry: int = DEFAULT_START_RETRY,
    ) -> None:
        """
        Start the thread monitoring on the corresponding dut of the sub-case
        """
        # Allocate the kwargs that pass to '_run'
        _kwargs = {
            'dut': dev_res.dut,
            'dev_res': dev_res,
            'case': case,
            'sub_case_index': sub_case_index,
            'start_retry': start_retry,
            'start_time': case_start_time,
        }

        # Create the thread of the sub-case
        dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True)
        dev_res.thread.start()
        # Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
        dev_res.sem.acquire()

    def _run(self, **kwargs) -> None:  # type: ignore
        """
        The thread target function
        Will run for each case on each dut

        Call the wrapped function to trigger the case
        Then keep listening on the dut for the signal

            - If the dut sends a signal, it will be put into the other duts' recv_sig
            - If the dut waits for a signal, it blocks and keeps polling recv_sig until it gets the signal it requires
            - If the dut has finished running the case, it quits the loop and the thread terminates
        """
        signal_pattern_list = [
            self.UNITY_SEND_SIGNAL_REGEX,  # The dut send a signal
            self.UNITY_WAIT_SIGNAL_REGEX,  # The dut is blocked and waiting for a signal
            unity.UNITY_SUMMARY_LINE_REGEX,  # Means the case finished
        ]
        dut = kwargs['dut']
        dev_res = kwargs['dev_res']
        case = kwargs['case']
        sub_case_index = kwargs['sub_case_index']
        start_retry = kwargs['start_retry']
        start_time = kwargs['start_time']
        # Start the case
        dut.expect_exact(READY_PATTERN_LIST)
        # Retry up to the defined number of times if the write does not succeed
        for retry in range(start_retry):
            dut.write(str(case.index))
            try:
                dut.expect_exact(case.name, timeout=1)
                break
            except TIMEOUT as e:
                if retry >= start_retry - 1:
                    dev_res.sem.release()
                    raise e

        dut.write(str(sub_case_index))

        # Wait for the specific patterns, only exist when the sub-case finished
        while True:
            pat = dut.expect(signal_pattern_list, timeout=60)
            if pat is not None:
                match_str = pat.group().decode('utf-8')

                # Send a signal
                if self.SEND_SIGNAL_PREFIX in match_str:
                    send_sig = pat.group(1).decode('utf-8')
                    for d in self.group:
                        d.recv_sig.append(send_sig)

                # Waiting for a signal
                elif self.WAIT_SIGNAL_PREFIX in match_str:
                    wait_sig = pat.group(1).decode('utf-8')
                    while True:
                        if wait_sig in dev_res.recv_sig:
                            dev_res.recv_sig.remove(wait_sig)
                            dut.write('')
                            break
                        # Keep waiting for the signal
                        else:
                            time.sleep(0.1)

                # Case finished
                elif 'Tests' in match_str:
                    case_end_time = time.perf_counter()
                    case_duration = case_end_time - start_time
                    additional_attrs = {'time': round(case_duration, 3)}
                    log = utils.remove_asci_color_code(dut.pexpect_proc.before)
                    dut.testsuite.add_unity_test_cases(log, additional_attrs=additional_attrs)
                    break

        # The case has finished; release the semaphore to unblock '_wait_multi_dev_case_finish'
        dev_res.sem.release()

    def run_multi_dev_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: float = DEFAULT_TIMEOUT,
        start_retry: int = DEFAULT_START_RETRY,
    ) -> None:
        """
        Run a specific multi_device case

        Notes:
            Will skip with a warning if the case type is not multi_device

        Args:
            case: the specific case parsed from the test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout in seconds
            start_retry (int): number of retries for a single case when it fails to start
        """
        if case.type != 'multi_device':
            logging.warning('case %s is not a multi device case', case.name)
            return

        if not self.is_multi_dut:
            logging.warning(
                'multi-device mode is not activated. Please refer to '
                'https://docs.espressif.com/projects/pytest-embedded/en/latest/key_concepts/#multi-duts '
                'for detailed documents'
            )
            return

        if reset:
            for dev_res in self.group:
                dev_res.dut.serial.hard_reset()

        start_time = time.perf_counter()
        for sub_case in case.subcases:
            if isinstance(sub_case['index'], str):
                index = int(sub_case['index'], 10)
            else:
                index = sub_case['index']
            self._start_sub_case_thread(
                dev_res=self.group[index - 1],
                case=case,
                sub_case_index=index,
                case_start_time=start_time,
                start_retry=start_retry,
            )
        # Wait for all the devices to finish their test cases
        self._wait_multi_dev_case_finish(timeout=timeout)

    def run_all_multi_dev_cases(
        self,
        reset: bool = False,
        timeout: float = DEFAULT_TIMEOUT,
        start_retry: int = DEFAULT_START_RETRY,
    ) -> None:
        """
        Run only multi_device cases

        Args:
            reset: whether to perform a hardware reset before running a case
            timeout: timeout in seconds
            start_retry (int): number of retries for a single case when it fails to start
        """
        for case in self.test_menu:
            # Run multi_device case on every device
            self.run_multi_dev_case(case, reset, timeout, start_retry)

    def run_all_cases(
        self,
        reset: bool = False,
        timeout: int = DEFAULT_TIMEOUT,
        start_retry: int = DEFAULT_START_RETRY,
    ) -> None:
        """
        Run all cases

        Args:
            reset: whether to perform a hardware reset before running a case
            timeout: timeout in seconds
            start_retry (int): number of retries for a single case when it fails to start
        """
        for case in self.test_menu:
            self.run_case(case, reset, timeout=timeout, start_retry=start_retry)

    def run_case(
        self,
        case: UnittestMenuCase,
        reset: bool = False,
        timeout: int = DEFAULT_TIMEOUT,
        start_retry: int = DEFAULT_START_RETRY,
    ) -> None:
        """
        Run a specific case

        Args:
            case: the specific case parsed from the test menu
            reset: whether to perform a hardware reset before running a case
            timeout: timeout in seconds
            start_retry (int): number of retries for a single case when it fails to start
        """
        if case.type == 'normal':
            self.first_dut._run_normal_case(case, reset=reset, timeout=timeout)
        elif case.type == 'multi_stage':
            self.first_dut._run_multi_stage_case(case, reset=reset, timeout=timeout)
        elif case.type == 'multi_device':
            self.run_multi_dev_case(case, reset=reset, timeout=timeout, start_retry=start_retry)
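
The sub-case threads above coordinate with '_wait_multi_dev_case_finish' through one Semaphore per dut: the semaphore is acquired right after the sub-case thread starts and released once the Unity summary line (or a start failure) is seen. Below is a standalone sketch of that pattern; it is not part of the library, and the durations stand in for the expect loop on a real dut.

import time
from threading import Semaphore, Thread


def run_sub_case(sem: Semaphore, duration: float) -> None:
    time.sleep(duration)  # stand-in for the expect() loop on the dut
    sem.release()         # sub-case finished, unblock the waiter


sems = []
for duration in (0.2, 0.5):  # two duts, two sub-cases
    sem = Semaphore()
    Thread(target=run_sub_case, args=(sem, duration), daemon=True).start()
    sem.acquire()  # hold the semaphore while the sub-case is running
    sems.append(sem)

# Equivalent of '_wait_multi_dev_case_finish': re-acquire each semaphore
# (with a timeout), which only succeeds after the matching thread released it.
for sem in sems:
    if sem.acquire(timeout=5):
        sem.release()
    else:
        raise TimeoutError('Wait case to finish timeout')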

__init__(dut)

Create the object for every dut and put them into the group

Source code in pytest_embedded_idf/unity_tester.py
def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None:  # type: ignore
    """
    Create the object for every dut and put them into the group
    """
    if isinstance(dut, Iterable):
        self.is_multi_dut = True
        self.dut = list(dut)
        self.first_dut = self.dut[0]
        self.test_menu = self.first_dut.test_menu
    else:
        self.is_multi_dut = False
        self.dut = dut
        self.first_dut = dut
        self.test_menu = self.dut.test_menu

    if self.is_multi_dut:
        self.group: t.List[MultiDevResource] = []
        if isinstance(dut, list):
            for item in dut:
                dev_res = MultiDevResource(item)
                self.group.append(dev_res)

run_all_cases(reset=False, timeout=DEFAULT_TIMEOUT, start_retry=DEFAULT_START_RETRY)

Run all cases

Parameters:

Name Type Description Default
reset bool

whether to perform a hardware reset before running a case

False
timeout int

timeout in seconds

DEFAULT_TIMEOUT
start_retry int

number of retries for a single case when it fails to start

DEFAULT_START_RETRY
Source code in pytest_embedded_idf/unity_tester.py
def run_all_cases(
    self,
    reset: bool = False,
    timeout: int = DEFAULT_TIMEOUT,
    start_retry: int = DEFAULT_START_RETRY,
) -> None:
    """
    Run all cases

    Args:
        reset: whether to perform a hardware reset before running a case
        timeout: timeout in seconds
        start_retry (int): number of retries for a single case when it fails to start
    """
    for case in self.test_menu:
        self.run_case(case, reset, timeout=timeout, start_retry=start_retry)
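
For example, assuming the case_tester fixture provided by the pytest-embedded-idf plugin exposes an instance of this tester, a test that simply runs everything parsed from the menu could look like:

def test_run_every_case(case_tester) -> None:
    # Runs normal, multi_stage and multi_device cases, resetting before each one
    case_tester.run_all_cases(reset=True, timeout=120)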

run_all_multi_dev_cases(reset=False, timeout=DEFAULT_TIMEOUT, start_retry=DEFAULT_START_RETRY)

Run only multi_device cases

Parameters:

Name Type Description Default
reset bool

whether to perform a hardware reset before running a case

False
timeout float

timeout in seconds

DEFAULT_TIMEOUT
start_retry int

number of retries for a single case when it fails to start

DEFAULT_START_RETRY
Source code in pytest_embedded_idf/unity_tester.py
def run_all_multi_dev_cases(
    self,
    reset: bool = False,
    timeout: float = DEFAULT_TIMEOUT,
    start_retry: int = DEFAULT_START_RETRY,
) -> None:
    """
    Run only multi_device cases

    Args:
        reset: whether to perform a hardware reset before running a case
        timeout: timeout in seconds
        start_retry (int): number of retries for a single case when it fails to start
    """
    for case in self.test_menu:
        # Run multi_device case on every device
        self.run_multi_dev_case(case, reset, timeout, start_retry)
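
A usage sketch, again assuming the case_tester fixture is available, for a job that only exercises the multi-device suite:

def test_multi_device_suite(case_tester) -> None:
    # Non multi_device cases are skipped with a warning by the underlying call
    case_tester.run_all_multi_dev_cases(reset=True)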

run_case(case, reset=False, timeout=DEFAULT_TIMEOUT, start_retry=DEFAULT_START_RETRY)

Run a specific case

Parameters:

Name Type Description Default
case UnittestMenuCase

the specific case parsed from the test menu

required
reset bool

whether to perform a hardware reset before running a case

False
timeout int

timeout in seconds

DEFAULT_TIMEOUT
start_retry int

number of retries for a single case when it fails to start

DEFAULT_START_RETRY
Source code in pytest_embedded_idf/unity_tester.py
def run_case(
    self,
    case: UnittestMenuCase,
    reset: bool = False,
    timeout: int = DEFAULT_TIMEOUT,
    start_retry: int = DEFAULT_START_RETRY,
) -> None:
    """
    Run a specific case

    Args:
        case: the specific case parsed from the test menu
        reset: whether to perform a hardware reset before running a case
        timeout: timeout in seconds
        start_retry (int): number of retries for a single case when it fails to start
    """
    if case.type == 'normal':
        self.first_dut._run_normal_case(case, reset=reset, timeout=timeout)
    elif case.type == 'multi_stage':
        self.first_dut._run_multi_stage_case(case, reset=reset, timeout=timeout)
    elif case.type == 'multi_device':
        self.run_multi_dev_case(case, reset=reset, timeout=timeout, start_retry=start_retry)
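
A sketch of dispatching one particular case from the menu; the case name used here is hypothetical and the case_tester fixture is assumed:

def test_single_named_case(case_tester) -> None:
    for case in case_tester.test_menu:
        if case.name == 'my_case_name':  # hypothetical name, adjust to a real menu entry
            case_tester.run_case(case, reset=True)
            break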

run_multi_dev_case(case, reset=False, timeout=DEFAULT_TIMEOUT, start_retry=DEFAULT_START_RETRY)

Run a specific multi_device case

Notes

Will skip with a warning if the case type is not multi_device

Parameters:

Name Type Description Default
case UnittestMenuCase

the specific case parsed from the test menu

required
reset bool

whether to perform a hardware reset before running a case

False
timeout float

timeout in seconds

DEFAULT_TIMEOUT
start_retry int

number of retries for a single case when it fails to start

DEFAULT_START_RETRY
Source code in pytest_embedded_idf/unity_tester.py
def run_multi_dev_case(
    self,
    case: UnittestMenuCase,
    reset: bool = False,
    timeout: float = DEFAULT_TIMEOUT,
    start_retry: int = DEFAULT_START_RETRY,
) -> None:
    """
    Run a specific multi_device case

    Notes:
        Will skip with a warning if the case type is not multi_device

    Args:
        case: the specific case parsed from the test menu
        reset: whether to perform a hardware reset before running a case
        timeout: timeout in seconds
        start_retry (int): number of retries for a single case when it fails to start
    """
    if case.type != 'multi_device':
        logging.warning('case %s is not a multi device case', case.name)
        return

    if not self.is_multi_dut:
        logging.warning(
            'multi-device mode is not activated. Please refer to '
            'https://docs.espressif.com/projects/pytest-embedded/en/latest/key_concepts/#multi-duts '
            'for detailed documents'
        )
        return

    if reset:
        for dev_res in self.group:
            dev_res.dut.serial.hard_reset()

    start_time = time.perf_counter()
    for sub_case in case.subcases:
        if isinstance(sub_case['index'], str):
            index = int(sub_case['index'], 10)
        else:
            index = sub_case['index']
        self._start_sub_case_thread(
            dev_res=self.group[index - 1],
            case=case,
            sub_case_index=index,
            case_start_time=start_time,
            start_retry=start_retry,
        )
    # Wait for all the devices to finish their test cases
    self._wait_multi_dev_case_finish(timeout=timeout)
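
For instance, one could run only the multi-device cases that are not marked as ignored or disabled (a sketch assuming the case_tester fixture):

def test_runnable_multi_device_cases(case_tester) -> None:
    for case in case_tester.test_menu:
        if case.type == 'multi_device' and not case.is_ignored:
            case_tester.run_multi_dev_case(case, reset=True, timeout=90)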

MultiDevResource

Resources of multi_dev dut

Attributes:

Name Type Description
dut IdfDut

Object of the Device under test

sem Semaphore

Semaphore used to monitor whether the case has finished

recv_sig t.List[str]

The list of signals received from other duts

thread Thread

The thread that monitors the signals

Source code in pytest_embedded_idf/unity_tester.py
class MultiDevResource:
    """
    Resources of multi_dev dut

    Attributes:
        dut (IdfDut): Object of the Device under test
        sem (Semaphore): Semaphore used to monitor whether the case has finished
        recv_sig (t.List[str]): The list of signals received from other duts
        thread (Thread): The thread that monitors the signals
    """

    def __init__(self, dut: 'IdfDut') -> None:
        self.dut = dut
        self.sem = Semaphore()
        self.recv_sig: t.List[str] = []
        self.thread: Thread = None  # type: ignore

UnittestMenuCase dataclass

Dataclass of esp-idf unit test cases parsed from test menu

Attributes:

Name Type Description
index int

The index of the case, which can be used to run this case.

name str

The name of the case.

type str

Type of this case, which can be normal, multi_stage or multi_device.

keywords t.List[str]

List of additional keywords of this case. For now, we have disable and ignore.

groups t.List[str]

List of groups of this case; this is usually the component to which this case belongs.

attributes t.Dict[str, t.Any]

Dict of attributes of this case, which is used to describe timeout duration, test environment, etc.

subcases t.List[t.Dict[str, t.Any]]

List of dict of subcases of this case, if this case is a multi_stage or multi_device one.

Source code in pytest_embedded_idf/unity_tester.py
@dataclass
class UnittestMenuCase:
    """
    Dataclass of esp-idf unit test cases parsed from test menu

    Attributes:
        index: The index of the case, which can be used to run this case.
        name: The name of the case.
        type: Type of this case, which can be `normal` `multi_stage` or `multi_device`.
        keywords: List of additional keywords of this case. For now, we have `disable` and `ignore`.
        groups: List of groups of this case; this is usually the component to which this case belongs.
        attributes: Dict of attributes of this case, which is used to describe timeout duration,
            test environment, etc.
        subcases: List of dict of subcases of this case, if this case is a `multi_stage` or `multi_device` one.
    """

    index: int
    name: str
    type: str
    keywords: t.List[str]
    groups: t.List[str]
    attributes: t.Dict[str, t.Any]
    subcases: t.List[t.Dict[str, t.Any]]

    @property
    def is_ignored(self):
        return 'ignore' in self.keywords or 'disable' in self.keywords
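
For instance, the parsed menu can be grouped by case type while skipping ignored or disabled entries before deciding what to run. A self-contained sketch; test_menu here is whatever iterable of UnittestMenuCase objects the tester exposes:

from typing import Dict, List


def summarize_menu(test_menu) -> Dict[str, List[str]]:
    """Group runnable case names by type, skipping 'ignore'/'disable' cases."""
    cases_by_type: Dict[str, List[str]] = {}
    for case in test_menu:
        if case.is_ignored:
            continue
        cases_by_type.setdefault(case.type, []).append(case.name)
    return cases_by_type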