diff --git a/CHANGELOG.md b/CHANGELOG.md index a408b164..6cc8bf31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## [Unreleased] +- Added basic Zephyr board support. ## [0.8.3] - 2020-09-22 diff --git a/tbot/__init__.py b/tbot/__init__.py index a884557a..99c4c6fc 100644 --- a/tbot/__init__.py +++ b/tbot/__init__.py @@ -23,6 +23,7 @@ from .selectable import ( acquire_lab, acquire_board, + acquire_zephyr, acquire_uboot, acquire_linux, acquire_local, @@ -31,6 +32,7 @@ testcase as _testcase_decorator, named_testcase, with_lab, + with_zephyr, with_uboot, with_linux, ) @@ -39,6 +41,7 @@ "selectable", "acquire_lab", "acquire_board", + "acquire_zephyr", "acquire_uboot", "acquire_linux", "acquire_local", @@ -47,6 +50,7 @@ "testcase", "named_testcase", "with_lab", + "with_zephyr", "with_uboot", "with_linux", ) diff --git a/tbot/decorators.py b/tbot/decorators.py index f94b9a15..67857aa9 100644 --- a/tbot/decorators.py +++ b/tbot/decorators.py @@ -40,7 +40,7 @@ def __new__(cls, ty: typing.Any, name: typing.Optional[str] = None) -> None: pass -__all__ = ("testcase", "with_lab", "with_uboot", "with_linux") +__all__ = ("testcase", "with_lab", "with_zephyr", "with_uboot", "with_linux") F_tc = typing.TypeVar("F_tc", bound=typing.Callable[..., typing.Any]) @@ -163,6 +163,108 @@ def wrapped( return typing.cast(F_lab, wrapped) +F_ze = typing.TypeVar("F_ze", bound=typing.Callable[..., typing.Any]) +F_zephyr = typing.Callable[ + [ + mypy.DefaultArg(typing.Union[selectable.LabHost, board.ZephyrShell, None]), + mypy.VarArg(typing.Any), + mypy.KwArg(typing.Any), + ], + typing.Any, +] + + +def with_zephyr(tc: F_ze) -> F_zephyr: + """ + Decorate a function to automatically supply a Zephyr machine as an argument. + + The idea is that when using this decorator and calling the testcase + without an already initialized Zephyr machine, tbot will automatically + acquire the selected one. + + **Example**:: + + from tbot.machine import board + + @tbot.testcase + @tbot.with_zephyr + def testcase_with_zephyr(ze: board.ZephyrShell) -> None: + ze.exec0("version") + + This is essentially syntactic sugar for:: + + import contextlib + import typing + import tbot + from tbot.machine import board, linux + + @tbot.testcase + def testcase_with_zephyr( + lab_or_ze: typing.Union[linux.Lab, board.ZephyrShell, None] = None, + ) -> None: + with contextlib.ExitStack() as cx: + lh: linux.Lab + ze: board.ZephyrShell + + if isinstance(lab_or_ze, linux.Lab): + lh = cx.enter_context(lab_or_ze) + elif isinstance(lab_or_ze, board.ZephyrShell): + lh = cx.enter_context(lab_or_ze.host) + else: + lh = cx.enter_context(tbot.acquire_lab()) + + if isinstance(lab_or_ze, board.ZephyrShell): + ze = cx.enter_context(lab_or_ze) + else: + b = cx.enter_context(tbot.acquire_board(lh)) + ze = cx.enter_context(tbot.acquire_zephyr(b)) + + ze.exec("help") + + .. warning:: + While making your life a lot easier, this decorator unfortunately has + a drawback: It will erase the type signature of your testcase, so you + can no longer rely on type-checking when using the testcase downstream. + """ + + @functools.wraps(tc) + def wrapped( + arg: typing.Union[selectable.LabHost, board.ZephyrShell, None] = None, + *args: typing.Any, + **kwargs: typing.Any, + ) -> typing.Any: + with contextlib.ExitStack() as cx: + lh: selectable.LabHost + ze: board.ZephyrShell + + # Acquire LabHost + if arg is None: + lh = cx.enter_context(selectable.acquire_lab()) + elif isinstance(arg, linux.Lab): + lh = cx.enter_context(arg) + elif not isinstance(arg, board.ZephyrShell): + raise TypeError( + f"Argument to {tc!r} must either be a lab-host or a ZephyrShell (found {arg!r})" + ) + + # Acquire Zephyr + if isinstance(arg, board.ZephyrShell): + ze = cx.enter_context(arg) + else: + b = cx.enter_context(selectable.acquire_board(lh)) + ze = cx.enter_context(selectable.acquire_zephyr(b)) + + return tc(ze, *args, **kwargs) + + # Adjust annotation + argname = tc.__code__.co_varnames[0] + wrapped.__annotations__[argname] = typing.Union[ + selectable.LabHost, board.ZephyrShell, None + ] + + return typing.cast(F_zephyr, wrapped) + + F_ub = typing.TypeVar("F_ub", bound=typing.Callable[..., typing.Any]) F_uboot = typing.Callable[ [ diff --git a/tbot/machine/board/__init__.py b/tbot/machine/board/__init__.py index a6a90435..549311f2 100644 --- a/tbot/machine/board/__init__.py +++ b/tbot/machine/board/__init__.py @@ -1,6 +1,7 @@ import typing import tbot +from .zephyr import ZephyrShell from .uboot import UBootShell, UBootAutobootIntercept from .board import PowerControl, Board, Connector from .linux import LinuxUbootConnector, LinuxBootLogin @@ -19,6 +20,8 @@ "UBootAutobootIntercept", "UBootMachine", "UBootShell", + "ZephyrMachine", + "ZephyrShell", ) diff --git a/tbot/machine/board/zephyr.py b/tbot/machine/board/zephyr.py new file mode 100644 index 00000000..27bf7585 --- /dev/null +++ b/tbot/machine/board/zephyr.py @@ -0,0 +1,208 @@ +# tbot, Embedded Automation Tool +# Copyright (C) 2019 Harald Seiler +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import contextlib +import time +import typing + +import tbot +from .. import shell, machine +from ..linux import special + + +class ZephyrStartupEvent(tbot.log.EventIO): + def __init__(self, ze: machine.Machine) -> None: + self.ze = ze + super().__init__( + ["board", "zephyr", ze.name], + tbot.log.c("ZEPHYR").bold + f" ({ze.name})", + verbosity=tbot.log.Verbosity.QUIET, + ) + + self.verbosity = tbot.log.Verbosity.STDOUT + self.prefix = " <> " + + def close(self) -> None: + setattr(self.ze, "bootlog", self.getvalue()) + self.data["output"] = self.getvalue() + super().close() + + +class ZephyrStartup(machine.Machine): + _zephyr_init_event: typing.Optional[tbot.log.EventIO] = None + _timeout_start: typing.Optional[float] = None + + boot_timeout: typing.Optional[float] = None + """ + Maximum time from power-on to Zephyr shell. + + If tbot can't reach the Zephyr shell during this time, an exception will be thrown. + """ + + def _zephyr_startup_event(self) -> tbot.log.EventIO: + if self._zephyr_init_event is None: + self._zephyr_init_event = ZephyrStartupEvent(self) + + self._timeout_start = time.monotonic() + + return self._zephyr_init_event + + +ArgTypes = typing.Union[str, special.Special] + + +class ZephyrShell(shell.Shell, ZephyrStartup): + """ + Zephyr shell. + + The interface of this shell is really simple. It provides + + - :py:meth:`ze.exec() ` - Run command + and return output. + - :py:meth:`ze.interactive() ` - + Start an interactive session for this machine. + """ + + prompt: typing.Union[str, bytes] = "shell> " + """ + Prompt which was configured for Zephyr. + + Commonly ``"shell> "``. + + .. warning:: + + **Don't forget the trailing space, if your prompt has one!** + """ + + bootlog: str + """Transcript of console output during boot.""" + + @contextlib.contextmanager + def _init_shell(self) -> typing.Iterator: + with self._zephyr_startup_event() as ev, self.ch.with_stream(ev): + self.ch.prompt = ( + self.prompt.encode("utf-8") + if isinstance(self.prompt, str) + else self.prompt + ) + + # Set a blacklist of control characters. These characters were + # copied over from the U-Boot board. + self.ch._write_blacklist = [ + 0x00, # NUL | Null + 0x01, # SOH | Start of Heading + 0x02, # STX | Start of Text + 0x03, # ETX | End of Text / Interrupt + 0x04, # EOT | End of Transmission + 0x05, # ENQ | Enquiry + 0x06, # ACK | Acknowledge + 0x07, # BEL | Bell, Alert + 0x08, # BS | Backspace + 0x09, # HT | Character Tabulation, Horizontal Tabulation + 0x0B, # VT | Line Tabulation, Vertical Tabulation + 0x0C, # FF | Form Feed + 0x0E, # SO | Shift Out + 0x0F, # SI | Shift In + 0x10, # DLE | Data Link Escape + 0x11, # DC1 | Device Control One (XON) + 0x12, # DC2 | Device Control Two + 0x13, # DC3 | Device Control Three (XOFF) + 0x14, # DC4 | Device Control Four + 0x15, # NAK | Negative Acknowledge + 0x16, # SYN | Synchronous Idle + 0x17, # ETB | End of Transmission Block + 0x18, # CAN | Cancel + 0x1A, # SUB | Substitute / Suspend Process + 0x1B, # ESC | Escape + 0x1C, # FS | File Separator + 0x7F, # DEL | Delete + ] + + while True: + if self.boot_timeout is not None: + assert self._timeout_start is not None + if (time.monotonic() - self._timeout_start) > self.boot_timeout: + raise TimeoutError("Zephyr did not reach shell in time") + try: + self.ch.read_until_prompt(timeout=0.2) + break + except TimeoutError: + self.ch.sendintr() + + yield None + + def escape(self, *args: ArgTypes) -> str: + """Escape a string so it can be used safely on the Zephyr command-line.""" + string_args = [] + for arg in args: + if isinstance(arg, str): + string_args.append(arg) + elif isinstance(arg, special.Special): + string_args.append(arg._to_string(self)) + else: + raise TypeError(f"{type(arg)!r} is not a supported argument type!") + + return " ".join(string_args) + + def exec(self, *args: ArgTypes) -> str: + """ + Run a command in Zephyr. + + **Example**: + + .. code-block:: python + + output = ze.exec("help") + + :rtype: str + :returns: A str with the console output. The output will also contain a + trailing newline in most cases. + """ + cmd = self.escape(*args) + + with tbot.log_event.command(self.name, cmd) as ev: + self.ch.sendline(cmd, read_back=True) + with self.ch.with_stream(ev, show_prompt=False): + out = self.ch.read_until_prompt() + ev.data["stdout"] = out + + return out + + def interactive(self) -> None: + """ + Start an interactive session on this machine. + + This method will connect tbot's stdio to the machine's channel so you + can interactively run commands. This method is used by the + ``interactive_zephyr`` testcase. + """ + tbot.log.message( + f"Entering interactive shell ({tbot.log.c('CTRL+D to exit').bold}) ..." + ) + + # Unlike U-Boot, I don't think Zephyr needs the space before the newline, + # but it doesn't harm. + self.ch.sendline(" ") + self.ch.attach_interactive() + print("") + self.ch.sendline(" ") + + try: + self.ch.read_until_prompt(timeout=0.5) + except TimeoutError: + raise Exception("Failed to reacquire Zephyr after interactive session!") + + tbot.log.message("Exiting interactive shell ...") diff --git a/tbot/main.py b/tbot/main.py index a977dda1..cf448584 100644 --- a/tbot/main.py +++ b/tbot/main.py @@ -255,6 +255,8 @@ def main() -> None: # noqa: C901 board = loader.load_module(pathlib.Path(args.board).resolve()) if hasattr(board, "BOARD"): tbot.selectable.Board = board.BOARD # type: ignore + if hasattr(board, "ZEPHYR"): + tbot.selectable.ZephyrMachine = board.ZEPHYR # type: ignore if hasattr(board, "UBOOT"): tbot.selectable.UBootMachine = board.UBOOT # type: ignore if hasattr(board, "LINUX"): diff --git a/tbot/selectable.py b/tbot/selectable.py index 501e4a46..3e342787 100644 --- a/tbot/selectable.py +++ b/tbot/selectable.py @@ -114,6 +114,53 @@ def acquire_board(lh: LabHost) -> Board: return Board(lh) # type: ignore +class ZephyrMachine(board.ZephyrShell, typing.ContextManager): + """Dummy type that will be replaced by the actual selected Zephyr machine at runtime.""" + + _unselected = True + + def __init__(self, lab: LabHost, *args: typing.Any) -> None: + raise NotImplementedError("no zephyr selected") + + +def acquire_zephyr(board: Board, *args: typing.Any) -> ZephyrMachine: + """ + Acquire the selected board's Zephyr shell. + + As there can only be one instance of the selected board's :class:`ZephyrShell` at a time, + your testcases should optionally take the :class:`ZephyrShell` as a + parameter. The recipe looks like this: + + .. code-block:: python + + import contextlib + import typing + import tbot + from tbot.machine import board + + + @tbot.testcase + def my_testcase( + lab: typing.Optional[tbot.selectable.LabHost] = None, + zephyr: typing.Optional[board.ZephyrShell] = None, + ) -> None: + with contextlib.ExitStack() as cx: + lh = cx.enter_context(lab or tbot.acquire_lab()) + if zephyr is not None: + ze = zephyr + else: + b = cx.enter_context(tbot.acquire_board(lh)) + ze = cx.enter_context(tbot.acquire_zephyr(b)) + + ... + + :rtype: tbot.selectable.ZephyrMachine + """ + if hasattr(ZephyrMachine, "_unselected"): + raise NotImplementedError("Maybe you haven't set a board?") + return ZephyrMachine(board, *args) # type: ignore + + class UBootMachine(board.UBootShell, typing.ContextManager): """Dummy type that will be replaced by the actual selected U-Boot machine at runtime."""