Skip to content

Pytest embedded serial

pytest_embedded_serial.serial

Serial

Bases: DuplicateStdoutMixin

Custom serial class

Attributes:

Name Type Description
port_config dict[str, Any]

port configs

proc serial.Serial

process created by serial.serial_for_url()

Source code in pytest_embedded_serial/serial.py
class Serial(DuplicateStdoutMixin):
    """
    Custom serial class

    Attributes:
        port_config (dict[str, Any]): port configs
        proc (serial.Serial): process created by `serial.serial_for_url()`
    """

    DEFAULT_BAUDRATE = 115200

    DEFAULT_PORT_CONFIG = {
        'baudrate': DEFAULT_BAUDRATE,
        'bytesize': pyserial.EIGHTBITS,
        'parity': pyserial.PARITY_NONE,
        'stopbits': pyserial.STOPBITS_ONE,
        'timeout': 0.05,  # read timeout
        'xonxoff': False,
        'rtscts': False,
    }

    occupied_ports: Dict[str, None] = dict()

    def __init__(
        self, pexpect_proc: PexpectProcess, port: Union[str, pyserial.Serial], baud: int = DEFAULT_BAUDRATE, **kwargs
    ):
        """
        Args:
            pexpect_proc: `PexpectProcess` instance
            port: port string or pyserial Serial instance
        """
        super().__init__()

        if port is None:
            raise ValueError('Please specify port')

        if isinstance(port, str):
            self.port = port
            self.port_config = copy.deepcopy(self.DEFAULT_PORT_CONFIG)
            self.port_config['baudrate'] = baud
            self.port_config.update(**kwargs)
            self.proc = pyserial.serial_for_url(self.port, **self.port_config)
        else:  # pyserial instance
            self.proc = port
            self.proc.timeout = self.DEFAULT_PORT_CONFIG['timeout']  # set read timeout
            self.port = self.proc.port

        self.pexpect_proc = pexpect_proc
        self.baud = baud

        self._post_init()

        self._start()

        self._finalize_init()

    def _post_init(self):
        pass

    def _start(self):
        pass

    def _finalize_init(self):
        self.occupied_ports[self.port] = None
        logging.debug(f'occupied {self.port}')

    def _forward_io(self, pexpect_proc: PexpectProcess) -> None:
        while self.proc.is_open:
            try:
                s = self.proc.read_all()
                pexpect_proc.write(s)
            except:  # noqa daemon thread may run at any case
                break

    def stop_redirect_thread(self) -> bool:
        """
        Close the serial port and reopen it to kill the redirect daemon thread.

        Returns:
            Killed the redirect thread or not
        """
        killed = False
        if self._forward_io_thread and self._forward_io_thread.is_alive():
            self.proc.close()
            time.sleep(0.1)
            self.proc.open()  # to kill the redirect stdout thread
            killed = True

        return killed

    @contextlib.contextmanager
    def disable_redirect_thread(self):
        killed = self.stop_redirect_thread()

        yield killed

        if killed:
            self.create_forward_io_thread(self.pexpect_proc)

__init__(pexpect_proc, port, baud=DEFAULT_BAUDRATE, **kwargs)

Parameters:

Name Type Description Default
pexpect_proc PexpectProcess

PexpectProcess instance

required
port Union[str, pyserial.Serial]

port string or pyserial Serial instance

required
Source code in pytest_embedded_serial/serial.py
def __init__(
    self, pexpect_proc: PexpectProcess, port: Union[str, pyserial.Serial], baud: int = DEFAULT_BAUDRATE, **kwargs
):
    """
    Args:
        pexpect_proc: `PexpectProcess` instance
        port: port string or pyserial Serial instance
    """
    super().__init__()

    if port is None:
        raise ValueError('Please specify port')

    if isinstance(port, str):
        self.port = port
        self.port_config = copy.deepcopy(self.DEFAULT_PORT_CONFIG)
        self.port_config['baudrate'] = baud
        self.port_config.update(**kwargs)
        self.proc = pyserial.serial_for_url(self.port, **self.port_config)
    else:  # pyserial instance
        self.proc = port
        self.proc.timeout = self.DEFAULT_PORT_CONFIG['timeout']  # set read timeout
        self.port = self.proc.port

    self.pexpect_proc = pexpect_proc
    self.baud = baud

    self._post_init()

    self._start()

    self._finalize_init()

stop_redirect_thread()

Close the serial port and reopen it to kill the redirect daemon thread.

Returns:

Type Description
bool

Killed the redirect thread or not

Source code in pytest_embedded_serial/serial.py
def stop_redirect_thread(self) -> bool:
    """
    Close the serial port and reopen it to kill the redirect daemon thread.

    Returns:
        Killed the redirect thread or not
    """
    killed = False
    if self._forward_io_thread and self._forward_io_thread.is_alive():
        self.proc.close()
        time.sleep(0.1)
        self.proc.open()  # to kill the redirect stdout thread
        killed = True

    return killed

pytest_embedded_serial.dut

SerialDut

Bases: Dut

Dut class for serial ports

Source code in pytest_embedded_serial/dut.py
class SerialDut(Dut):
    """
    Dut class for serial ports
    """

    def __init__(self, pexpect_proc: PexpectProcess, app: App, serial: Serial, **kwargs) -> None:
        """
        Args:
            pexpect_proc: `PexpectProcess` instance
            app: `App` instance
            serial: `Serial` instance
        """
        super().__init__(pexpect_proc, app, **kwargs)

        self.serial = serial
        self.serial.create_forward_io_thread(self.pexpect_proc)

    def write(self, data: AnyStr) -> int:
        return self.serial.proc.write(to_bytes(data, '\n'))

    def close(self) -> None:
        self.serial.proc.close()
        self.serial.occupied_ports.pop(self.serial.port, None)
        logging.debug(f'released {self.serial.port}')

        super().close()

__init__(pexpect_proc, app, serial, **kwargs)

Parameters:

Name Type Description Default
pexpect_proc PexpectProcess

PexpectProcess instance

required
app App

App instance

required
serial Serial

Serial instance

required
Source code in pytest_embedded_serial/dut.py
def __init__(self, pexpect_proc: PexpectProcess, app: App, serial: Serial, **kwargs) -> None:
    """
    Args:
        pexpect_proc: `PexpectProcess` instance
        app: `App` instance
        serial: `Serial` instance
    """
    super().__init__(pexpect_proc, app, **kwargs)

    self.serial = serial
    self.serial.create_forward_io_thread(self.pexpect_proc)