diff --git a/src/gallia/commands/scan/uds/sa_dump_seeds.py b/src/gallia/commands/scan/uds/sa_dump_seeds.py index 40d8204e0..2de7b5d3e 100644 --- a/src/gallia/commands/scan/uds/sa_dump_seeds.py +++ b/src/gallia/commands/scan/uds/sa_dump_seeds.py @@ -11,7 +11,7 @@ from gallia.command.config import AutoInt, Field, HexBytes from gallia.command.uds import UDSScannerConfig from gallia.log import get_logger -from gallia.services.uds import NegativeResponse, UDSRequestConfig +from gallia.services.uds import NegativeResponse, UDSErrorCodes, UDSRequestConfig from gallia.services.uds.core.utils import g_repr logger = get_logger(__name__) @@ -25,16 +25,22 @@ class SASeedsDumperConfig(UDSScannerConfig): level: AutoInt = Field( 0x11, description="Set security access level to request seed from", metavar="INT" ) - send_zero_key: int = Field( - 0, - description="Attempt to fool brute force protection by pretending to send a key after requesting a seed (all zero bytes, length can be specified)", + send_zero_key: int | None = Field( + None, + description="Attempt to fool brute force protection by sending an all-zero key after each seed request. The length of the key can be specified or will otherwise be automatically determined.", metavar="BYTE_LENGTH", - const=96, + const=0, + ) + determine_key_size_max_length: int = Field( + 1024, + description="When trying to automatically determine the key size expected by the ECU, test key lengths from 1 up to N bytes.", + metavar="INT", ) reset: int | None = Field( None, - description="Attempt to fool brute force protection by resetting the ECU after every nth requested seed.", - const=1, + metavar="N", + description="Attempt to fool brute force protection by resetting the ECU when needed or after every N-th requested seed.", + const=0, ) duration: float = Field( 0, @@ -56,6 +62,10 @@ class SASeedsDumper(UDSScanner): CONFIG_TYPE = SASeedsDumperConfig SHORT_HELP = "dump security access seeds" + attempt_reset: bool = False + is_key_length_determined: bool = False + key_length: int = 0 + def __init__(self, config: SASeedsDumperConfig): super().__init__(config) self.config: SASeedsDumperConfig = config @@ -67,18 +77,49 @@ async def request_seed(self, level: int, data: bytes) -> bytes | None: ) if isinstance(resp, NegativeResponse): logger.warning(f"ECU replied with an error: {resp}") + self.attempt_reset = True return None return resp.security_seed - async def send_key(self, level: int, key: bytes) -> bool: + # TODO: Optimize by checking well-known lenghts first: 0x60, 125, 250, 512, 1024. + async def send_key(self, level: int) -> bool: + """Returns `True` when main loop should exit, e.g. on successful unlock or exhaustion of key length search space.""" + if ( + self.is_key_length_determined is False + and self.key_length > self.config.determine_key_size_max_length + ): + logger.error( + f"Unable to identify valid key length for SecurityAccess between 1 and {self.config.determine_key_size_max_length} bytes." + ) + return True + + if not self.is_key_length_determined: + logger.info(f"No key length given, testing key length {self.key_length}…") + resp = await self.ecu.security_access_send_key( - level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) + level + 1, bytes(self.key_length), config=UDSRequestConfig(tags=["ANALYZE"]) ) - if isinstance(resp, NegativeResponse): + if not isinstance(resp, NegativeResponse): + logger.result( + f"That's unexpected: Unlocked SA level {g_repr(level)} with all-zero key of length {self.key_length}." + ) + return True + + if ( + not self.is_key_length_determined + and resp.response_code == UDSErrorCodes.incorrectMessageLengthOrInvalidFormat + ): + logger.debug( + f"{self.key_length} does not seem to be the correct key length, incrementing." + ) + self.key_length += 1 + elif not self.is_key_length_determined: + logger.result(f"The ECU seems to be expecting keys of length {self.key_length}.") + self.is_key_length_determined = True + else: logger.debug(f"Key was rejected: {resp}") - return False - logger.result(f'Unlocked SA level {g_repr(level)} with key "{key.hex()}"! resp: {resp}') - return True + + return False def log_size(self, path: Path, time_delta: float) -> None: size = path.stat().st_size / 1024 @@ -102,15 +143,24 @@ async def main(self) -> None: logger.critical(f"could not change to session: {resp}") return - i = -1 - seeds_file = Path.joinpath(self.artifacts_dir, "seeds.bin") + seeds_file = self.artifacts_dir.joinpath("seeds.bin") file = seeds_file.open("wb", buffering=0) duration = self.config.duration * 60 start_time = time.time() last_seed = b"" - reset = False - runs_since_last_reset = 0 + requests_since_last_reset = 0 print_speed = False + if self.config.send_zero_key is None: + # Set this to True to not suppress log messages or evaluation of + # replies later on even though we never send a key + self.is_key_length_determined = True + elif self.config.send_zero_key > 0: + # A specific key_length was given, so use it and no automation + self.is_key_length_determined = True + self.key_length = self.config.send_zero_key + else: + # Start with length 1 in automatic search + self.key_length = 1 while duration <= 0 or time.time() - start_time < duration: # Print information about current dump speed every `interval` seconds. @@ -124,37 +174,60 @@ async def main(self) -> None: self.log_size(seeds_file, time.time() - start_time) print_speed = False - if self.config.check_session or reset: + if self.config.reset is not None: + if (self.config.reset == 0 and self.attempt_reset) or ( + self.config.reset > 0 and requests_since_last_reset == self.config.reset + ): + logger.info( + f"Resetting the ECU after {requests_since_last_reset} seed requests" + ) + await self.ecu.ecu_reset(0x01) + + if await self.ecu.wait_for_ecu() is False: + logger.error("ECU did not respond after reset; exiting…") + sys.exit(1) + + # Re-enter session. Checking/logging will be done a few lines later if required + await self.ecu.set_session(session) + + requests_since_last_reset = 0 + self.attempt_reset = False + + if self.config.check_session: if not await self.ecu.check_and_set_session(self.config.session): logger.error(f"ECU persistently lost session {g_repr(self.config.session)}") sys.exit(1) - reset = False - try: seed = await self.request_seed(self.config.level, self.config.data_record) + if seed is None: + continue # Errors are already logged in .request_seed() except TimeoutError: logger.error("Timeout while requesting seed") continue except Exception as e: logger.critical(f"Error while requesting seed: {g_repr(e)}") sys.exit(1) + finally: + requests_since_last_reset += 1 - if seed is None: - # Errors are already logged in .request_seed() - continue - - logger.info(f"Received seed of length {len(seed)}") + # During detection of key length the same seed might be returned multiple times. + # This is, however, not considered critical and should not affect statistical evaluations of the seeds. + if self.is_key_length_determined is False: + logger.debug("Still trying to find key size, not evaluating/saving seed responses") + else: + file.write(seed) - file.write(seed) - if last_seed == seed: - logger.warning("Received the same seed as before") + if last_seed == seed: + logger.warning("Received the same seed as before") + else: + logger.info(f"Received seed of length {len(seed)}") last_seed = seed - if self.config.send_zero_key > 0: + if self.key_length > 0: try: - if await self.send_key(self.config.level, bytes(self.config.send_zero_key)): + if await self.send_key(self.config.level): break except TimeoutError: logger.warning("Timeout while sending key") @@ -163,29 +236,6 @@ async def main(self) -> None: logger.critical(f"Error while sending key: {g_repr(e)}") sys.exit(1) - runs_since_last_reset += 1 - - if runs_since_last_reset == self.config.reset: - reset = True - runs_since_last_reset = 0 - - try: - logger.info("Resetting the ECU") - await self.ecu.ecu_reset(0x01) - logger.info("Waiting for the ECU to recover…") - await self.ecu.wait_for_ecu() - except TimeoutError: - logger.error("ECU did not respond after reset; exiting…") - sys.exit(1) - except ConnectionError: - logger.warning( - "Lost connection to the ECU after performing a reset. Attempting to reconnect…" - ) - await self.ecu.reconnect() - - # Re-enter session. Checking/logging will be done at the beginning of next iteration - await self.ecu.set_session(session) - if self.config.sleep is not None: logger.info(f"Sleeping for {self.config.sleep} seconds between seed requests…") await asyncio.sleep(self.config.sleep)