• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hardbyte / python-can / 13482016967

23 Feb 2025 10:10AM UTC coverage: 70.188% (-0.5%) from 70.724%
13482016967

Pull #1923

github

web-flow
Merge 0fcea6483 into 2bd475803
Pull Request #1923: Add can bridge

0 of 82 new or added lines in 1 file covered. (0.0%)

7588 of 10811 relevant lines covered (70.19%)

15.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/can/bridge.py
1
"""
2
Creates a bridge between two CAN busses.
3

4
This will connect to two CAN busses. Messages received on one
5
bus will be sent to the other bus and vice versa.
6
"""
7

NEW
8
import argparse
×
NEW
9
import errno
×
NEW
10
import logging
×
NEW
11
import sys
×
NEW
12
import time
×
NEW
13
from datetime import datetime
×
14

NEW
15
import can
×
16

NEW
17
from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config
×
18

NEW
19
USAGE = """
×
20
usage: can_bridge [{general config} --] {can A config} -- {can B config}
21

22
Bridge two CAN busses.
23

24
Both can busses will be connected so that messages from bus A will be sent on
25
bus B and messages on bus B will be sent to bus A. The busses are separated by a `--`
26

27
positional arguments:
28
  {general config}      The configuration for this program excluding
29
                        the config for each bus. Can be omitted
30
  {can A config}        The configuration for the first bus
31
  {can B config}        The configuration for the second bus
32

33
Example usage:
34
    can_bridge -i socketcan -c can0 -- -i socketcan can1
35
    can_bridge -vvv -- -i socketcan -c can0 -- -i socketcan can1
36

37
Type `can_bridge help_bus` for information about single bus configuration.
38
"""
39

NEW
40
LOG = logging.getLogger(__name__)
×
41

42

NEW
43
class UserError(Exception):
×
NEW
44
    pass
×
45

46

NEW
47
def get_config_list(it, separator, conf):
×
48
    while True:
NEW
49
        el = next(it)
×
NEW
50
        if el == separator:
×
NEW
51
            break
×
52
        else:
NEW
53
            conf.append(el)
×
54

55

NEW
56
def split_configurations(arg_list, separator="--"):
×
NEW
57
    general = []
×
NEW
58
    conf_a = []
×
NEW
59
    conf_b = []
×
60

NEW
61
    found_sep = False
×
NEW
62
    it = iter(arg_list)
×
NEW
63
    try:
×
NEW
64
        get_config_list(it, separator, conf_a)
×
NEW
65
        found_sep = True
×
NEW
66
        get_config_list(it, separator, conf_b)
×
67

68
        # When we reached this point we found two separators so we have
69
        # a general config. We will treate the first config as general
NEW
70
        general = conf_a
×
NEW
71
        conf_a = conf_b
×
NEW
72
        get_config_list(it, separator, conf_b)
×
73

74
        # When we reached this point we found three separators so this is
75
        # an error.
NEW
76
        raise UserError("To many configurations")
×
NEW
77
    except StopIteration:
×
NEW
78
        LOG.debug("All configurations were split")
×
NEW
79
        if not found_sep:
×
NEW
80
            raise UserError("Missing separator") from None
×
81

NEW
82
    return general, conf_a, conf_b
×
83

84

NEW
85
def main() -> None:
×
NEW
86
    general_parser = argparse.ArgumentParser()
×
NEW
87
    general_parser.add_argument(
×
88
        "-v",
89
        action="count",
90
        dest="verbosity",
91
        help="""How much information do you want to see at the command line?
92
                        You can add several of these e.g., -vv is DEBUG""",
93
        default=2,
94
    )
95

NEW
96
    bus_parser = argparse.ArgumentParser(description="Bridge two CAN busses.")
×
97

NEW
98
    _create_base_argument_parser(bus_parser)
×
99

NEW
100
    parser = argparse.ArgumentParser(description="Bridge two CAN busses.")
×
NEW
101
    parser.add_argument("configs", nargs=argparse.REMAINDER)
×
102

103
    # print help message when no arguments were given
NEW
104
    if len(sys.argv) < 2:
×
NEW
105
        print(USAGE, file=sys.stderr)
×
NEW
106
        raise SystemExit(errno.EINVAL)
×
107

NEW
108
    args = sys.argv[1:]
×
NEW
109
    try:
×
NEW
110
        general, conf_a, conf_b = split_configurations(args)
×
NEW
111
    except UserError as exc:
×
NEW
112
        if len(args) == 1 and args[0] == "help_bus":
×
NEW
113
            bus_parser.print_help(sys.stderr)
×
114
        else:
NEW
115
            print(f"Error while processing arguments: {exc}", file=sys.stderr)
×
NEW
116
        raise SystemExit(errno.EINVAL) from exc
×
117

NEW
118
    LOG.debug("General configuration: %s", general)
×
NEW
119
    LOG.debug("Bus A configuration: %s", conf_a)
×
NEW
120
    LOG.debug("Bus B configuration: %s", conf_b)
×
NEW
121
    g_results = general_parser.parse_args(general)
×
NEW
122
    verbosity = g_results.verbosity
×
123

NEW
124
    a_results, a_unknown_args = bus_parser.parse_known_args(conf_a)
×
NEW
125
    a_additional_config = _parse_additional_config(
×
126
        [*a_results.extra_args, *a_unknown_args]
127
    )
NEW
128
    a_results.__dict__["verbosity"] = verbosity
×
129

NEW
130
    b_results, b_unknown_args = bus_parser.parse_known_args(conf_b)
×
NEW
131
    b_additional_config = _parse_additional_config(
×
132
        [*b_results.extra_args, *b_unknown_args]
133
    )
NEW
134
    b_results.__dict__["verbosity"] = verbosity
×
135

NEW
136
    LOG.debug("General configuration results: %s", g_results)
×
NEW
137
    LOG.debug("Bus A configuration results: %s", a_results)
×
NEW
138
    LOG.debug("Bus A additional configuration results: %s", a_additional_config)
×
NEW
139
    LOG.debug("Bus B configuration results: %s", b_results)
×
NEW
140
    LOG.debug("Bus B additional configuration results: %s", b_additional_config)
×
NEW
141
    with _create_bus(a_results, **a_additional_config) as bus_a:
×
NEW
142
        with _create_bus(b_results, **b_additional_config) as bus_b:
×
NEW
143
            reader_a = can.RedirectReader(bus_b)
×
NEW
144
            reader_b = can.RedirectReader(bus_a)
×
NEW
145
            can.Notifier(bus_a, [reader_a])
×
NEW
146
            can.Notifier(bus_b, [reader_b])
×
NEW
147
            print(f"CAN Bridge (Started on {datetime.now()})")
×
NEW
148
            try:
×
149
                while True:
NEW
150
                    time.sleep(1)
×
NEW
151
            except KeyboardInterrupt:
×
NEW
152
                pass
×
153

NEW
154
    print(f"CAN Bridge (Stopped on {datetime.now()})")
×
155

156

157
if __name__ == "__main__":
158
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc