Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions pycyphal/application/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class FileServer:
The lifetime of this instance matches the lifetime of its node.
"""

def __init__(
self, node: pycyphal.application.Node, roots: typing.Iterable[typing.Union[str, pathlib.Path]]
) -> None:
def __init__(self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path]) -> None:
"""
:param node:
The node instance to initialize the file server on.
Expand Down Expand Up @@ -85,7 +83,7 @@ def close() -> None:
node.add_lifetime_hooks(start, close)

@property
def roots(self) -> typing.List[pathlib.Path]:
def roots(self) -> list[pathlib.Path]:
"""
File operations will be performed within these root directories.
The first directory to match takes precedence.
Expand All @@ -94,7 +92,7 @@ def roots(self) -> typing.List[pathlib.Path]:
"""
return self._roots

def locate(self, p: typing.Union[pathlib.Path, str, Path]) -> typing.Tuple[pathlib.Path, pathlib.Path]:
def locate(self, p: pathlib.Path | str | Path) -> tuple[pathlib.Path, pathlib.Path]:
"""
Iterate through :attr:`roots` until a root r is found such that ``r/p`` exists and return ``(r, p)``.
Otherwise, return nonexistent ``(roots[0], p)``.
Expand Down Expand Up @@ -413,7 +411,7 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> int:
assert isinstance(res, Modify.Response)
return int(res.error.value)

async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> typing.Union[int, bytes]:
async def read(self, path: str, offset: int = 0, size: int | None = None) -> int | bytes:
"""
Proxy for ``uavcan.file.Read``.

Expand All @@ -434,7 +432,7 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No
data on success (empty if the offset is out of bounds or the file is empty).
"""

async def once() -> typing.Union[int, bytes]:
async def once() -> int | bytes:
res = await self._call(Read, Read.Request(offset=offset, path=Path(path)))
assert isinstance(res, Read.Response)
if res.error.value != 0:
Expand All @@ -455,9 +453,7 @@ async def once() -> typing.Union[int, bytes]:
offset += len(out)
return data

async def write(
self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True
) -> int:
async def write(self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True) -> int:
"""
Proxy for ``uavcan.file.Write``.

Expand All @@ -479,7 +475,7 @@ async def write(
:returns: See ``uavcan.file.Error``
"""

async def once(d: typing.Union[memoryview, bytes]) -> int:
async def once(d: memoryview | bytes) -> int:
res = await self._call(
Write,
Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))),
Expand Down Expand Up @@ -658,7 +654,13 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> None:
assert isinstance(res, Modify.Response)
_raise_on_error(res.error, f"{src}->{dst}")

async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes:
async def read(
self,
path: str,
offset: int = 0,
size: int | None = None,
progress: typing.Callable[[int, int | None], None] | None = None,
) -> bytes:
"""
Proxy for ``uavcan.file.Read``.

Expand All @@ -674,6 +676,10 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No
If None (default), the entire file will be read (this may exhaust local memory).
If zero, this call is a no-op.

:param progress:
Optional callback function that receives (bytes_read, total_size)
total_size will be None if size parameter is None

:raises OSError: If the read operation failed; see ``uavcan.file.Error``

:returns:
Expand All @@ -686,20 +692,26 @@ async def once() -> bytes:
_raise_on_error(res.error, path)
return bytes(res.data.value.tobytes())

if size is None:
size = 2**64
data = b""
while len(data) < size:
while len(data) < (size or 2**64):
out = await once()
assert isinstance(out, bytes)
if not out:
break
data += out
offset += len(out)
if progress:
progress(len(data), size)
return data

async def write(
self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True
self,
path: str,
data: memoryview | bytes,
offset: int = 0,
*,
truncate: bool = True,
progress: typing.Callable[[int, int], None] | None = None,
) -> None:
"""
Proxy for ``uavcan.file.Write``.
Expand All @@ -719,22 +731,30 @@ async def write(
If True, the rest of the file after ``offset + len(data)`` will be truncated.
This is done by sending an empty write request, as prescribed by the Specification.

:param progress:
Optional callback function that receives (bytes_written, total_size)

:raises OSError: If the write operation failed; see ``uavcan.file.Error``
"""

async def once(d: typing.Union[memoryview, bytes]) -> None:
async def once(d: memoryview | bytes) -> None:
res = await self._call(
Write,
Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))),
)
assert isinstance(res, Write.Response)
_raise_on_error(res.error, path)

total_size = len(data)
bytes_written = 0
limit = self.data_transfer_capacity
while len(data) > 0:
frag, data = data[:limit], data[limit:]
await once(frag)
offset += len(frag)
bytes_written += len(frag)
if progress:
progress(bytes_written, total_size)
if truncate:
await once(b"")

Expand Down
32 changes: 29 additions & 3 deletions tests/application/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <[email protected]>

import math
import sys
import shutil
import typing
Expand All @@ -13,8 +14,13 @@
import pycyphal


class ProgressTracker:
def __init__(self) -> None:
self.counter = 0


@pytest.mark.asyncio
async def _unittest_file(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None:
async def _unittest_file(compiled: list[pycyphal.dsdl.GeneratedPackageInfo]) -> None:
from pycyphal.application import make_node, NodeInfo
from pycyphal.transport.udp import UDPTransport
from pycyphal.application.file import FileClient, FileServer, Error
Expand Down Expand Up @@ -254,8 +260,28 @@ async def ls(path: str) -> typing.List[str]:
assert e.value.errno == errno.ENOENT

# Write into empty file
await cln.write("a/foo/x", bytes(range(200)) * 3)
assert await cln.read("a/foo/x") == bytes(range(200)) * 3
data = bytes(range(200)) * 3
data_chunks = math.ceil(len(data) / cln.data_transfer_capacity)
write_tracker = ProgressTracker()

def write_progress_cb(bytes_written: int, bytes_total: int) -> None:
write_tracker.counter += 1
assert bytes_total == len(data)
assert bytes_written == min(write_tracker.counter * cln.data_transfer_capacity, len(data))

await cln.write("a/foo/x", data, progress=write_progress_cb)
assert write_tracker.counter == data_chunks

read_tracker = ProgressTracker()

def read_progress_cb(bytes_read: int, bytes_total: int | None) -> None:
read_tracker.counter += 1
assert bytes_total is None
assert bytes_read == min(read_tracker.counter * cln.data_transfer_capacity, len(data))

assert await cln.read("a/foo/x", progress=read_progress_cb) == data
assert read_tracker.counter == data_chunks

assert (await cln.get_info("a/foo/x")).size == 600

# Truncation -- this write is shorter
Expand Down