Skip to content

Commit

Permalink
feat: add GitExtractor component
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelchristi committed Dec 26, 2024
1 parent b4673d3 commit b077360
Showing 1 changed file with 202 additions and 0 deletions.
202 changes: 202 additions & 0 deletions src/backend/base/langflow/components/git/gitextractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@

from langflow.custom import Component
from langflow.schema import Data
from langflow.schema.message import Message
from langflow.io import MessageTextInput, Output
import git
import os
import tempfile
import shutil

class GitExtractorComponent(Component):

Check failure on line 11 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (I001)

src/backend/base/langflow/components/git/gitextractor.py:2:1: I001 Import block is un-sorted or un-formatted
display_name = "GitExtractor"
description = "Analyzes a Git repository and returns file contents and complete repository information"
icon = "GitLoader"

inputs = [
MessageTextInput(
name="repository_url",
display_name="Repository URL",
info="URL of the Git repository (e.g., https://github.com/username/repo)",
value="",
),
]

outputs = [
Output(display_name="Text-Based File Contents", name="text_based_file_contents", method="get_text_based_file_contents"),

Check failure on line 26 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (E501)

src/backend/base/langflow/components/git/gitextractor.py:26:121: E501 Line too long (128 > 120)
Output(display_name="Directory Structure", name="directory_structure", method="get_directory_structure"),
Output(display_name="Repository Info", name="repository_info", method="get_repository_info"),
Output(display_name="Statistics", name="statistics", method="get_statistics"),
Output(display_name="Files Content", name="files_content", method="get_files_content"),

Check failure on line 31 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (W293)

src/backend/base/langflow/components/git/gitextractor.py:31:1: W293 Blank line contains whitespace
]

def get_repository_info(self) -> list[Data]:
try:
temp_dir = tempfile.mkdtemp()
try:
repo = git.Repo.clone_from(self.repository_url, temp_dir)
repo_info = {
"name": self.repository_url.split('/')[-1],

Check failure on line 40 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (Q000)

src/backend/base/langflow/components/git/gitextractor.py:40:55: Q000 Single quotes found but double quotes preferred
"url": self.repository_url,
"default_branch": repo.active_branch.name,
"remote_urls": [remote.url for remote in repo.remotes],
"last_commit": {
"hash": repo.head.commit.hexsha,
"author": str(repo.head.commit.author),
"message": repo.head.commit.message.strip(),
"date": str(repo.head.commit.committed_datetime)
},
"branches": [str(branch) for branch in repo.branches]
}
result = [Data(data=repo_info)]
self.status = result
return result
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:

Check failure on line 57 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (BLE001)

src/backend/base/langflow/components/git/gitextractor.py:57:16: BLE001 Do not catch blind exception: `Exception`
error_result = [Data(data={"error": f"Error getting repository info: {str(e)}"})]

Check failure on line 58 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (RUF010)

src/backend/base/langflow/components/git/gitextractor.py:58:83: RUF010 Use explicit conversion flag
self.status = error_result
return error_result

def get_statistics(self) -> list[Data]:
try:
temp_dir = tempfile.mkdtemp()
try:
git.Repo.clone_from(self.repository_url, temp_dir)
total_files = 0
total_size = 0
total_lines = 0
binary_files = 0
directories = 0
for root, dirs, files in os.walk(temp_dir):
total_files += len(files)
directories += len(dirs)
for file in files:
file_path = os.path.join(root, file)

Check failure on line 76 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (PTH118)

src/backend/base/langflow/components/git/gitextractor.py:76:37: PTH118 `os.path.join()` should be replaced by `Path` with `/` operator
total_size += os.path.getsize(file_path)

Check failure on line 77 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (PTH202)

src/backend/base/langflow/components/git/gitextractor.py:77:39: PTH202 `os.path.getsize` should be replaced by `Path.stat().st_size`
try:
with open(file_path, 'r', encoding='utf-8') as f:

Check failure on line 79 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (UP015)

src/backend/base/langflow/components/git/gitextractor.py:79:34: UP015 Unnecessary open mode parameters

Check failure on line 79 in src/backend/base/langflow/components/git/gitextractor.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (PTH123)

src/backend/base/langflow/components/git/gitextractor.py:79:34: PTH123 `open()` should be replaced by `Path.open()`
total_lines += sum(1 for _ in f)
except UnicodeDecodeError:
binary_files += 1
statistics = {
"total_files": total_files,
"total_size_bytes": total_size,
"total_size_kb": round(total_size / 1024, 2),
"total_size_mb": round(total_size / (1024 * 1024), 2),
"total_lines": total_lines,
"binary_files": binary_files,
"directories": directories
}
result = [Data(data=statistics)]
self.status = result
return result
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
error_result = [Data(data={"error": f"Error calculating statistics: {str(e)}"})]
self.status = error_result
return error_result

def get_directory_structure(self) -> Message:
try:
temp_dir = tempfile.mkdtemp()
try:
git.Repo.clone_from(self.repository_url, temp_dir)
tree = ["Directory structure:"]
for root, dirs, files in os.walk(temp_dir):
level = root.replace(temp_dir, '').count(os.sep)
indent = ' ' * level
if level == 0:
tree.append(f"└── {os.path.basename(root)}")
else:
tree.append(f"{indent}├── {os.path.basename(root)}")
subindent = ' ' * (level + 1)
for f in files:
tree.append(f"{subindent}├── {f}")
directory_structure = '\n'.join(tree)
self.status = directory_structure
return Message(text=directory_structure)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
error_message = f"Error getting directory structure: {str(e)}"
self.status = error_message
return Message(text=error_message)

def get_files_content(self) -> list[Data]:
try:
temp_dir = tempfile.mkdtemp()
try:
git.Repo.clone_from(self.repository_url, temp_dir)
content_list = []
for root, _, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, temp_dir)
file_size = os.path.getsize(file_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
file_content = f.read()
except UnicodeDecodeError:
file_content = "[BINARY FILE]"
content_list.append(Data(data={
"path": relative_path,
"size": file_size,
"content": file_content
}))
self.status = content_list
return content_list
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
error_result = [Data(data={"error": f"Error getting files content: {str(e)}"})]
self.status = error_result
return error_result

def get_text_based_file_contents(self) -> Message:
try:
temp_dir = tempfile.mkdtemp()
try:
git.Repo.clone_from(self.repository_url, temp_dir)
content_list = ["(Files content cropped to 300k characters, download full ingest to see more)"]
total_chars = 0
char_limit = 300000

for root, _, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, temp_dir)
content_list.append("=" * 50)
content_list.append(f"File: /{relative_path}")
content_list.append("=" * 50)

try:
with open(file_path, 'r', encoding='utf-8') as f:
file_content = f.read()
if total_chars + len(file_content) > char_limit:
remaining_chars = char_limit - total_chars
file_content = file_content[:remaining_chars] + "\n... (content truncated)"
content_list.append(file_content)
total_chars += len(file_content)
except UnicodeDecodeError:
content_list.append("[BINARY FILE]")

content_list.append("") # Add an empty line between files

if total_chars >= char_limit:
break

if total_chars >= char_limit:
break

text_content = "\n".join(content_list)
self.status = text_content
return Message(text=text_content)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
error_message = f"Error getting text-based file contents: {str(e)}"
self.status = error_message
return Message(text=error_message)

0 comments on commit b077360

Please # to comment.