From 5090f55eba388a664d7e5ee214178ab4791e8367 Mon Sep 17 00:00:00 2001 From: Elias Hohl Date: Tue, 27 Feb 2024 13:31:23 +0100 Subject: [PATCH] fix(agent/security): Mitigate shell injection vulnerabilities (#6903) * Mitigate shell injection in `MacOSTTS._speech` implementation * Mitigate shell command control bypassing in `execute_shell` and `execute_shell_popen` commands - Improve implementation and docstring of `validate_command` function --------- Co-authored-by: Reinier van der Leer --- .../autogpt/autogpt/commands/execute_code.py | 42 +++++++++++++------ autogpts/autogpt/autogpt/speech/macos_tts.py | 8 ++-- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/autogpts/autogpt/autogpt/commands/execute_code.py b/autogpts/autogpt/autogpt/commands/execute_code.py index 93bfd53d40e3..dff993504284 100644 --- a/autogpts/autogpt/autogpt/commands/execute_code.py +++ b/autogpts/autogpt/autogpt/commands/execute_code.py @@ -2,6 +2,7 @@ import logging import os +import shlex import subprocess from pathlib import Path from tempfile import NamedTemporaryFile @@ -224,25 +225,31 @@ def execute_python_file( raise CommandExecutionError(f"Could not run the script in a container: {e}") -def validate_command(command: str, config: Config) -> bool: - """Validate a command to ensure it is allowed +def validate_command(command_line: str, config: Config) -> tuple[bool, bool]: + """Check whether a command is allowed and whether it may be executed in a shell. + + If shell command control is enabled, we disallow executing in a shell, because + otherwise the model could easily circumvent the command filter using shell features. Args: - command (str): The command to validate - config (Config): The config to use to validate the command + command_line (str): The command line to validate + config (Config): The application config including shell command control settings Returns: bool: True if the command is allowed, False otherwise + bool: True if the command may be executed in a shell, False otherwise """ - if not command: - return False + if not command_line: + return False, False - command_name = command.split()[0] + command_name = shlex.split(command_line)[0] if config.shell_command_control == ALLOWLIST_CONTROL: - return command_name in config.shell_allowlist + return command_name in config.shell_allowlist, False + elif config.shell_command_control == DENYLIST_CONTROL: + return command_name not in config.shell_denylist, False else: - return command_name not in config.shell_denylist + return True, True @command( @@ -269,7 +276,8 @@ def execute_shell(command_line: str, agent: Agent) -> str: Returns: str: The output of the command """ - if not validate_command(command_line, agent.legacy_config): + allow_execute, allow_shell = validate_command(command_line, agent.legacy_config) + if not allow_execute: logger.info(f"Command '{command_line}' not allowed") raise OperationNotAllowedError("This shell command is not allowed.") @@ -282,7 +290,11 @@ def execute_shell(command_line: str, agent: Agent) -> str: f"Executing command '{command_line}' in working directory '{os.getcwd()}'" ) - result = subprocess.run(command_line, capture_output=True, shell=True) + result = subprocess.run( + command_line if allow_shell else shlex.split(command_line), + capture_output=True, + shell=allow_shell, + ) output = f"STDOUT:\n{result.stdout.decode()}\nSTDERR:\n{result.stderr.decode()}" # Change back to whatever the prior working dir was @@ -316,7 +328,8 @@ def execute_shell_popen(command_line: str, agent: Agent) -> str: Returns: str: Description of the fact that the process started and its id """ - if not validate_command(command_line, agent.legacy_config): + allow_execute, allow_shell = validate_command(command_line, agent.legacy_config) + if not allow_execute: logger.info(f"Command '{command_line}' not allowed") raise OperationNotAllowedError("This shell command is not allowed.") @@ -331,7 +344,10 @@ def execute_shell_popen(command_line: str, agent: Agent) -> str: do_not_show_output = subprocess.DEVNULL process = subprocess.Popen( - command_line, shell=True, stdout=do_not_show_output, stderr=do_not_show_output + command_line if allow_shell else shlex.split(command_line), + shell=allow_shell, + stdout=do_not_show_output, + stderr=do_not_show_output, ) # Change back to whatever the prior working dir was diff --git a/autogpts/autogpt/autogpt/speech/macos_tts.py b/autogpts/autogpt/autogpt/speech/macos_tts.py index e88331d21207..6a1dd99d5102 100644 --- a/autogpts/autogpt/autogpt/speech/macos_tts.py +++ b/autogpts/autogpt/autogpt/speech/macos_tts.py @@ -1,7 +1,7 @@ """ MacOS TTS Voice. """ from __future__ import annotations -import os +import subprocess from autogpt.speech.base import VoiceBase @@ -15,9 +15,9 @@ def _setup(self) -> None: def _speech(self, text: str, voice_index: int = 0) -> bool: """Play the given text.""" if voice_index == 0: - os.system(f'say "{text}"') + subprocess.run(["say", text], shell=False) elif voice_index == 1: - os.system(f'say -v "Ava (Premium)" "{text}"') + subprocess.run(["say", "-v", "Ava (Premium)", text], shell=False) else: - os.system(f'say -v Samantha "{text}"') + subprocess.run(["say", "-v", "Samantha", text], shell=False) return True