Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Dump seeds improvements #684

Merged
merged 3 commits into from
Feb 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 103 additions & 53 deletions src/gallia/commands/scan/uds/sa_dump_seeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from gallia.command.config import AutoInt, Field, HexBytes
from gallia.command.uds import UDSScannerConfig
from gallia.log import get_logger
from gallia.services.uds import NegativeResponse, UDSRequestConfig
from gallia.services.uds import NegativeResponse, UDSErrorCodes, UDSRequestConfig
from gallia.services.uds.core.utils import g_repr

logger = get_logger(__name__)
Expand All @@ -25,16 +25,22 @@ class SASeedsDumperConfig(UDSScannerConfig):
level: AutoInt = Field(
0x11, description="Set security access level to request seed from", metavar="INT"
)
send_zero_key: int = Field(
0,
description="Attempt to fool brute force protection by pretending to send a key after requesting a seed (all zero bytes, length can be specified)",
send_zero_key: int | None = Field(
None,
description="Attempt to fool brute force protection by sending an all-zero key after each seed request. The length of the key can be specified or will otherwise be automatically determined.",
metavar="BYTE_LENGTH",
const=96,
const=0,
)
determine_key_size_max_length: int = Field(
1024,
description="When trying to automatically determine the key size expected by the ECU, test key lengths from 1 up to N bytes.",
metavar="INT",
)
reset: int | None = Field(
None,
description="Attempt to fool brute force protection by resetting the ECU after every nth requested seed.",
const=1,
metavar="N",
description="Attempt to fool brute force protection by resetting the ECU when needed or after every N-th requested seed.",
const=0,
)
duration: float = Field(
0,
Expand All @@ -56,6 +62,10 @@ class SASeedsDumper(UDSScanner):
CONFIG_TYPE = SASeedsDumperConfig
SHORT_HELP = "dump security access seeds"

attempt_reset: bool = False
is_key_length_determined: bool = False
key_length: int = 0

def __init__(self, config: SASeedsDumperConfig):
super().__init__(config)
self.config: SASeedsDumperConfig = config
Expand All @@ -67,18 +77,49 @@ async def request_seed(self, level: int, data: bytes) -> bytes | None:
)
if isinstance(resp, NegativeResponse):
logger.warning(f"ECU replied with an error: {resp}")
self.attempt_reset = True
return None
return resp.security_seed

async def send_key(self, level: int, key: bytes) -> bool:
# TODO: Optimize by checking well-known lenghts first: 0x60, 125, 250, 512, 1024.
async def send_key(self, level: int) -> bool:
"""Returns `True` when main loop should exit, e.g. on successful unlock or exhaustion of key length search space."""
if (
self.is_key_length_determined is False
and self.key_length > self.config.determine_key_size_max_length
):
logger.error(
f"Unable to identify valid key length for SecurityAccess between 1 and {self.config.determine_key_size_max_length} bytes."
)
return True

if not self.is_key_length_determined:
logger.info(f"No key length given, testing key length {self.key_length}…")

resp = await self.ecu.security_access_send_key(
level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"])
level + 1, bytes(self.key_length), config=UDSRequestConfig(tags=["ANALYZE"])
)
if isinstance(resp, NegativeResponse):
if not isinstance(resp, NegativeResponse):
logger.result(
f"That's unexpected: Unlocked SA level {g_repr(level)} with all-zero key of length {self.key_length}."
)
return True

if (
not self.is_key_length_determined
and resp.response_code == UDSErrorCodes.incorrectMessageLengthOrInvalidFormat
):
logger.debug(
f"{self.key_length} does not seem to be the correct key length, incrementing."
)
self.key_length += 1
elif not self.is_key_length_determined:
logger.result(f"The ECU seems to be expecting keys of length {self.key_length}.")
self.is_key_length_determined = True
else:
logger.debug(f"Key was rejected: {resp}")
return False
logger.result(f'Unlocked SA level {g_repr(level)} with key "{key.hex()}"! resp: {resp}')
return True

return False

def log_size(self, path: Path, time_delta: float) -> None:
size = path.stat().st_size / 1024
Expand All @@ -102,15 +143,24 @@ async def main(self) -> None:
logger.critical(f"could not change to session: {resp}")
return

i = -1
seeds_file = Path.joinpath(self.artifacts_dir, "seeds.bin")
seeds_file = self.artifacts_dir.joinpath("seeds.bin")
file = seeds_file.open("wb", buffering=0)
duration = self.config.duration * 60
start_time = time.time()
last_seed = b""
reset = False
runs_since_last_reset = 0
requests_since_last_reset = 0
print_speed = False
if self.config.send_zero_key is None:
# Set this to True to not suppress log messages or evaluation of
# replies later on even though we never send a key
self.is_key_length_determined = True
elif self.config.send_zero_key > 0:
# A specific key_length was given, so use it and no automation
self.is_key_length_determined = True
self.key_length = self.config.send_zero_key
else:
# Start with length 1 in automatic search
self.key_length = 1

while duration <= 0 or time.time() - start_time < duration:
# Print information about current dump speed every `interval` seconds.
Expand All @@ -124,37 +174,60 @@ async def main(self) -> None:
self.log_size(seeds_file, time.time() - start_time)
print_speed = False

if self.config.check_session or reset:
if self.config.reset is not None:
if (self.config.reset == 0 and self.attempt_reset) or (
self.config.reset > 0 and requests_since_last_reset == self.config.reset
):
logger.info(
f"Resetting the ECU after {requests_since_last_reset} seed requests"
)
await self.ecu.ecu_reset(0x01)

if await self.ecu.wait_for_ecu() is False:
logger.error("ECU did not respond after reset; exiting…")
sys.exit(1)

# Re-enter session. Checking/logging will be done a few lines later if required
await self.ecu.set_session(session)

requests_since_last_reset = 0
self.attempt_reset = False

if self.config.check_session:
if not await self.ecu.check_and_set_session(self.config.session):
logger.error(f"ECU persistently lost session {g_repr(self.config.session)}")
sys.exit(1)

reset = False

try:
seed = await self.request_seed(self.config.level, self.config.data_record)
if seed is None:
continue # Errors are already logged in .request_seed()
except TimeoutError:
logger.error("Timeout while requesting seed")
continue
except Exception as e:
logger.critical(f"Error while requesting seed: {g_repr(e)}")
sys.exit(1)
finally:
requests_since_last_reset += 1

if seed is None:
# Errors are already logged in .request_seed()
continue

logger.info(f"Received seed of length {len(seed)}")
# During detection of key length the same seed might be returned multiple times.
# This is, however, not considered critical and should not affect statistical evaluations of the seeds.
if self.is_key_length_determined is False:
logger.debug("Still trying to find key size, not evaluating/saving seed responses")
else:
file.write(seed)

file.write(seed)
if last_seed == seed:
logger.warning("Received the same seed as before")
if last_seed == seed:
logger.warning("Received the same seed as before")
else:
logger.info(f"Received seed of length {len(seed)}")

last_seed = seed

if self.config.send_zero_key > 0:
if self.key_length > 0:
try:
if await self.send_key(self.config.level, bytes(self.config.send_zero_key)):
if await self.send_key(self.config.level):
break
except TimeoutError:
logger.warning("Timeout while sending key")
Expand All @@ -163,29 +236,6 @@ async def main(self) -> None:
logger.critical(f"Error while sending key: {g_repr(e)}")
sys.exit(1)

runs_since_last_reset += 1

if runs_since_last_reset == self.config.reset:
reset = True
runs_since_last_reset = 0

try:
logger.info("Resetting the ECU")
await self.ecu.ecu_reset(0x01)
logger.info("Waiting for the ECU to recover…")
await self.ecu.wait_for_ecu()
except TimeoutError:
logger.error("ECU did not respond after reset; exiting…")
sys.exit(1)
except ConnectionError:
logger.warning(
"Lost connection to the ECU after performing a reset. Attempting to reconnect…"
)
await self.ecu.reconnect()

# Re-enter session. Checking/logging will be done at the beginning of next iteration
await self.ecu.set_session(session)

if self.config.sleep is not None:
logger.info(f"Sleeping for {self.config.sleep} seconds between seed requests…")
await asyncio.sleep(self.config.sleep)
Expand Down
Loading