Skip to content

A few UDS, GMLAN and AutomotiveScanner updates #4777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions scapy/contrib/automotive/bmw/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from scapy.packet import Packet, bind_layers
from scapy.fields import ByteField, ShortField, ByteEnumField, X3BytesField, \
StrField, StrFixedLenField, LEIntField, LEThreeBytesField, \
StrField, StrFixedLenField, LEThreeBytesField, \
PacketListField, IntField, IPField, ThreeBytesField, ShortEnumField, \
XStrFixedLenField
from scapy.contrib.automotive.uds import UDS, UDS_RDBI, UDS_DSC, UDS_IOCBI, \
Expand Down Expand Up @@ -321,14 +321,16 @@ class SVK(Packet):
3: "software entry incompatible to hardware entry",
4: "software entry incompatible with other software entry"}

@staticmethod
def get_length(p: Packet):
return len(p.original) - (8 * p.entries_count + 7)

fields_desc = [
ByteEnumField("prog_status1", 0, prog_status_enum),
ByteEnumField("prog_status2", 0, prog_status_enum),
ShortField("entries_count", 0),
SVK_DateField("prog_date", 0),
ByteField("pad1", 0),
LEIntField("prog_milage", 0),
StrFixedLenField("pad2", b'\x00\x00\x00\x00\x00', length=5),
StrFixedLenField("pad", b'\x00', length_from=get_length),
PacketListField("entries", [], SVK_Entry,
count_from=lambda x: x.entries_count)]

Expand Down
10 changes: 5 additions & 5 deletions scapy/contrib/automotive/gm/gmlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
if conf.contribs['GMLAN']['treat-response-pending-as-answer']:
pass
except KeyError:
log_automotive.info("Specify \"conf.contribs['GMLAN'] = "
"{'treat-response-pending-as-answer': True}\" to treat "
"a negative response 'RequestCorrectlyReceived-"
"ResponsePending' as answer of a request. \n"
"The default value is False.")
# log_automotive.info("Specify \"conf.contribs['GMLAN'] = "
# "{'treat-response-pending-as-answer': True}\" to treat "
# "a negative response 'RequestCorrectlyReceived-"
# "ResponsePending' as answer of a request. \n"
# "The default value is False.")
conf.contribs['GMLAN'] = {'treat-response-pending-as-answer': False}

conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = None
Expand Down
11 changes: 5 additions & 6 deletions scapy/contrib/automotive/obd/obd.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import struct

from scapy.contrib.automotive import log_automotive
from scapy.contrib.automotive.obd.iid.iids import *
from scapy.contrib.automotive.obd.mid.mids import *
from scapy.contrib.automotive.obd.pid.pids import *
Expand All @@ -24,11 +23,11 @@
if conf.contribs['OBD']['treat-response-pending-as-answer']:
pass
except KeyError:
log_automotive.info("Specify \"conf.contribs['OBD'] = "
"{'treat-response-pending-as-answer': True}\" to treat "
"a negative response 'requestCorrectlyReceived-"
"ResponsePending' as answer of a request. \n"
"The default value is False.")
# log_automotive.info("Specify \"conf.contribs['OBD'] = "
# "{'treat-response-pending-as-answer': True}\" to treat "
# "a negative response 'requestCorrectlyReceived-"
# "ResponsePending' as answer of a request. \n"
# "The default value is False.")
conf.contribs['OBD'] = {'treat-response-pending-as-answer': False}


Expand Down
10 changes: 10 additions & 0 deletions scapy/contrib/automotive/scanner/enumerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ def _get_initial_requests(self, **kwargs):

def __reduce__(self): # type: ignore
f, t, d = super(ServiceEnumerator, self).__reduce__() # type: ignore

try:
del d["_tester_present_sender"]
except KeyError:
pass

try:
for k, v in d["_request_iterators"].items():
d["_request_iterators"][k] = list(v)
Expand Down Expand Up @@ -287,6 +293,10 @@ def pre_execute(self, socket, state, global_configuration):
except KeyError:
self._tester_present_sender = None

