Skip to content

Add CAN bridge script #1961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6444092
Add basic implementation for can bridge
pkess Feb 22, 2025
8308331
Implement basic configuration parsing
pkess Feb 22, 2025
c2b7413
Add implementation for bridge
pkess Feb 22, 2025
12f037d
Improve exit handling
pkess Feb 22, 2025
21ac3a0
Add debug logging
pkess Feb 22, 2025
59e64f6
Add error handling for wrong arguments
pkess Feb 22, 2025
07b6e37
Use stderr
pkess Feb 22, 2025
3dce956
Add custom usage
pkess Feb 22, 2025
6016ae7
Add bus configuration info
pkess Feb 22, 2025
b0e7582
Code format
pkess Feb 23, 2025
78787d9
Add exception for prints for can_bridge
pkess Feb 23, 2025
d9391f7
Add from to exception in exception
pkess Feb 23, 2025
aca4798
Remove assignment to unused variable
pkess Feb 23, 2025
4643492
Shorten line length
pkess Feb 23, 2025
e117b38
Organize imports
pkess Feb 23, 2025
e40ee33
Remove unnecessary else
pkess Feb 23, 2025
166b882
Add documentation for new script
pkess Feb 23, 2025
0ef6ca2
Add handling for -h and help sub command
pkess Feb 23, 2025
f1401ab
Add from none to exception
pkess Feb 23, 2025
7057f69
Fix typo busses to bus
pkess Feb 23, 2025
91ffbdb
Add type annotations
pkess Feb 23, 2025
e0b5368
Fix type annotations
pkess Feb 23, 2025
deb06fd
Fix type annotations again
pkess Feb 23, 2025
c7222b1
Add --help to get help
pkess Feb 23, 2025
69c039a
Add basic print help test
pkess Feb 23, 2025
037eb23
Add basic test file for bridge script
pkess Feb 23, 2025
991aa05
Add very basic test
pkess Feb 23, 2025
d2227d7
Add different channels for virtual bus
pkess Feb 23, 2025
3974c51
Add assert for call to exit
pkess Feb 23, 2025
8f6ae52
Patch correct function
pkess Feb 23, 2025
e86814f
test
pkess Feb 24, 2025
6dd6519
fjkdf
pkess Feb 24, 2025
c3a6906
once again -.-
pkess Feb 24, 2025
9f0950e
Try snakecase
pkess Feb 24, 2025
4e063c3
use new api to create cli args for bus1 and bus2
zariiii9003 Jul 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions can/bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
Creates a bridge between two CAN buses.

This will connect to two CAN buses. Messages received on one
bus will be sent to the other bus and vice versa.
"""

import argparse
import errno
import sys
import time
from datetime import datetime
from typing import Final

from can.cli import add_bus_arguments, create_bus_from_namespace
from can.listener import RedirectReader
from can.notifier import Notifier

BRIDGE_DESCRIPTION: Final = """\
Bridge two CAN buses.

