Skip to content

Commit fd745c8

Browse files
committed
driver: add CAN driver
The driver can connect to both local and remote CAN ports. For local ports, it sets up the interface. The driver is also helpful for identifying the interface name using udev. For remote ports, it creates a local vcan interface and establishes a connection to the exported resource through cannelloni. According to the cannelloni documentation, the speed of the network interface is limited by tc to prevent packet losses. XXX: See the code (a few things to fix). Signed-off-by: Tomas Novotny <tomas@novotny.cz>
1 parent 5b9e240 commit fd745c8

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

labgrid/driver/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .bareboxdriver import BareboxDriver
2+
from .candriver import CANDriver
23
from .ubootdriver import UBootDriver
34
from .smallubootdriver import SmallUBootDriver
45
from .serialdriver import SerialDriver

labgrid/driver/candriver.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import contextlib
2+
import shutil
3+
import subprocess
4+
import warnings
5+
6+
import attr
7+
8+
from ..factory import target_factory
9+
from ..resource import NetworkCANPort, RawCANPort
10+
from ..resource.udev import USBCANPort
11+
from ..util.helper import processwrapper
12+
from ..util.proxy import proxymanager
13+
from .common import Driver
14+
15+
16+
@target_factory.reg_driver
17+
@attr.s(eq=False)
18+
class CANDriver(Driver):
19+
bindings = {
20+
"port": {"NetworkCANPort", "RawCANPort", "USBCANPort"},
21+
}
22+
23+
def __attrs_post_init__(self):
24+
super().__attrs_post_init__()
25+
self.ifname = None
26+
self.child = None
27+
self.can_helper = shutil.which("labgrid-can-interface")
28+
if self.can_helper is None:
29+
self.can_helper = "/usr/sbin/labgrid-can-interface"
30+
warnings.warn(f"labgrid-can-interface helper not found, falling back to {self.can_helper}")
31+
self.helper_wrapper = ["sudo", self.can_helper]
32+
self.cannelloni_bin = shutil.which("cannelloni")
33+
if self.cannelloni_bin is None:
34+
self.cannelloni_bin = "/usr/bin/cannelloni"
35+
warnings.warn(f"cannelloni binary not found, falling back to {self.cannelloni_bin}")
36+
37+
def _wrap_command(self, cmd):
38+
return self.helper_wrapper + cmd
39+
40+
def on_activate(self):
41+
if isinstance(self.port, NetworkCANPort):
42+
host, port = proxymanager.get_host_and_port(self.port)
43+
# XXX The port might not be unique, use something better
44+
self.ifname = f"lg_vcan{port}"
45+
46+
cmd_ip_add = self._wrap_command(["ip", self.ifname, "add"])
47+
processwrapper.check_output(cmd_ip_add)
48+
49+
cmd_ip_up = self._wrap_command(["ip", self.ifname, "up"])
50+
processwrapper.check_output(cmd_ip_up)
51+
52+
cmd_tc = self._wrap_command(["tc", self.ifname, "set-bitrate", str(self.port.speed)])
53+
processwrapper.check_output(cmd_tc)
54+
cmd_cannelloni = [
55+
self.cannelloni_bin,
56+
"-C", "c",
57+
"-I", f"{self.ifname}",
58+
"-R", f"{host}",
59+
"-r", f"{port}",
60+
]
61+
self.logger.info("Running command: %s", cmd_cannelloni)
62+
self.child = subprocess.Popen(cmd_cannelloni)
63+
# XXX How to check the process? Ideally read output and find the "connected" string?
64+
elif isinstance(self.port, (RawCANPort, USBCANPort)):
65+
host = None
66+
self.ifname = self.port.ifname
67+
68+
cmd_down = self._wrap_command(["ip", self.ifname, "down"])
69+
processwrapper.check_output(cmd_down)
70+
71+
cmd_type_bitrate = self._wrap_command(["ip", self.ifname, "set-bitrate", str(self.port.speed)])
72+
processwrapper.check_output(cmd_type_bitrate)
73+
74+
cmd_up = self._wrap_command(["ip", self.ifname, "up"])
75+
processwrapper.check_output(cmd_up)
76+
else:
77+
raise NotImplementedError(f"Unsupported CAN resource: {self.port}")
78+
79+
def on_deactivate(self):
80+
ifname = self.ifname
81+
self.ifname = None
82+
if isinstance(self.port, NetworkCANPort):
83+
assert self.child
84+
child = self.child
85+
self.child = None
86+
child.terminate()
87+
try:
88+
child.wait(2.0)
89+
except subprocess.TimeoutExpired:
90+
self.logger.warning("cannelloni on %s still running after SIGTERM", ifname)
91+
child.kill()
92+
child.wait(1.0)
93+
self.logger.info("stopped cannelloni for interface %s", ifname)
94+
95+
cmd_ip_del = self._wrap_command(["ip", ifname, "del"])
96+
processwrapper.check_output(cmd_ip_del)
97+
else:
98+
cmd_down = self._wrap_command(["ip", ifname, "down"])
99+
processwrapper.check_output(cmd_down)
100+
101+
@Driver.check_bound
102+
def get_export_vars(self):
103+
export_vars = {
104+
"ifname": self.ifname,
105+
"speed": str(self.port.speed),
106+
}
107+
if isinstance(self.port, NetworkCANPort):
108+
host, port = proxymanager.get_host_and_port(self.port)
109+
export_vars["host"] = host
110+
export_vars["port"] = str(port)
111+
return export_vars

0 commit comments

Comments
 (0)