def post_execute(self, socket, state, global_configuration):
# type: (_SocketUnion, EcuState, AutomotiveTestCaseExecutorConfiguration) -> None # noqa: E501
self._tester_present_sender = None

def execute(self, socket, state, **kwargs):
# type: (_SocketUnion, EcuState, Any) -> None
self.check_kwargs(kwargs)
Expand Down
55 changes: 35 additions & 20 deletions scapy/contrib/automotive/scanner/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,20 @@ def reset_target(self):

def reconnect(self):
# type: () -> None
if self.reconnect_handler:
try:
if self.socket:
self.socket.close()
except Exception as e:
log_automotive.exception(
"Exception '%s' during socket.close", e)

log_automotive.info("Target reconnect")
socket = self.reconnect_handler()
if not isinstance(socket, SingleConversationSocket):
self.socket = SingleConversationSocket(socket)
else:
self.socket = socket
if not self.reconnect_handler:
return

try:
if self.socket:
self.socket.close()
except Exception as e:
log_automotive.exception(
"Exception '%s' during socket.close", e)

log_automotive.info("Target reconnect")
socket = self.reconnect_handler()
self.socket = socket if isinstance(socket, SingleConversationSocket) \
else SingleConversationSocket(socket)

if self.socket and self.socket.closed:
raise Scapy_Exception(
Expand Down Expand Up @@ -290,7 +290,12 @@ def scan(self, timeout=None):
kill_time = None
else:
kill_time = time.monotonic() + timeout
while kill_time is None or kill_time > time.monotonic():
while True:
terminate = kill_time and kill_time <= time.monotonic()
if terminate:
log_automotive.debug(
"Execution time exceeded. Terminating scan!")
return
test_case_executed = False
log_automotive.info("[i] Scan progress %0.2f", self.progress())
log_automotive.debug("[i] Scan paths %s", self.state_paths)
Expand Down Expand Up @@ -400,6 +405,20 @@ def enter_state(self, prev_state, next_state):
trans_func, trans_kwargs, clean_func = funcs
state_changed = trans_func(
self.socket, self.configuration, trans_kwargs)

if self.socket.closed:
for i in range(5):
try:
self.reconnect()
break
except Exception:
if i == 4:
raise
if self.configuration.stop_event:
self.configuration.stop_event.wait(1)
else:
time.sleep(1)

if state_changed:
self.target_state = next_state

Expand All @@ -416,15 +435,11 @@ def cleanup_state(self):
Executes all collected cleanup functions from a traversed path
:return: None
"""
if not self.socket:
log_automotive.warning("Socket is None! Leaving cleanup_state")
return

for f in self.cleanup_functions:
if not callable(f):
continue
try:
if not f(self.socket, self.configuration):
if not f(self.socket, self.configuration): # type: ignore
log_automotive.info(
"Cleanup function %s failed", repr(f))
except (OSError, ValueError, Scapy_Exception) as e:
Expand Down
3 changes: 3 additions & 0 deletions scapy/contrib/automotive/scanner/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def add_edge(self, edge, transition_function=None):
:param edge: edge from node to node
:param transition_function: tuple with enter and cleanup function
"""
if edge[1] in self.edges[edge[0]]:
# Edge already exists
return
self.edges[edge[0]].append(edge[1])
self.weights[edge] = 1
self.__transition_functions[edge] = transition_function
Expand Down
39 changes: 23 additions & 16 deletions scapy/contrib/automotive/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
PacketField
from scapy.packet import Packet, bind_layers, NoPayload, Raw
from scapy.config import conf
from scapy.error import log_loading
from scapy.utils import PeriodicSenderThread
from scapy.contrib.isotp import ISOTP

Expand All @@ -35,14 +34,13 @@
if conf.contribs['UDS']['treat-response-pending-as-answer']:
pass
except KeyError:
log_loading.info("Specify \"conf.contribs['UDS'] = "
"{'treat-response-pending-as-answer': True}\" to treat "
"a negative response 'requestCorrectlyReceived-"
"ResponsePending' as answer of a request. \n"
"The default value is False.")
# log_loading.info("Specify \"conf.contribs['UDS'] = "
# "{'treat-response-pending-as-answer': True}\" to treat "
# "a negative response 'requestCorrectlyReceived-"
# "ResponsePending' as answer of a request. \n"
# "The default value is False.")
conf.contribs['UDS'] = {'treat-response-pending-as-answer': False}


conf.debug_dissector = True


Expand Down Expand Up @@ -1096,9 +1094,12 @@ class UDS_RDTCIPR(Packet):
ByteEnumField('reportType', 0, UDS_RDTCI.reportTypes),
ConditionalField(
FlagsField('DTCStatusAvailabilityMask', 0, 8, UDS_RDTCI.dtcStatus),
lambda pkt: pkt.reportType in [0x01, 0x07, 0x11, 0x12, 0x02, 0x0A,
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x13,
0x15]),
lambda pkt: pkt.reportType in [0x01, 0x07, 0x09, 0x11, 0x12, 0x02, 0x0A,
0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x13, 0x15]),
ConditionalField(ByteField('DTCSeverity', 0),
lambda pkt: pkt.reportType in [0x09]),
ConditionalField(ByteField('DTCFunctionalUnit', 0),
lambda pkt: pkt.reportType in [0x09]),
ConditionalField(ByteEnumField('DTCFormatIdentifier', 0,
{0: 'ISO15031-6DTCFormat',
1: 'UDS-1DTCFormat',
Expand All @@ -1111,12 +1112,11 @@ class UDS_RDTCIPR(Packet):
0x11, 0x12]),
ConditionalField(PacketListField('DTCAndStatusRecord', None,
pkt_cls=DTCAndStatusRecord),
lambda pkt: pkt.reportType in [0x02, 0x0A, 0x0B,
lambda pkt: pkt.reportType in [0x02, 0x09, 0x0A, 0x0B,
0x0C, 0x0D, 0x0E,
0x0F, 0x13, 0x15]),
ConditionalField(StrField('dataRecord', b""),
lambda pkt: pkt.reportType in [0x03, 0x08, 0x09,
0x10, 0x14]),
lambda pkt: pkt.reportType in [0x03, 0x08, 0x10, 0x14]),
ConditionalField(PacketField('snapshotRecord', None,
pkt_cls=DTCSnapshotRecord),
lambda pkt: pkt.reportType in [0x04]),
Expand All @@ -1130,6 +1130,8 @@ def answers(self, other):
return False
if not other.reportType == self.reportType:
return False
if self.reportType == 0x02:
return other.DTCStatusMask & self.DTCStatusAvailabilityMask
if self.reportType == 0x06:
return other.dtc == self.extendedDataRecord.dtcAndStatus.dtc
if self.reportType == 0x04:
Expand Down Expand Up @@ -1168,9 +1170,14 @@ class UDS_RCPR(Packet):
]

