Skip to content
This repository has been archived by the owner on Jan 27, 2025. It is now read-only.

Commit

Permalink
feat: fallbacks and process warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
madebyjake committed Dec 20, 2024
1 parent 2823bfc commit 2b6e539
Showing 1 changed file with 44 additions and 15 deletions.
59 changes: 44 additions & 15 deletions md5sift.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env python3

import os
import hashlib
import csv
Expand All @@ -14,16 +16,18 @@
def calculate_hash(file_path: str, algorithm: str, verbose: bool = False) -> Tuple[str, Optional[str], Optional[str]]:
"""Calculate the hash of a file using the specified algorithm."""
# Select the hash function based on the provided algorithm
if algorithm.lower() == 'md5':
algorithm_lower = algorithm.lower()
if algorithm_lower == 'md5':
hash_func = hashlib.md5()
elif algorithm.lower() == 'sha1':
elif algorithm_lower == 'sha1':
hash_func = hashlib.sha1()
elif algorithm.lower() == 'sha256':
elif algorithm_lower == 'sha256':
hash_func = hashlib.sha256()
else:
# Log an error if the algorithm is unsupported
logging.error(f"Unsupported algorithm: {algorithm}")
return file_path, None, None
# Unsupported algorithm: log warning and fallback to MD5
logging.warning(f"Unsupported algorithm: {algorithm}. Falling back to MD5.")
hash_func = hashlib.md5()

try:
if verbose:
logging.info(f"Calculating {algorithm.upper()} for: {file_path}")
Expand Down Expand Up @@ -64,25 +68,35 @@ def walk_directory_and_log(
file_extension: Optional[str] = None,
file_names: Optional[Set[str]] = None,
verbose: bool = False,
limit: Optional[int] = None
limit: Optional[int] = None,
threads: Optional[int] = None
) -> None:
"""Walk through the directory, calculate hash, and log to CSV."""
os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)

processed_files_count = 0

try:
with open(csv_file_path, mode="w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["File Name", "Hash", "Last Modified Time"])

with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
max_workers = threads if threads else os.cpu_count()

if verbose and threads:
logging.info(f"Using {threads} threads for processing.")

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
count = 0
for root, dirs, files in os.walk(directory):
# Exclude specified directories
dirs[:] = [d for d in dirs if os.path.join(root, d) not in exclude_paths]
# Exclude specified directories if provided
if exclude_paths:
dirs[:] = [d for d in dirs if os.path.join(root, d) not in exclude_paths]

for file in files:
file_path = os.path.join(root, file)
if file_path in exclude_paths:
if exclude_paths and file_path in exclude_paths:
continue
if file_extension and not file.endswith(file_extension):
continue
Expand All @@ -100,13 +114,18 @@ def walk_directory_and_log(
file_path, hash_value, modified_time = future.result()
if hash_value:
csv_writer.writerow([file_path, hash_value, modified_time])
processed_files_count += 1

if verbose:
if verbose and file_path:
logging.info(f"Processed {file_path}")

except Exception as e:
logging.error(f"Error during directory walk or file writing: {e}")

# If no files were successfully processed, log a warning
if processed_files_count == 0:
logging.warning("No files were processed. Check your filters, directory, or exclude paths.")


def main():
# Set up the argument parser for command-line options
Expand Down Expand Up @@ -142,7 +161,7 @@ def main():
file_extension = args.extension
verbose = args.verbose
algorithm = args.algorithm
exclude_paths = set(args.exclude)
exclude_paths = set(args.exclude) if args.exclude else None

file_names = None
if args.filelist:
Expand All @@ -164,7 +183,17 @@ def main():
logging.info(f"Filtering for specific file names from: {args.filelist}")
logging.info(f"Writing output to: {output_csv_path}")

walk_directory_and_log(scan_path, output_csv_path, algorithm, exclude_paths, file_extension, file_names, verbose, limit=args.test)
walk_directory_and_log(
directory=scan_path,
csv_file_path=output_csv_path,
algorithm=algorithm,
exclude_paths=exclude_paths,
file_extension=file_extension,
file_names=file_names,
verbose=verbose,
limit=args.test,
threads=args.threads
)

if verbose:
logging.info(f"File report generated: {output_csv_path}")
Expand All @@ -173,4 +202,4 @@ def main():


if __name__ == "__main__":
main()
main()

0 comments on commit 2b6e539

Please # to comment.