Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased]
- Added basic Zephyr board support.


## [0.8.3] - 2020-09-22
Expand Down
4 changes: 4 additions & 0 deletions tbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .selectable import (
acquire_lab,
acquire_board,
acquire_zephyr,
acquire_uboot,
acquire_linux,
acquire_local,
Expand All @@ -31,6 +32,7 @@
testcase as _testcase_decorator,
named_testcase,
with_lab,
with_zephyr,
with_uboot,
with_linux,
)
Expand All @@ -39,6 +41,7 @@
"selectable",
"acquire_lab",
"acquire_board",
"acquire_zephyr",
"acquire_uboot",
"acquire_linux",
"acquire_local",
Expand All @@ -47,6 +50,7 @@
"testcase",
"named_testcase",
"with_lab",
"with_zephyr",
"with_uboot",
"with_linux",
)
Expand Down
104 changes: 103 additions & 1 deletion tbot/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __new__(cls, ty: typing.Any, name: typing.Optional[str] = None) -> None:
pass


__all__ = ("testcase", "with_lab", "with_uboot", "with_linux")
__all__ = ("testcase", "with_lab", "with_zephyr", "with_uboot", "with_linux")

F_tc = typing.TypeVar("F_tc", bound=typing.Callable[..., typing.Any])

Expand Down Expand Up @@ -163,6 +163,108 @@ def wrapped(
return typing.cast(F_lab, wrapped)


F_ze = typing.TypeVar("F_ze", bound=typing.Callable[..., typing.Any])
F_zephyr = typing.Callable[
[
mypy.DefaultArg(typing.Union[selectable.LabHost, board.ZephyrShell, None]),
mypy.VarArg(typing.Any),
mypy.KwArg(typing.Any),
],
typing.Any,
]


def with_zephyr(tc: F_ze) -> F_zephyr:
"""
Decorate a function to automatically supply a Zephyr machine as an argument.

The idea is that when using this decorator and calling the testcase
without an already initialized Zephyr machine, tbot will automatically
acquire the selected one.

**Example**::

from tbot.machine import board

@tbot.testcase
@tbot.with_zephyr
def testcase_with_zephyr(ze: board.ZephyrShell) -> None:
ze.exec0("version")

This is essentially syntactic sugar for::

import contextlib
import typing
import tbot
from tbot.machine import board, linux

@tbot.testcase
def testcase_with_zephyr(
lab_or_ze: typing.Union[linux.Lab, board.ZephyrShell, None] = None,
) -> None:
with contextlib.ExitStack() as cx:
lh: linux.Lab
ze: board.ZephyrShell

if isinstance(lab_or_ze, linux.Lab):
lh = cx.enter_context(lab_or_ze)
elif isinstance(lab_or_ze, board.ZephyrShell):
lh = cx.enter_context(lab_or_ze.host)
else:
lh = cx.enter_context(tbot.acquire_lab())

if isinstance(lab_or_ze, board.ZephyrShell):
ze = cx.enter_context(lab_or_ze)
else:
b = cx.enter_context(tbot.acquire_board(lh))
ze = cx.enter_context(tbot.acquire_zephyr(b))

ze.exec("help")

.. warning::
While making your life a lot easier, this decorator unfortunately has
a drawback: It will erase the type signature of your testcase, so you
can no longer rely on type-checking when using the testcase downstream.
"""

@functools.wraps(tc)
def wrapped(
arg: typing.Union[selectable.LabHost, board.ZephyrShell, None] = None,
*args: typing.Any,
**kwargs: typing.Any,
) -> typing.Any:
with contextlib.ExitStack() as cx:
lh: selectable.LabHost
ze: board.ZephyrShell

# Acquire LabHost
if arg is None:
lh = cx.enter_context(selectable.acquire_lab())
elif isinstance(arg, linux.Lab):
lh = cx.enter_context(arg)
elif not isinstance(arg, board.ZephyrShell):
raise TypeError(
f"Argument to {tc!r} must either be a lab-host or a ZephyrShell (found {arg!r})"
)

# Acquire Zephyr
if isinstance(arg, board.ZephyrShell):
ze = cx.enter_context(arg)
else:
b = cx.enter_context(selectable.acquire_board(lh))
ze = cx.enter_context(selectable.acquire_zephyr(b))

return tc(ze, *args, **kwargs)

# Adjust annotation
argname = tc.__code__.co_varnames[0]
wrapped.__annotations__[argname] = typing.Union[
selectable.LabHost, board.ZephyrShell, None
]

return typing.cast(F_zephyr, wrapped)


F_ub = typing.TypeVar("F_ub", bound=typing.Callable[..., typing.Any])
F_uboot = typing.Callable[
[
Expand Down
3 changes: 3 additions & 0 deletions tbot/machine/board/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing
import tbot

from .zephyr import ZephyrShell
from .uboot import UBootShell, UBootAutobootIntercept
from .board import PowerControl, Board, Connector
from .linux import LinuxUbootConnector, LinuxBootLogin
Expand All @@ -19,6 +20,8 @@
"UBootAutobootIntercept",
"UBootMachine",
"UBootShell",
"ZephyrMachine",
"ZephyrShell",
)


Expand Down
208 changes: 208 additions & 0 deletions tbot/machine/board/zephyr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# tbot, Embedded Automation Tool
# Copyright (C) 2019 Harald Seiler
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import contextlib
import time
import typing

import tbot
from .. import shell, machine
from ..linux import special


class ZephyrStartupEvent(tbot.log.EventIO):
def __init__(self, ze: machine.Machine) -> None:
self.ze = ze
super().__init__(
["board", "zephyr", ze.name],
tbot.log.c("ZEPHYR").bold + f" ({ze.name})",
verbosity=tbot.log.Verbosity.QUIET,
)

self.verbosity = tbot.log.Verbosity.STDOUT
self.prefix = " <> "

def close(self) -> None:
setattr(self.ze, "bootlog", self.getvalue())
self.data["output"] = self.getvalue()
super().close()


class ZephyrStartup(machine.Machine):
_zephyr_init_event: typing.Optional[tbot.log.EventIO] = None
_timeout_start: typing.Optional[float] = None

boot_timeout: typing.Optional[float] = None
"""
Maximum time from power-on to Zephyr shell.

If tbot can't reach the Zephyr shell during this time, an exception will be thrown.
"""

def _zephyr_startup_event(self) -> tbot.log.EventIO:
if self._zephyr_init_event is None:
self._zephyr_init_event = ZephyrStartupEvent(self)

self._timeout_start = time.monotonic()

return self._zephyr_init_event


ArgTypes = typing.Union[str, special.Special]


class ZephyrShell(shell.Shell, ZephyrStartup):
"""
Zephyr shell.

The interface of this shell is really simple. It provides

- :py:meth:`ze.exec() <tbot.machine.board.ZephyrShell.exec>` - Run command
and return output.
- :py:meth:`ze.interactive() <tbot.machine.board.ZephyrShell.interactive>` -
Start an interactive session for this machine.
"""

prompt: typing.Union[str, bytes] = "shell> "
"""
Prompt which was configured for Zephyr.

Commonly ``"shell> "``.

.. warning::

**Don't forget the trailing space, if your prompt has one!**
"""

bootlog: str
"""Transcript of console output during boot."""

@contextlib.contextmanager
def _init_shell(self) -> typing.Iterator:
with self._zephyr_startup_event() as ev, self.ch.with_stream(ev):
self.ch.prompt = (
self.prompt.encode("utf-8")
if isinstance(self.prompt, str)
else self.prompt
)

# Set a blacklist of control characters. These characters were
# copied over from the U-Boot board.
self.ch._write_blacklist = [
0x00, # NUL | Null
0x01, # SOH | Start of Heading
0x02, # STX | Start of Text
0x03, # ETX | End of Text / Interrupt
0x04, # EOT | End of Transmission
0x05, # ENQ | Enquiry
0x06, # ACK | Acknowledge
0x07, # BEL | Bell, Alert
0x08, # BS | Backspace
0x09, # HT | Character Tabulation, Horizontal Tabulation
0x0B, # VT | Line Tabulation, Vertical Tabulation
0x0C, # FF | Form Feed
0x0E, # SO | Shift Out
0x0F, # SI | Shift In
0x10, # DLE | Data Link Escape
0x11, # DC1 | Device Control One (XON)
0x12, # DC2 | Device Control Two
0x13, # DC3 | Device Control Three (XOFF)
0x14, # DC4 | Device Control Four
0x15, # NAK | Negative Acknowledge
0x16, # SYN | Synchronous Idle
0x17, # ETB | End of Transmission Block
0x18, # CAN | Cancel
0x1A, # SUB | Substitute / Suspend Process
0x1B, # ESC | Escape
0x1C, # FS | File Separator
0x7F, # DEL | Delete
]

while True:
if self.boot_timeout is not None:
assert self._timeout_start is not None
if (time.monotonic() - self._timeout_start) > self.boot_timeout:
raise TimeoutError("Zephyr did not reach shell in time")
try:
self.ch.read_until_prompt(timeout=0.2)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an issue with that code: my testcases succeed 95% of the time, but miss the prompt every so often which results in a TimeoutError exception. Adding print(buf) in read_until_prompt() just before [1] shows that buf is truncated when this happens. Here's an sample output of such a timeout, looking for b'# MY_PROMPT':

[...]
bytearray(b'[... hundreds\r\n of\r\n bytes]\r\n # MY_P')                                                                                 
Ebytearray(b'R')                                                                                                                                                                                                                        
SUbytearray(b'ROM')                                                                                                                                                                                                                     
Lbytearray(b'ROMP')                                                                                                                                                                                                                     
bytearray(b'ROMPT\r')                                                                                                                                                                                                                   
│   │   │    <>                                                                                                     
bytearray(b'ROMPT\r\n')                                                                                                                                                                                                                 
│   │   ├─Result: TIMEOUT                                                                                           
│   │   └─Fail. (46.198s)                                                                                                                                                                                                               

The problem disappears when I set this timeout to something big (10 seconds) (in my case, boot takes around 2 seconds to get to the prompt). As this timeout value is given to read_iter() in channel.py, I believe that based on luck, a timeout can occur right during the display of the prompt. This splits the buffer in the middle of the prompt and we therefore miss it.

This problem seems to be present for U-Boot boards too (I took that code from there), but perhaps boot time is far less than 200ms in most cases, so it doesn't happen in practice. I'm not sure about how Linux boards handle that part.

I don't know what would be the best way to handle this. Perhaps read_iter() could store the buffer and restore it after a Timeout exception? I'm new to that Iterator/yield() stuff, so bear with me here.

[1] https://github.com/Rahix/tbot/blob/master/tbot/machine/channel/channel.py#L881

Copy link
Author

@zkrx zkrx Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that ^C is sent on timeout, which displays the prompt again. So U-Boot has probably no issue with that, I guess you can dismiss that comment :).

EDIT: ^C doesn't have the same effect on a Zephyr shell though. Replacing ^C by \n should work for Zephyr.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the idea is that e.g. if the board is already turned on, the prompt won't re-appear on its own when attaching to the serial console. So I send ^C which makes U-Boot show the prompt again and tbot can then continue.

I guess the same would be useful for Zephyr but as you noticed ^C does not work, we'll have to find another mechanism ... Using \n is a bit dangerous: For example, there might be a half-written command currently waiting on the console before tbot attaches. Sending \n would then execute it which should be avoided. Is there any way we could prevent this? E.g. is there any key sequence we can use to clear everything currently written after the prompt (short of sending a bunch of backspaces :p)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced self.ch.sendintr() below with self.ch.sendline() and that seems to do the trick.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't refresh my page and missed your reply.

Using \n is a bit dangerous

Oh yeah, I didn't think about that. I was mainly concerned by the "prompt split in two" issue.

E.g. is there any key sequence we can use to clear everything currently written after the prompt (short of sending a bunch of backspaces :p)?

So it looks like latest Zephyr versions handle ctrl keys [1] but I've never tried. Looks like it should work though!

[1] https://github.com/zephyrproject-rtos/zephyr/blob/master/subsys/shell/shell.c#L822

break
except TimeoutError:
self.ch.sendintr()

yield None

def escape(self, *args: ArgTypes) -> str:
"""Escape a string so it can be used safely on the Zephyr command-line."""
string_args = []
for arg in args:
if isinstance(arg, str):
string_args.append(arg)
elif isinstance(arg, special.Special):
string_args.append(arg._to_string(self))
else:
raise TypeError(f"{type(arg)!r} is not a supported argument type!")

return " ".join(string_args)

def exec(self, *args: ArgTypes) -> str:
"""
Run a command in Zephyr.

**Example**:

.. code-block:: python

output = ze.exec("help")

:rtype: str
:returns: A str with the console output. The output will also contain a
trailing newline in most cases.
"""
cmd = self.escape(*args)

with tbot.log_event.command(self.name, cmd) as ev:
self.ch.sendline(cmd, read_back=True)
with self.ch.with_stream(ev, show_prompt=False):
out = self.ch.read_until_prompt()
ev.data["stdout"] = out

return out

def interactive(self) -> None:
"""
Start an interactive session on this machine.

This method will connect tbot's stdio to the machine's channel so you
can interactively run commands. This method is used by the
``interactive_zephyr`` testcase.
"""
tbot.log.message(
f"Entering interactive shell ({tbot.log.c('CTRL+D to exit').bold}) ..."
)

# Unlike U-Boot, I don't think Zephyr needs the space before the newline,
# but it doesn't harm.
self.ch.sendline(" ")
self.ch.attach_interactive()
print("")
self.ch.sendline(" ")

try:
self.ch.read_until_prompt(timeout=0.5)
except TimeoutError:
raise Exception("Failed to reacquire Zephyr after interactive session!")

tbot.log.message("Exiting interactive shell ...")
2 changes: 2 additions & 0 deletions tbot/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ def main() -> None: # noqa: C901
board = loader.load_module(pathlib.Path(args.board).resolve())
if hasattr(board, "BOARD"):
tbot.selectable.Board = board.BOARD # type: ignore
if hasattr(board, "ZEPHYR"):
tbot.selectable.ZephyrMachine = board.ZEPHYR # type: ignore
if hasattr(board, "UBOOT"):
tbot.selectable.UBootMachine = board.UBOOT # type: ignore
if hasattr(board, "LINUX"):
Expand Down
Loading