|
| 1 | +""" |
| 2 | +Dettectinator - The Python library to your DeTT&CT YAML files. |
| 3 | +Authors: |
| 4 | + Martijn Veken, Sirius Security |
| 5 | + Ruben Bouman, Sirius Security |
| 6 | +License: GPL-3.0 License |
| 7 | +""" |
| 8 | + |
| 9 | +import json |
| 10 | +import sys |
| 11 | +import os |
| 12 | +import importlib |
| 13 | +import inspect |
| 14 | + |
| 15 | +from dettectinator import DettectTechniquesAdministration, DettectDataSourcesAdministration |
| 16 | +from plugins.technique_import import TechniqueBase |
| 17 | +from plugins.datasources_import import DatasourceBase |
| 18 | +from argparse import ArgumentParser, Namespace |
| 19 | + |
| 20 | + |
| 21 | +class CommandLine: |
| 22 | + |
| 23 | + @staticmethod |
| 24 | + def _get_raw_commandline(argument_names: list) -> str: |
| 25 | + """ |
| 26 | + Some arguments need to be read from the command line before being processed with ArgumentParser. |
| 27 | + This function provides a way to read these values from the command line in a simple way. |
| 28 | + It's ugly, but it works :) |
| 29 | + :return: The value of the requested argument. |
| 30 | + """ |
| 31 | + prev = '' |
| 32 | + value = '' |
| 33 | + for item in sys.argv: |
| 34 | + if prev in argument_names: |
| 35 | + value = item |
| 36 | + break |
| 37 | + prev = item |
| 38 | + return value |
| 39 | + |
| 40 | + @staticmethod |
| 41 | + def _print_plugins(import_plugins: dict) -> None: |
| 42 | + """ |
| 43 | + Prints the list of available data import plugins in a module |
| 44 | + :param import_plugins: dictionary containing the plugins |
| 45 | + """ |
| 46 | + for name in import_plugins.keys(): |
| 47 | + print(f' - {name}') |
| 48 | + |
| 49 | + @staticmethod |
| 50 | + def _get_plugins() -> dict: |
| 51 | + """ |
| 52 | + Retrieves all plugins from the plugin folder |
| 53 | + :return: dict containing plugins and modules |
| 54 | + """ |
| 55 | + import_plugins = {} |
| 56 | + path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'plugins') |
| 57 | + for module in [x for x in os.listdir(path) if x[-3:] == '.py']: |
| 58 | + plugin_mod = importlib.import_module('plugins.' + module[:-3]) |
| 59 | + for name, cls in inspect.getmembers(plugin_mod, inspect.isclass): |
| 60 | + if ('Technique' in name or 'Datasource' in name) and 'Base' not in name: |
| 61 | + import_plugins[name] = plugin_mod |
| 62 | + return import_plugins |
| 63 | + |
| 64 | + @staticmethod |
| 65 | + def _set_default_params(parser: ArgumentParser): |
| 66 | + """ |
| 67 | + Set the default command line arguments |
| 68 | + """ |
| 69 | + required = parser.add_argument_group('required arguments') |
| 70 | + parser.add_argument('-c', '--config', help='Configuration file location.') |
| 71 | + required.add_argument('-p', '--plugin', help='Data import plugin name.', required=True) |
| 72 | + required.add_argument('-a', '--applicable_to', |
| 73 | + help='Systems that the detections are applicable to (comma seperated list).', |
| 74 | + required=True) |
| 75 | + parser.add_argument('-d', '--domain', |
| 76 | + help='The ATT&CK domain (default = enterprise). This argument is ignored if a domain is specified in the YAML file.', |
| 77 | + required=False, choices=['enterprise', 'ics', 'mobile']) |
| 78 | + parser.add_argument('-i', '--input_file', help='YAML filename for input.', default=None) |
| 79 | + parser.add_argument('-o', '--output_file', help='YAML filename for output.', default=None) |
| 80 | + parser.add_argument('-n', '--name', help='Value for the name attribute in the YAML file.', default=None) |
| 81 | + parser.add_argument('-s', '--stix_location', help='Local STIX repository location.', default=None) |
| 82 | + |
| 83 | + parser.add_argument('-ch', '--check_unused', action='store_true', help='Check unused detections.') |
| 84 | + parser.add_argument('-cl', '--clean_unused', action='store_true', help='Clean unused detections.') |
| 85 | + |
| 86 | + def _get_argument_values_from_config_file(self) -> dict: |
| 87 | + """ |
| 88 | + Read the command line arguments from the config file if applicable |
| 89 | + """ |
| 90 | + config_file_name = self._get_raw_commandline(['-c', '--config']) |
| 91 | + if config_file_name: |
| 92 | + print(f'Reading settings from "{config_file_name}".') |
| 93 | + with open(config_file_name, 'r') as f: |
| 94 | + config_file_arguments = json.load(f) |
| 95 | + else: |
| 96 | + config_file_arguments = {} |
| 97 | + return config_file_arguments |
| 98 | + |
| 99 | + @staticmethod |
| 100 | + def process_techniques(applicable_to: list, arguments: Namespace, plugin: TechniqueBase) -> tuple: |
| 101 | + """ |
| 102 | + Process all techniques from the source system |
| 103 | + """ |
| 104 | + # Get the technique data |
| 105 | + techniques = plugin.get_attack_techniques(applicable_to) |
| 106 | + # Convert data to yaml |
| 107 | + print('Generating techniques YAML file.') |
| 108 | + dettect = DettectTechniquesAdministration(arguments.input_file, domain=arguments.domain, |
| 109 | + local_stix_path=arguments.stix_location) |
| 110 | + |
| 111 | + location_prefix_unused_detections = arguments.location_prefix if arguments.clean_unused_location_prefix else '' |
| 112 | + |
| 113 | + warnings, results = dettect.update_detections(techniques, check_unused_detections=arguments.check_unused, |
| 114 | + clean_unused_detections=arguments.clean_unused, |
| 115 | + location_prefix_unused_detections=location_prefix_unused_detections) |
| 116 | + return dettect, results, warnings |
| 117 | + |
| 118 | + @staticmethod |
| 119 | + def process_datasource(applicable_to: list, arguments: Namespace, plugin: DatasourceBase) -> tuple: |
| 120 | + """ |
| 121 | + Process all data sources from the source system |
| 122 | + """ |
| 123 | + # Get the data source data |
| 124 | + datasources = plugin.get_attack_datasources(applicable_to) |
| 125 | + # Convert data to yaml |
| 126 | + print('Generating datasources YAML file.') |
| 127 | + dettect = DettectDataSourcesAdministration(arguments.input_file, domain=arguments.domain, |
| 128 | + local_stix_path=arguments.stix_location) |
| 129 | + warnings, results = dettect.update_data_sources(datasources, check_unused_data_sources=arguments.check_unused, |
| 130 | + clean_unused_data_sources=arguments.clean_unused) |
| 131 | + return dettect, results, warnings |
| 132 | + |
| 133 | + def start(self) -> None: |
| 134 | + """ |
| 135 | + Dettectinator has been started from the commandline. |
| 136 | + Process the command line arguments and launch the appropriate plugin. |
| 137 | + """ |
| 138 | + |
| 139 | + # Load default argument values from the config file |
| 140 | + config_file_arguments = self._get_argument_values_from_config_file() |
| 141 | + |
| 142 | + # Retrieve all available plugins |
| 143 | + plugins = self._get_plugins() |
| 144 | + |
| 145 | + # Get the plugin name from the arguments |
| 146 | + plugin_name = config_file_arguments.get('plugin', self._get_raw_commandline(['-p', '--plugin'])) |
| 147 | + |
| 148 | + if plugin_name: |
| 149 | + # Get the plugin class if it exists |
| 150 | + if plugin_name in plugins.keys(): |
| 151 | + plugin_class = getattr(plugins[plugin_name], plugin_name) |
| 152 | + print(f'Plugin "{plugin_name}" has been found.') |
| 153 | + else: |
| 154 | + print(f'data import plugin "{plugin_name}" does not exist. Valid plugins:') |
| 155 | + self._print_plugins(plugins) |
| 156 | + sys.exit() |
| 157 | + |
| 158 | + # Add the default command line params |
| 159 | + parser = ArgumentParser(add_help=True, conflict_handler='error', ) |
| 160 | + self._set_default_params(parser) |
| 161 | + |
| 162 | + # Add the parameters from the plugin |
| 163 | + plugin_group = parser.add_argument_group(plugin_name) |
| 164 | + plugin_class.set_plugin_params(plugin_group) |
| 165 | + |
| 166 | + # Set the default values from the config file |
| 167 | + # Default and required don't work together, so set required to False |
| 168 | + parser.set_defaults(**config_file_arguments) |
| 169 | + for action in parser._actions: |
| 170 | + if action.dest in config_file_arguments.keys(): |
| 171 | + action.required = False |
| 172 | + |
| 173 | + # Evaluate command line arguments |
| 174 | + arguments = parser.parse_args() |
| 175 | + applicable_to = [at.strip() for at in arguments.applicable_to.split(',')] |
| 176 | + output_file = arguments.output_file or arguments.input_file |
| 177 | + |
| 178 | + # Read the data from the source |
| 179 | + print(f'Using "{plugin_name}" to collect data.') |
| 180 | + plugin = plugin_class(vars(arguments)) |
| 181 | + |
| 182 | + if plugin_name.startswith('Technique'): |
| 183 | + dettect, results, warnings = self.process_techniques(applicable_to, arguments, plugin) |
| 184 | + else: |
| 185 | + dettect, results, warnings = self.process_datasource(applicable_to, arguments, plugin) |
| 186 | + |
| 187 | + if arguments.name: |
| 188 | + dettect.set_name(arguments.name) |
| 189 | + |
| 190 | + dettect.save_yaml_file(output_file) |
| 191 | + print(f'DeTT&CT YAML file written: {dettect.get_filename()}') |
| 192 | + |
| 193 | + output = warnings + results |
| 194 | + if len(output) > 0: |
| 195 | + print('\nPlease review the following items:') |
| 196 | + print(' - ' + '\n - '.join(output)) |
| 197 | + else: |
| 198 | + print('Please specify a valid data import plugin using the "-p" argument:') |
| 199 | + self._print_plugins(plugins) |
0 commit comments