diff --git a/app/modules/code_provider/github/github_service.py b/app/modules/code_provider/github/github_service.py
index dca1ae75..651215f8 100644
--- a/app/modules/code_provider/github/github_service.py
+++ b/app/modules/code_provider/github/github_service.py
@@ -2,6 +2,7 @@
 import logging
 import os
 import random
+import re
 from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Dict, List, Optional, Tuple
 
@@ -152,13 +153,34 @@ def get_github_oauth_token(self, uid: str) -> str:
             raise HTTPException(status_code=404, detail="User not found")
         return user.provider_info["access_token"]
 
+    def _parse_link_header(self, link_header: str) -> Dict[str, str]:
+        """Parse GitHub Link header to extract pagination URLs."""
+        links = {}
+        if not link_header:
+            return links
+
+        for link in link_header.split(","):
+            parts = link.strip().split(";")
+            if len(parts) < 2:
+                continue
+            url = parts[0].strip()[1:-1]  # Remove < and >
+            for p in parts[1:]:
+                if 'rel=' in p:
+                    rel = p.strip().split('=')[1].strip('"')
+                    links[rel] = url
+                    break
+        return links
+
     async def get_repos_for_user(self, user_id: str):
+        import time  # Import the time module
+
+        start_time = time.time()  # Start timing the entire method
         try:
             user = self.db.query(User).filter(User.uid == user_id).first()
             if user is None:
                 raise HTTPException(status_code=404, detail="User not found")
 
-            firebase_uid = user.uid  # Assuming `uid` is the Firebase UID
+            firebase_uid = user.uid
             github_username = user.provider_username
 
             if not github_username:
@@ -166,19 +188,17 @@ async def get_repos_for_user(self, user_id: str):
                 status_code=400, detail="GitHub username not found for this user"
             )
 
-            # Retrieve GitHub OAuth token from Firestore
             github_oauth_token = self.get_github_oauth_token(firebase_uid)
             if not github_oauth_token:
                 raise HTTPException(
                     status_code=400, detail="GitHub OAuth token not found for this user"
                 )
 
-            # Initialize GitHub client with user's OAuth token
             user_github = Github(github_oauth_token)
+
             user_orgs = user_github.get_user().get_orgs()
             org_logins = [org.login.lower() for org in user_orgs]
 
