From 138f6cce501b3543893130d78d2ea1c995f7662c Mon Sep 17 00:00:00 2001 From: chemicstry Date: Fri, 16 May 2025 16:50:15 +0300 Subject: [PATCH 1/3] Implement FileClient2 progress callbacks --- pycyphal/application/file.py | 34 +++++++++++++++++++++++++++++----- tests/application/file.py | 30 ++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/pycyphal/application/file.py b/pycyphal/application/file.py index ac5b1e9f..2fbac4b6 100644 --- a/pycyphal/application/file.py +++ b/pycyphal/application/file.py @@ -658,7 +658,13 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> None: assert isinstance(res, Modify.Response) _raise_on_error(res.error, f"{src}->{dst}") - async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes: + async def read( + self, + path: str, + offset: int = 0, + size: typing.Optional[int] = None, + progress: typing.Optional[typing.Callable[[int, int], None]] = None, + ) -> bytes: """ Proxy for ``uavcan.file.Read``. @@ -674,6 +680,10 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No If None (default), the entire file will be read (this may exhaust local memory). If zero, this call is a no-op. + :param progress: + Optional callback function that receives (bytes_read, total_size) + total_size will be None if size parameter is None + :raises OSError: If the read operation failed; see ``uavcan.file.Error`` :returns: @@ -686,20 +696,26 @@ async def once() -> bytes: _raise_on_error(res.error, path) return bytes(res.data.value.tobytes()) - if size is None: - size = 2**64 data = b"" - while len(data) < size: + while len(data) < (size or 2**64): out = await once() assert isinstance(out, bytes) if not out: break data += out offset += len(out) + if progress: + progress(len(data), size) return data async def write( - self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True + self, + path: str, + data: typing.Union[memoryview, bytes], + offset: int = 0, + *, + truncate: bool = True, + progress: typing.Optional[typing.Callable[[int, int], None]] = None, ) -> None: """ Proxy for ``uavcan.file.Write``. @@ -719,6 +735,9 @@ async def write( If True, the rest of the file after ``offset + len(data)`` will be truncated. This is done by sending an empty write request, as prescribed by the Specification. + :param progress: + Optional callback function that receives (bytes_written, total_size) + :raises OSError: If the write operation failed; see ``uavcan.file.Error`` """ @@ -730,11 +749,16 @@ async def once(d: typing.Union[memoryview, bytes]) -> None: assert isinstance(res, Write.Response) _raise_on_error(res.error, path) + total_size = len(data) + bytes_written = 0 limit = self.data_transfer_capacity while len(data) > 0: frag, data = data[:limit], data[limit:] await once(frag) offset += len(frag) + bytes_written += len(frag) + if progress: + progress(bytes_written, total_size) if truncate: await once(b"") diff --git a/tests/application/file.py b/tests/application/file.py index 0cd5aad6..a63b0517 100644 --- a/tests/application/file.py +++ b/tests/application/file.py @@ -2,6 +2,7 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +import math import sys import shutil import typing @@ -13,6 +14,11 @@ import pycyphal +class ProgressTracker: + def __init__(self): + self.counter = 0 + + @pytest.mark.asyncio async def _unittest_file(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None: from pycyphal.application import make_node, NodeInfo @@ -254,8 +260,28 @@ async def ls(path: str) -> typing.List[str]: assert e.value.errno == errno.ENOENT # Write into empty file - await cln.write("a/foo/x", bytes(range(200)) * 3) - assert await cln.read("a/foo/x") == bytes(range(200)) * 3 + data = bytes(range(200)) * 3 + data_chunks = math.ceil(len(data) / cln.data_transfer_capacity) + write_tracker = ProgressTracker() + + def write_progress_cb(bytes_written, bytes_total): + write_tracker.counter += 1 + assert bytes_total == len(data) + assert bytes_written == min(write_tracker.counter * cln.data_transfer_capacity, len(data)) + + await cln.write("a/foo/x", data, progress=write_progress_cb) + assert write_tracker.counter == data_chunks + + read_tracker = ProgressTracker() + + def read_progress_cb(bytes_read, bytes_total): + read_tracker.counter += 1 + assert bytes_total == None + assert bytes_read == min(read_tracker.counter * cln.data_transfer_capacity, len(data)) + + assert await cln.read("a/foo/x", progress=read_progress_cb) == data + assert read_tracker.counter == data_chunks + assert (await cln.get_info("a/foo/x")).size == 600 # Truncation -- this write is shorter From 0b6d6e4b18da671d80520d3aaec7fdd235ced032 Mon Sep 17 00:00:00 2001 From: chemicstry Date: Mon, 19 May 2025 15:48:34 +0300 Subject: [PATCH 2/3] Update typings --- pycyphal/application/file.py | 24 ++++++++++++------------ tests/application/file.py | 10 +++++----- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pycyphal/application/file.py b/pycyphal/application/file.py index 2fbac4b6..fa10835e 100644 --- a/pycyphal/application/file.py +++ b/pycyphal/application/file.py @@ -44,7 +44,7 @@ class FileServer: """ def __init__( - self, node: pycyphal.application.Node, roots: typing.Iterable[typing.Union[str, pathlib.Path]] + self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path] ) -> None: """ :param node: @@ -85,7 +85,7 @@ def close() -> None: node.add_lifetime_hooks(start, close) @property - def roots(self) -> typing.List[pathlib.Path]: + def roots(self) -> list[pathlib.Path]: """ File operations will be performed within these root directories. The first directory to match takes precedence. @@ -94,7 +94,7 @@ def roots(self) -> typing.List[pathlib.Path]: """ return self._roots - def locate(self, p: typing.Union[pathlib.Path, str, Path]) -> typing.Tuple[pathlib.Path, pathlib.Path]: + def locate(self, p: pathlib.Path | str | Path) -> tuple[pathlib.Path, pathlib.Path]: """ Iterate through :attr:`roots` until a root r is found such that ``r/p`` exists and return ``(r, p)``. Otherwise, return nonexistent ``(roots[0], p)``. @@ -413,7 +413,7 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> int: assert isinstance(res, Modify.Response) return int(res.error.value) - async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> typing.Union[int, bytes]: + async def read(self, path: str, offset: int = 0, size: int | None = None) -> int | bytes: """ Proxy for ``uavcan.file.Read``. @@ -434,7 +434,7 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No data on success (empty if the offset is out of bounds or the file is empty). """ - async def once() -> typing.Union[int, bytes]: + async def once() -> int | bytes: res = await self._call(Read, Read.Request(offset=offset, path=Path(path))) assert isinstance(res, Read.Response) if res.error.value != 0: @@ -456,7 +456,7 @@ async def once() -> typing.Union[int, bytes]: return data async def write( - self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True + self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True ) -> int: """ Proxy for ``uavcan.file.Write``. @@ -479,7 +479,7 @@ async def write( :returns: See ``uavcan.file.Error`` """ - async def once(d: typing.Union[memoryview, bytes]) -> int: + async def once(d: memoryview | bytes) -> int: res = await self._call( Write, Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))), @@ -662,8 +662,8 @@ async def read( self, path: str, offset: int = 0, - size: typing.Optional[int] = None, - progress: typing.Optional[typing.Callable[[int, int], None]] = None, + size: int | None = None, + progress: typing.Callable[[int, int | None], None] | None = None, ) -> bytes: """ Proxy for ``uavcan.file.Read``. @@ -711,11 +711,11 @@ async def once() -> bytes: async def write( self, path: str, - data: typing.Union[memoryview, bytes], + data: memoryview | bytes, offset: int = 0, *, truncate: bool = True, - progress: typing.Optional[typing.Callable[[int, int], None]] = None, + progress: typing.Callable[[int, int], None] | None = None, ) -> None: """ Proxy for ``uavcan.file.Write``. @@ -741,7 +741,7 @@ async def write( :raises OSError: If the write operation failed; see ``uavcan.file.Error`` """ - async def once(d: typing.Union[memoryview, bytes]) -> None: + async def once(d: memoryview | bytes) -> None: res = await self._call( Write, Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))), diff --git a/tests/application/file.py b/tests/application/file.py index a63b0517..bb23588c 100644 --- a/tests/application/file.py +++ b/tests/application/file.py @@ -15,12 +15,12 @@ class ProgressTracker: - def __init__(self): + def __init__(self) -> None: self.counter = 0 @pytest.mark.asyncio -async def _unittest_file(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None: +async def _unittest_file(compiled: list[pycyphal.dsdl.GeneratedPackageInfo]) -> None: from pycyphal.application import make_node, NodeInfo from pycyphal.transport.udp import UDPTransport from pycyphal.application.file import FileClient, FileServer, Error @@ -264,7 +264,7 @@ async def ls(path: str) -> typing.List[str]: data_chunks = math.ceil(len(data) / cln.data_transfer_capacity) write_tracker = ProgressTracker() - def write_progress_cb(bytes_written, bytes_total): + def write_progress_cb(bytes_written: int, bytes_total: int) -> None: write_tracker.counter += 1 assert bytes_total == len(data) assert bytes_written == min(write_tracker.counter * cln.data_transfer_capacity, len(data)) @@ -274,9 +274,9 @@ def write_progress_cb(bytes_written, bytes_total): read_tracker = ProgressTracker() - def read_progress_cb(bytes_read, bytes_total): + def read_progress_cb(bytes_read: int, bytes_total: int | None) -> None: read_tracker.counter += 1 - assert bytes_total == None + assert bytes_total is None assert bytes_read == min(read_tracker.counter * cln.data_transfer_capacity, len(data)) assert await cln.read("a/foo/x", progress=read_progress_cb) == data From e94c1d3ec587a17f830b8016fd39173bf72c3d32 Mon Sep 17 00:00:00 2001 From: chemicstry Date: Mon, 19 May 2025 16:23:58 +0300 Subject: [PATCH 3/3] Fix formatting --- pycyphal/application/file.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pycyphal/application/file.py b/pycyphal/application/file.py index fa10835e..4ca5790a 100644 --- a/pycyphal/application/file.py +++ b/pycyphal/application/file.py @@ -43,9 +43,7 @@ class FileServer: The lifetime of this instance matches the lifetime of its node. """ - def __init__( - self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path] - ) -> None: + def __init__(self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path]) -> None: """ :param node: The node instance to initialize the file server on. @@ -455,9 +453,7 @@ async def once() -> int | bytes: offset += len(out) return data - async def write( - self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True - ) -> int: + async def write(self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True) -> int: """ Proxy for ``uavcan.file.Write``.