def answers(self, other):
return isinstance(other, UDS_RC) \
and other.routineControlType == self.routineControlType \
and other.routineIdentifier == self.routineIdentifier
if isinstance(other, UDS_RC) \
and other.routineControlType == self.routineControlType \
and other.routineIdentifier == self.routineIdentifier:
if isinstance(self.payload, NoPayload):
return True
else:
return self.payload.answers(other.payload)
return False


bind_layers(UDS, UDS_RCPR, service=0x71)
Expand Down
53 changes: 47 additions & 6 deletions scapy/contrib/automotive/uds_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator):
_supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs)
_supported_kwargs.update({
'delay_state_change': (int, lambda x: x >= 0),
'overwrite_timeout': (bool, None)
'overwrite_timeout': (bool, None),
'close_socket_when_entering_session_2': (bool, None)
})
_supported_kwargs["scan_range"] = (
(list, tuple, range), lambda x: max(x) < 0x100 and min(x) >= 0)
Expand All @@ -107,7 +108,12 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator):
unit-test scenarios, this value should
be set to False, in order to use the
timeout specified by the 'timeout'
argument."""
argument.
:param bool close_socket_when_entering_session_2: False by default.
This enumerator will close the socket
if session 2 (ProgrammingSession)
was entered, if True. This will
force a reconnect by the executor."""