-            # Authenticate as GitHub App
             private_key = (
                 "-----BEGIN RSA PRIVATE KEY-----\n"
                 + config_provider.get_github_key()
@@ -189,7 +209,6 @@ async def get_repos_for_user(self, user_id: str):
             auth = AppAuth(app_id=app_id, private_key=private_key)
             jwt = auth.create_jwt()
 
-            # Fetch all installations with pagination
             all_installations = []
             base_url = "https://api.github.com/app/installations"
             headers = {
@@ -200,7 +219,7 @@ async def get_repos_for_user(self, user_id: str):
 
             async with aiohttp.ClientSession() as session:
                 # Get first page to determine total pages
-                async with session.get(base_url, headers=headers) as response:
+                async with session.get(f"{base_url}?per_page=100", headers=headers) as response:
                     if response.status != 200:
                         error_text = await response.text()
                         logger.error(f"Failed to get installations. \nResponse: {error_text}")
@@ -212,21 +231,18 @@ async def get_repos_for_user(self, user_id: str):
 
                     # Extract last page number from Link header
                     last_page = 1
                     if "Link" in response.headers:
-                        link_header = response.headers["Link"]
-                        links = requests.utils.parse_header_links(link_header)
-                        for link in links:
-                            if link["rel"] == "last":
-                                last_page = int(link["url"].split("page=")[-1])
-                                break
-
-                    # Add first page's data
+                        links = self._parse_link_header(response.headers["Link"])
+                        if "last" in links:
+                            last_url = links["last"]
+                            match = re.search(r"[?&]page=(\d+)", last_url)
+                            if match:
+                                last_page = int(match.group(1))
+
                     first_page_data = await response.json()
                     all_installations.extend(first_page_data)
-                    logger.info(f"Total pages to fetch: {last_page}")
-
-                    # Generate remaining page URLs (skip page 1 since we already have it)
-                    page_urls = [f"{base_url}?page={page}" for page in range(2, last_page + 1)]
+                    # Generate remaining page URLs (skip page 1)
+                    page_urls = [f"{base_url}?page={page}&per_page=100" for page in range(2, last_page + 1)]
 
                 # Process URLs in batches of 10
                 async def fetch_page(url):
@@ -234,7 +250,6 @@ async def fetch_page(url):
                     async with session.get(url, headers=headers) as response:
                         if response.status == 200:
                             installations = await response.json()
-                            logger.info(f"Fetched {len(installations)} installations from {url}")
                             return installations
                         else:
                             error_text = await response.text()
@@ -252,72 +267,90 @@ async def fetch_page(url):
                     for installations in batch_results:
                         all_installations.extend(installations)
 
-                logger.info(f"Final number of installations collected: {len(all_installations)}")
-
-                # Filter installations: user's personal installation + org installations where user is a member
-                user_installations = []
-                for installation in all_installations:
-                    account = installation["account"]
-                    account_login = account["login"].lower()
-                    account_type = account["type"]  # 'User' or 'Organization'
-
-                    if account_type == "User" and account_login == github_username.lower():
-                        user_installations.append(installation)
-                    elif account_type == "Organization" and account_login in org_logins:
-                        user_installations.append(installation)
-
-
-                repos = []
-                for installation in user_installations:
-                    app_auth = auth.get_installation_auth(installation["id"])
-                    github = Github(auth=app_auth)
-                    repos_url = installation["repositories_url"]
-
-                    # Handle pagination for repositories as well
-                    next_repos_url = repos_url
-                    while next_repos_url:
-                        repos_response = requests.get(
-                            next_repos_url,
-                            headers={"Authorization": f"Bearer {app_auth.token}"}
-                        )
-
-                        if repos_response.status_code == 200:
-                            repos.extend(repos_response.json().get("repositories", []))
-
-                            # Check for next page in Link header
-                            next_repos_url = None
-                            if "links" in repos_response.__dict__:
-                                for link in repos_response.links.values():
-                                    if link.get("rel") == "next":
-                                        next_repos_url = link.get("url")
-                                        break
-                        else:
-                            logger.error(
-                                f"Failed to fetch repositories for installation ID {installation['id']}. \nResponse: {repos_response.text}"
-                            )
-
-                # Remove duplicate repositories if any
-                unique_repos = {repo["id"]: repo for repo in repos}.values()
-
-                repo_list = [
-                    {
-                        "id": repo["id"],
-                        "name": repo["name"],
-                        "full_name": repo["full_name"],
-                        "private": repo["private"],
-                        "url": repo["html_url"],
-                        "owner": repo["owner"]["login"],
-                    }
-                    for repo in unique_repos
-                ]
-
-                return {"repositories": repo_list}
+                # Filter installations
+                user_installations = []
+                for installation in all_installations:
+                    account = installation["account"]
+                    account_login = account["login"].lower()
+                    account_type = account["type"]
+
+                    if account_type == "User" and account_login == github_username.lower():
+                        user_installations.append(installation)
+                    elif account_type == "Organization" and account_login in org_logins:
+                        user_installations.append(installation)
+
+                # Fetch repositories for each installation
+                repos = []
+                for installation in user_installations:
+                    app_auth = auth.get_installation_auth(installation["id"])
+                    repos_url = installation["repositories_url"]
+                    github = Github(auth=app_auth)  # do not remove this line
+                    auth_headers = {"Authorization": f"Bearer {app_auth.token}"}
+
+                    async with session.get(f"{repos_url}?per_page=100", headers=auth_headers) as response:
+                        if response.status != 200:
+                            logger.error(
+                                f"Failed to fetch repositories for installation ID {installation['id']}. Response: {await response.text()}"
+                            )
+                            continue
+
+                        first_page_data = await response.json()
+                        repos.extend(first_page_data.get("repositories", []))
+
+                        # Get last page from Link header
+                        last_page = 1
+                        if "Link" in response.headers:
+                            links = self._parse_link_header(response.headers["Link"])
+                            if "last" in links:
+                                last_url = links["last"]
+                                match = re.search(r"[?&]page=(\d+)", last_url)
+                                if match:
+                                    last_page = int(match.group(1))
+
+                    if last_page > 1:
+                        # Generate remaining page URLs (skip page 1)
+                        page_urls = [f"{repos_url}?page={page}&per_page=100" for page in range(2, last_page + 1)]
+
+                        # Process URLs in batches of 10
+                        for i in range(0, len(page_urls), 10):
+                            batch = page_urls[i:i + 10]
+                            tasks = [session.get(url, headers=auth_headers) for url in batch]
+                            responses = await asyncio.gather(*tasks)
+
+                            for response in responses:
+                                async with response:
+                                    if response.status == 200:
+                                        page_data = await response.json()
+                                        repos.extend(page_data.get("repositories", []))
+                                    else:
+                                        logger.error(
+                                            f"Failed to fetch repositories page. Response: {await response.text()}"
+                                        )
+
+                # Remove duplicate repositories
+                unique_repos = {repo["id"]: repo for repo in repos}.values()
+                repo_list = [
+                    {
+                        "id": repo["id"],
+                        "name": repo["name"],
+                        "full_name": repo["full_name"],
+                        "private": repo["private"],
+                        "url": repo["html_url"],
+                        "owner": repo["owner"]["login"],
+                    }
+                    for repo in unique_repos
+                ]
+
+                return {"repositories": repo_list}
 
         except Exception as e:
             logger.error(f"Failed to fetch repositories: {str(e)}", exc_info=True)
             raise HTTPException(
                 status_code=500, detail=f"Failed to fetch repositories: {str(e)}"
             )
+        finally:
+            total_duration = time.time() - start_time  # Calculate total duration
+            logger.info(f"get_repos_for_user executed in {total_duration:.2f} seconds")  # Log total duration
 
     async def get_combined_user_repos(self, user_id: str):
         subquery = (