From 2b6e5397d7b1ff3dd337fb5ac161a059ecd1b333 Mon Sep 17 00:00:00 2001
From: Jake Wells <38321395+madebyjake@users.noreply.github.com>
Date: Fri, 20 Dec 2024 12:53:08 -0500
Subject: [PATCH] feat: fallbacks and process warnings

---
 md5sift.py | 59 ++++++++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 44 insertions(+), 15 deletions(-)

diff --git a/md5sift.py b/md5sift.py
index 84f9148..bd658b3 100644
--- a/md5sift.py
+++ b/md5sift.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import os
 import hashlib
 import csv
@@ -14,16 +16,18 @@
 def calculate_hash(file_path: str, algorithm: str, verbose: bool = False) -> Tuple[str, Optional[str], Optional[str]]:
     """Calculate the hash of a file using the specified algorithm."""
     # Select the hash function based on the provided algorithm
-    if algorithm.lower() == 'md5':
+    algorithm_lower = algorithm.lower()
+    if algorithm_lower == 'md5':
         hash_func = hashlib.md5()
-    elif algorithm.lower() == 'sha1':
+    elif algorithm_lower == 'sha1':
         hash_func = hashlib.sha1()
-    elif algorithm.lower() == 'sha256':
+    elif algorithm_lower == 'sha256':
         hash_func = hashlib.sha256()
     else:
-        # Log an error if the algorithm is unsupported
-        logging.error(f"Unsupported algorithm: {algorithm}")
-        return file_path, None, None
+        # Unsupported algorithm: log warning and fallback to MD5
+        logging.warning(f"Unsupported algorithm: {algorithm}. Falling back to MD5.")
+        hash_func = hashlib.md5()
+
     try:
         if verbose:
             logging.info(f"Calculating {algorithm.upper()} for: {file_path}")
@@ -64,25 +68,35 @@ def walk_directory_and_log(
     file_extension: Optional[str] = None,
     file_names: Optional[Set[str]] = None,
     verbose: bool = False,
-    limit: Optional[int] = None
+    limit: Optional[int] = None,
+    threads: Optional[int] = None
 ) -> None:
     """Walk through the directory, calculate hash, and log to CSV."""
     os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)

+    processed_files_count = 0
+
     try:
         with open(csv_file_path, mode="w", newline="") as csv_file:
             csv_writer = csv.writer(csv_file)
             csv_writer.writerow(["File Name", "Hash", "Last Modified Time"])

-            with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
+            max_workers = threads if threads else os.cpu_count()
+
+            if verbose and threads:
+                logging.info(f"Using {threads} threads for processing.")
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                 futures = []
                 count = 0
                 for root, dirs, files in os.walk(directory):
-                    # Exclude specified directories
-                    dirs[:] = [d for d in dirs if os.path.join(root, d) not in exclude_paths]
+                    # Exclude specified directories if provided
+                    if exclude_paths:
+                        dirs[:] = [d for d in dirs if os.path.join(root, d) not in exclude_paths]
+
                     for file in files:
                         file_path = os.path.join(root, file)
-                        if file_path in exclude_paths:
+                        if exclude_paths and file_path in exclude_paths:
                             continue
                         if file_extension and not file.endswith(file_extension):
                             continue
@@ -100,13 +114,18 @@
                     file_path, hash_value, modified_time = future.result()
                     if hash_value:
                         csv_writer.writerow([file_path, hash_value, modified_time])
+                        processed_files_count += 1

-                    if verbose:
+                    if verbose and file_path:
                         logging.info(f"Processed {file_path}")

     except Exception as e:
         logging.error(f"Error during directory walk or file writing: {e}")

+    # If no files were successfully processed, log a warning
+    if processed_files_count == 0:
+        logging.warning("No files were processed. Check your filters, directory, or exclude paths.")
+

 def main():
     # Set up the argument parser for command-line options
@@ -142,7 +161,7 @@ def main():
     file_extension = args.extension
     verbose = args.verbose
     algorithm = args.algorithm
-    exclude_paths = set(args.exclude)
+    exclude_paths = set(args.exclude) if args.exclude else None
     file_names = None

     if args.filelist:
@@ -164,7 +183,17 @@ def main():
             logging.info(f"Filtering for specific file names from: {args.filelist}")
         logging.info(f"Writing output to: {output_csv_path}")

-    walk_directory_and_log(scan_path, output_csv_path, algorithm, exclude_paths, file_extension, file_names, verbose, limit=args.test)
+    walk_directory_and_log(
+        directory=scan_path,
+        csv_file_path=output_csv_path,
+        algorithm=algorithm,
+        exclude_paths=exclude_paths,
+        file_extension=file_extension,
+        file_names=file_names,
+        verbose=verbose,
+        limit=args.test,
+        threads=args.threads
+    )

     if verbose:
         logging.info(f"File report generated: {output_csv_path}")
@@ -173,4 +202,4 @@


 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file
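
Note: a minimal usage sketch of the updated walk_directory_and_log signature introduced by this patch. The import path (md5sift), the example paths, and the thread count are illustrative assumptions, not part of the change:

    # Sketch only: module name and paths are assumed for illustration.
    from md5sift import walk_directory_and_log

    walk_directory_and_log(
        directory="/data/samples",         # hypothetical scan root
        csv_file_path="/tmp/report.csv",   # hypothetical output location
        algorithm="sha512",                # unsupported value: exercises the new MD5 fallback warning
        exclude_paths=None,                # None is now handled instead of requiring a set
        verbose=True,
        threads=4,                         # new parameter: caps ThreadPoolExecutor workers
    )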