Skip to content

Minor Typing Improvements #1962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions can/bit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from can.typechecking import BitTimingDict, BitTimingFdDict


class BitTiming(Mapping):
class BitTiming(Mapping[str, int]):
"""Representation of a bit timing configuration for a CAN 2.0 bus.

The class can be constructed in multiple ways, depending on the information
Expand Down Expand Up @@ -477,7 +477,7 @@ def __hash__(self) -> int:
return tuple(self._data.values()).__hash__()


class BitTimingFd(Mapping):
class BitTimingFd(Mapping[str, int]):
"""Representation of a bit timing configuration for a CAN FD bus.

The class can be constructed in multiple ways, depending on the information
Expand Down
10 changes: 4 additions & 6 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import contextlib
import logging
import threading
from abc import ABC, ABCMeta, abstractmethod
from abc import ABC, abstractmethod
from collections.abc import Iterator, Sequence
from enum import Enum, auto
from time import time
Expand All @@ -19,7 +19,6 @@

from typing_extensions import Self

import can
import can.typechecking
from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask
from can.message import Message
Expand All @@ -44,7 +43,7 @@ class CanProtocol(Enum):
CAN_XL = auto()


class BusABC(metaclass=ABCMeta):
class BusABC(ABC):
"""The CAN Bus Abstract Base Class that serves as the basis
for all concrete interfaces.

Expand All @@ -68,7 +67,7 @@ class BusABC(metaclass=ABCMeta):
@abstractmethod
def __init__(
self,
channel: Optional[can.typechecking.Channel],
channel: can.typechecking.Channel,
can_filters: Optional[can.typechecking.CanFilters] = None,
**kwargs: object,
):
Expand Down Expand Up @@ -447,7 +446,6 @@ def _matches_filters(self, msg: Message) -> bool:
for _filter in self._filters:
# check if this filter even applies to the message
if "extended" in _filter:
_filter = cast("can.typechecking.CanFilterExtended", _filter)
if _filter["extended"] != msg.is_extended_id:
continue

Expand Down Expand Up @@ -524,7 +522,7 @@ def protocol(self) -> CanProtocol:
return self._can_protocol

@staticmethod
def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]:
def _detect_available_configs() -> Sequence[can.typechecking.AutoDetectedConfig]:
"""Detect all configurations/channels that this interface could
currently connect with.

Expand Down
6 changes: 3 additions & 3 deletions can/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import concurrent.futures.thread
import importlib
import logging
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Sequence
from typing import Any, Optional, Union, cast

from . import util
Expand Down Expand Up @@ -142,7 +142,7 @@ def Bus( # noqa: N802
def detect_available_configs(
interfaces: Union[None, str, Iterable[str]] = None,
timeout: float = 5.0,
) -> list[AutoDetectedConfig]:
) -> Sequence[AutoDetectedConfig]:
"""Detect all configurations/channels that the interfaces could
currently connect with.

Expand Down Expand Up @@ -175,7 +175,7 @@ def detect_available_configs(
# otherwise assume iterable of strings

# Collect detection callbacks
callbacks: dict[str, Callable[[], list[AutoDetectedConfig]]] = {}
callbacks: dict[str, Callable[[], Sequence[AutoDetectedConfig]]] = {}
for interface_keyword in interfaces:
try:
bus_class = _get_class_for_interface(interface_keyword)
Expand Down
21 changes: 15 additions & 6 deletions can/interfaces/cantact.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import time
from collections.abc import Sequence
from typing import Any, Optional, Union
from unittest.mock import Mock

Expand All @@ -14,6 +15,7 @@
CanInterfaceNotImplementedError,
error_check,
)
from ..typechecking import AutoDetectedConfig
from ..util import check_or_adjust_timing_clock, deprecated_args_alias

logger = logging.getLogger(__name__)
Expand All @@ -31,7 +33,7 @@ class CantactBus(BusABC):
"""CANtact interface"""

@staticmethod
def _detect_available_configs():
def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
try:
interface = cantact.Interface()
except (NameError, SystemError, AttributeError):
Expand All @@ -40,7 +42,7 @@ def _detect_available_configs():
)
return []

channels = []
channels: list[AutoDetectedConfig] = []
for i in range(0, interface.channel_count()):
channels.append({"interface": "cantact", "channel": f"ch:{i}"})
return channels
Expand Down Expand Up @@ -121,7 +123,14 @@ def __init__(
**kwargs,
)

def _recv_internal(self, timeout):
def _recv_internal(
self, timeout: Optional[float]
) -> tuple[Optional[Message], bool]:
if timeout is None:
raise TypeError(
f"{self.__class__.__name__} expects a numeric `timeout` value."
)

with error_check("Cannot receive message"):
frame = self.interface.recv(int(timeout * 1000))
if frame is None:
Expand All @@ -140,7 +149,7 @@ def _recv_internal(self, timeout):
)
return msg, False

def send(self, msg, timeout=None):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
with error_check("Cannot send message"):
self.interface.send(
self.channel,
Expand All @@ -151,13 +160,13 @@ def send(self, msg, timeout=None):
msg.data,
)

def shutdown(self):
def shutdown(self) -> None:
super().shutdown()
with error_check("Cannot shutdown interface"):
self.interface.stop()


def mock_recv(timeout):
def mock_recv(timeout: int) -> Optional[dict[str, Any]]:
if timeout > 0:
return {
"id": 0x123,
Expand Down
3 changes: 1 addition & 2 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
CyclicSendTaskABC,
Message,
)
from can.typechecking import AutoDetectedConfig


class IXXATBus(BusABC):
Expand Down Expand Up @@ -175,5 +174,5 @@ def state(self) -> BusState:
return self.bus.state

@staticmethod
def _detect_available_configs() -> list[AutoDetectedConfig]:
def _detect_available_configs() -> Sequence[vcinpl.AutoDetectedIxxatConfig]:
return vcinpl._detect_available_configs()
8 changes: 6 additions & 2 deletions can/interfaces/ixxat/canlib_vcinpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,8 +976,8 @@ def get_ixxat_hwids():
return hwids


def _detect_available_configs() -> list[AutoDetectedConfig]:
config_list = [] # list in wich to store the resulting bus kwargs
def _detect_available_configs() -> Sequence["AutoDetectedIxxatConfig"]:
config_list = [] # list in which to store the resulting bus kwargs

# used to detect HWID
device_handle = HANDLE()
Expand Down Expand Up @@ -1026,3 +1026,7 @@ def _detect_available_configs() -> list[AutoDetectedConfig]:
pass # _canlib is None in the CI tests -> return a blank list

return config_list


class AutoDetectedIxxatConfig(AutoDetectedConfig):
unique_hardware_id: int
33 changes: 15 additions & 18 deletions can/interfaces/serial/serial_can.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import io
import logging
import struct
from typing import Any, Optional
from collections.abc import Sequence
from typing import Any, Optional, cast

from can import (
BusABC,
Expand All @@ -26,21 +27,14 @@
logger = logging.getLogger("can.serial")

try:
import serial
import serial.tools.list_ports
except ImportError:
logger.warning(
"You won't be able to use the serial can backend without "
"the serial module installed!"
"the `pyserial` package installed!"
)
serial = None

try:
from serial.tools.list_ports import comports as list_comports
except ImportError:
# If unavailable on some platform, just return nothing
def list_comports() -> list[Any]:
return []


CAN_ERR_FLAG = 0x20000000
CAN_RTR_FLAG = 0x40000000
Expand All @@ -63,8 +57,7 @@ def __init__(
baudrate: int = 115200,
timeout: float = 0.1,
rtscts: bool = False,
*args,
**kwargs,
**kwargs: Any,
) -> None:
"""
:param channel:
Expand Down Expand Up @@ -107,7 +100,7 @@ def __init__(
"could not create the serial device"
) from error

super().__init__(channel, *args, **kwargs)
super().__init__(channel, **kwargs)

def shutdown(self) -> None:
"""
Expand Down Expand Up @@ -232,7 +225,7 @@ def _recv_internal(

def fileno(self) -> int:
try:
return self._ser.fileno()
return cast("int", self._ser.fileno())
except io.UnsupportedOperation:
raise NotImplementedError(
"fileno is not implemented using current CAN bus on this platform"
Expand All @@ -241,7 +234,11 @@ def fileno(self) -> int:
raise CanOperationError("Cannot fetch fileno") from exception

@staticmethod
def _detect_available_configs() -> list[AutoDetectedConfig]:
return [
{"interface": "serial", "channel": port.device} for port in list_comports()
]
def _detect_available_configs() -> Sequence[AutoDetectedConfig]:
configs: list[AutoDetectedConfig] = []
if serial is None:
return configs

for port in serial.tools.list_ports.comports():
configs.append({"interface": "serial", "channel": port.device})
return configs
3 changes: 1 addition & 2 deletions can/interfaces/socketcan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import struct
import subprocess
import sys
from typing import Optional, cast
from typing import Optional

from can import typechecking
from can.interfaces.socketcan.constants import CAN_EFF_FLAG
Expand All @@ -28,7 +28,6 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes
can_id = can_filter["can_id"]
can_mask = can_filter["can_mask"]
if "extended" in can_filter:
can_filter = cast("typechecking.CanFilterExtended", can_filter)
# Match on either 11-bit OR 29-bit messages instead of both
can_mask |= CAN_EFF_FLAG
if can_filter["extended"]:
Expand Down
12 changes: 7 additions & 5 deletions can/interfaces/udp_multicast/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import struct
import time
import warnings
from typing import Optional, Union
from typing import Any, Optional, Union

import can
from can import BusABC, CanProtocol
from can import BusABC, CanProtocol, Message
from can.typechecking import AutoDetectedConfig

from .utils import is_msgpack_installed, pack_message, unpack_message
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(
hop_limit: int = 1,
receive_own_messages: bool = False,
fd: bool = True,
**kwargs,
**kwargs: Any,
) -> None:
is_msgpack_installed()

Expand Down Expand Up @@ -126,7 +126,9 @@ def is_fd(self) -> bool:
)
return self._can_protocol is CanProtocol.CAN_FD

def _recv_internal(self, timeout: Optional[float]):
def _recv_internal(
self, timeout: Optional[float]
) -> tuple[Optional[Message], bool]:
result = self._multicast.recv(timeout)
if not result:
return None, False
Expand Down Expand Up @@ -204,7 +206,7 @@ def __init__(

# Look up multicast group address in name server and find out IP version of the first suitable target
# and then get the address family of it (socket.AF_INET or socket.AF_INET6)
connection_candidates = socket.getaddrinfo( # type: ignore
connection_candidates = socket.getaddrinfo(
group, self.port, type=socket.SOCK_DGRAM
)
sock = None
Expand Down
4 changes: 2 additions & 2 deletions can/interfaces/udp_multicast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Defines common functions.
"""

from typing import Any, Optional
from typing import Any, Optional, cast

from can import CanInterfaceNotImplementedError, Message
from can.typechecking import ReadableBytesLike
Expand Down Expand Up @@ -51,7 +51,7 @@ def pack_message(message: Message) -> bytes:
"bitrate_switch": message.bitrate_switch,
"error_state_indicator": message.error_state_indicator,
}
return msgpack.packb(as_dict, use_bin_type=True)
return cast("bytes", msgpack.packb(as_dict, use_bin_type=True))


def unpack_message(
Expand Down
Loading
Loading