def _get_initial_requests(self, **kwargs):
# type: (Any) -> Iterable[Packet]
Expand Down Expand Up @@ -160,10 +166,21 @@ def get_new_edge(self,
config # type: AutomotiveTestCaseExecutorConfiguration
): # type: (...) -> Optional[_Edge]
edge = super(UDS_DSCEnumerator, self).get_new_edge(socket, config)

try:
close_socket = config[UDS_DSCEnumerator.__name__]["close_socket_when_entering_session_2"] # noqa: E501
except KeyError:
close_socket = False

if edge:
state, new_state = edge
# Force TesterPresent if session is changed
new_state.tp = 1 # type: ignore
try:
if close_socket and new_state.session == 2: # type: ignore
new_state.tp = 0 # type: ignore
except (AttributeError, KeyError):
pass
return state, new_state
return None

Expand All @@ -179,9 +196,30 @@ def enter_state_with_tp(sock, # type: _SocketUnion
delay = conf[UDS_DSCEnumerator.__name__]["delay_state_change"]
except KeyError:
delay = 5

try:
close_socket = conf[UDS_DSCEnumerator.__name__]["close_socket_when_entering_session_2"] # noqa: E501
except KeyError:
close_socket = False

conf.stop_event.wait(delay)
state_changed = UDS_DSCEnumerator.enter_state(
sock, conf, kwargs["req"])

try:
session = kwargs["req"].diagnosticSessionType
except AttributeError:
session = 0

if close_socket and session == 2:
if not hasattr(sock, "ip"):
log_automotive.warning("Likely closing a CAN based socket! "
"This might be a configuration issue.")
log_automotive.info(
"Entered Programming Session: Closing socket connection")
sock.close()
conf.stop_event.wait(delay)

if not state_changed:
UDS_TPEnumerator.cleanup(sock, conf)
return state_changed
Expand Down Expand Up @@ -287,7 +325,7 @@ def _get_initial_requests(self, **kwargs):
def _get_table_entry_y(self, tup):
# type: (_AutomotiveTestCaseScanResult) -> str
resp = tup[2]
if resp is not None:
if resp is not None and resp.service != 0x7f:
return "0x%02x %s: %s" % (
tup[1].periodicDataIdentifier,
tup[1].sprintf("%UDS_RDBPI.periodicDataIdentifier%"),
Expand Down Expand Up @@ -1250,19 +1288,22 @@ def default_test_case_clss(self):


def uds_software_reset(connection, # type: _SocketUnion
logger=log_automotive # type: logging.Logger
logger=log_automotive, # type: logging.Logger
timeout=0.5 # type: Union[int, float]
): # type: (...) -> None
logger.debug("Reset procedure of target started.")
resp = connection.sr1(UDS() / UDS_ER(resetType=1),
timeout=5,
timeout=timeout,
verbose=False)
if resp and resp.service != 0x7f:
logger.debug("Reset procedure of target complete")
return

logger.debug("Couldn't reset target with UDS_ER. "
"At least try to set target back to DefaultSession")
resp = connection.sr1(UDS() / UDS_DSC(b"\x01"), verbose=False, timeout=5)
resp = connection.sr1(UDS() / UDS_DSC(b"\x01"),
verbose=False,
timeout=timeout)
if resp and resp.service != 0x7f:
logger.debug("Target in DefaultSession")
return
Expand Down
Loading