Both can buses will be connected so that messages from bus1 will be sent on
bus2 and messages from bus2 will be sent to bus1.
"""
BUS_1_PREFIX: Final = "bus1"
BUS_2_PREFIX: Final = "bus2"


def _parse_bridge_args(args: list[str]) -> argparse.Namespace:
"""Parse command line arguments for bridge script."""

parser = argparse.ArgumentParser(description=BRIDGE_DESCRIPTION)
add_bus_arguments(parser, prefix=BUS_1_PREFIX, group_title="Bus 1 arguments")
add_bus_arguments(parser, prefix=BUS_2_PREFIX, group_title="Bus 2 arguments")

# print help message when no arguments were given
if not args:
parser.print_help(sys.stderr)
raise SystemExit(errno.EINVAL)

results, _unknown_args = parser.parse_known_args(args)
return results


def main() -> None:
results = _parse_bridge_args(sys.argv[1:])

with (
create_bus_from_namespace(results, prefix=BUS_1_PREFIX) as bus1,
create_bus_from_namespace(results, prefix=BUS_2_PREFIX) as bus2,
):
reader1_to_2 = RedirectReader(bus2)
reader2_to_1 = RedirectReader(bus1)
with Notifier(bus1, [reader1_to_2]), Notifier(bus2, [reader2_to_1]):
print(f"CAN Bridge (Started on {datetime.now()})")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass

print(f"CAN Bridge (Stopped on {datetime.now()})")


if __name__ == "__main__":
main()
9 changes: 9 additions & 0 deletions doc/scripts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ The full usage page can be seen below:
:shell:


can.bridge
----------

A small application that can be used to connect two can buses:

.. command-output:: python -m can.bridge -h
:shell:


can.logconvert
--------------

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ can_logconvert = "can.logconvert:main"
can_logger = "can.logger:main"
can_player = "can.player:main"
can_viewer = "can.viewer:main"
can_bridge = "can.bridge:main"

[project.urls]
homepage = "https://github.com/hardbyte/python-can"
Expand Down Expand Up @@ -186,6 +187,7 @@ ignore = [
"can/cli.py" = ["T20"] # flake8-print
"can/logger.py" = ["T20"] # flake8-print
"can/player.py" = ["T20"] # flake8-print
"can/bridge.py" = ["T20"] # flake8-print
"can/viewer.py" = ["T20"] # flake8-print
"examples/*" = ["T20"] # flake8-print

Expand Down
126 changes: 126 additions & 0 deletions test/test_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python

"""
This module tests the functions inside of bridge.py
"""

import random
import string
import sys
import threading
import time
from time import sleep as real_sleep
import unittest.mock

import can
import can.bridge
from can.interfaces import virtual

from .message_helper import ComparingMessagesTestCase


class TestBridgeScriptModule(unittest.TestCase, ComparingMessagesTestCase):

TIMEOUT = 3.0

def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
ComparingMessagesTestCase.__init__(
self,
allowed_timestamp_delta=None,
preserves_channel=False,
)

def setUp(self) -> None:
self.stop_event = threading.Event()

self.channel1 = "".join(random.choices(string.ascii_letters, k=8))
self.channel2 = "".join(random.choices(string.ascii_letters, k=8))

self.cli_args = [
"--bus1-interface",
"virtual",
"--bus1-channel",
self.channel1,
"--bus2-interface",
"virtual",
"--bus2-channel",
self.channel2,
]

self.testmsg = can.Message(
arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True
)

def fake_sleep(self, duration):
"""A fake replacement for time.sleep that checks periodically
whether self.stop_event is set, and raises KeyboardInterrupt
if so.

This allows tests to simulate an interrupt (like Ctrl+C)
during long sleeps, in a controlled and responsive way.
"""
interval = 0.05 # Small interval for responsiveness
t_wakeup = time.perf_counter() + duration
while time.perf_counter() < t_wakeup:
if self.stop_event.is_set():
raise KeyboardInterrupt("Simulated interrupt from fake_sleep")
real_sleep(interval)

def test_bridge(self):
with (
unittest.mock.patch("can.bridge.time.sleep", new=self.fake_sleep),
unittest.mock.patch("can.bridge.sys.argv", [sys.argv[0], *self.cli_args]),
):
# start script
thread = threading.Thread(target=can.bridge.main)
thread.start()

# wait until script instantiates virtual buses
t0 = time.perf_counter()
while True:
with virtual.channels_lock:
if (
self.channel1 in virtual.channels
and self.channel2 in virtual.channels
):
break
if time.perf_counter() > t0 + 2.0:
raise TimeoutError("Bridge script did not create virtual buses")
real_sleep(0.2)

# create buses with the same channels as in scripts
with (
can.interfaces.virtual.VirtualBus(self.channel1) as bus1,
can.interfaces.virtual.VirtualBus(self.channel2) as bus2,
):
# send test message to bus1, it should be received on bus2
bus1.send(self.testmsg)
recv_msg = bus2.recv(self.TIMEOUT)
self.assertMessageEqual(self.testmsg, recv_msg)

# assert that both buses are empty
self.assertIsNone(bus1.recv(0))
self.assertIsNone(bus2.recv(0))

# send test message to bus2, it should be received on bus1
bus2.send(self.testmsg)
recv_msg = bus1.recv(self.TIMEOUT)
self.assertMessageEqual(self.testmsg, recv_msg)

# assert that both buses are empty
self.assertIsNone(bus1.recv(0))
self.assertIsNone(bus2.recv(0))

# stop the bridge script
self.stop_event.set()
thread.join()

# assert that the virtual buses were closed
with virtual.channels_lock:
self.assertNotIn(self.channel1, virtual.channels)
self.assertNotIn(self.channel2, virtual.channels)


if __name__ == "__main__":
unittest.main()
14 changes: 14 additions & 0 deletions test/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ def _import(self):
return module


class TestBridgeScript(CanScriptTest):
def _commands(self):
commands = [
"python -m can.bridge --help",
"can_bridge --help",
]
return commands

def _import(self):
import can.bridge as module

return module


class TestLogconvertScript(CanScriptTest):
def _commands(self):
commands = [
Expand Down
Loading