Merge pull request #248 from potpie-ai/pagination
Update fetch user repo logic
dhirenmathur authored Feb 9, 2025
2 parents a07007c + b024861 commit b0e1086
Showing 1 changed file with 112 additions and 79 deletions.
191 changes: 112 additions & 79 deletions app/modules/code_provider/github/github_service.py
@@ -2,6 +2,7 @@
import logging
import os
import random
+import re
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple

@@ -152,33 +153,52 @@ def get_github_oauth_token(self, uid: str) -> str:
raise HTTPException(status_code=404, detail="User not found")
return user.provider_info["access_token"]

+    def _parse_link_header(self, link_header: str) -> Dict[str, str]:
+        """Parse GitHub Link header to extract pagination URLs."""
+        links = {}
+        if not link_header:
+            return links
+
+        for link in link_header.split(","):
+            parts = link.strip().split(";")
+            if len(parts) < 2:
+                continue
+            url = parts[0].strip()[1:-1]  # Remove < and >
+            for p in parts[1:]:
+                if 'rel=' in p:
+                    rel = p.strip().split('=')[1].strip('"')
+                    links[rel] = url
+                    break
+        return links
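For reference, a minimal sketch of how this new helper behaves on a typical GitHub Link header. The header value and the `service` variable below are illustrative, not taken from the diff:

    # Hypothetical usage; `service` is an instance of the class that defines
    # _parse_link_header. The header value mimics GitHub's documented format.
    link_header = (
        '<https://api.github.com/app/installations?page=2&per_page=100>; rel="next", '
        '<https://api.github.com/app/installations?page=5&per_page=100>; rel="last"'
    )
    links = service._parse_link_header(link_header)
    # links == {
    #     "next": "https://api.github.com/app/installations?page=2&per_page=100",
    #     "last": "https://api.github.com/app/installations?page=5&per_page=100",
    # }

The helper keys each URL by its rel value, which is what the pagination code below relies on to locate the "last" page.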

async def get_repos_for_user(self, user_id: str):
import time # Import the time module

start_time = time.time() # Start timing the entire method
try:
user = self.db.query(User).filter(User.uid == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")

-            firebase_uid = user.uid  # Assuming `uid` is the Firebase UID
+            firebase_uid = user.uid
github_username = user.provider_username

if not github_username:
raise HTTPException(
status_code=400, detail="GitHub username not found for this user"
)

# Retrieve GitHub OAuth token from Firestore
github_oauth_token = self.get_github_oauth_token(firebase_uid)
if not github_oauth_token:
raise HTTPException(
status_code=400, detail="GitHub OAuth token not found for this user"
)

# Initialize GitHub client with user's OAuth token
user_github = Github(github_oauth_token)

user_orgs = user_github.get_user().get_orgs()
org_logins = [org.login.lower() for org in user_orgs]

# Authenticate as GitHub App
private_key = (
"-----BEGIN RSA PRIVATE KEY-----\n"
+ config_provider.get_github_key()
@@ -189,7 +209,6 @@ async def get_repos_for_user(self, user_id: str):
auth = AppAuth(app_id=app_id, private_key=private_key)
jwt = auth.create_jwt()

-            # Fetch all installations with pagination
all_installations = []
base_url = "https://api.github.com/app/installations"
headers = {
@@ -200,7 +219,7 @@

async with aiohttp.ClientSession() as session:
# Get first page to determine total pages
-                async with session.get(base_url, headers=headers) as response:
+                async with session.get(f"{base_url}?per_page=100", headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Failed to get installations. Response: {error_text}")
@@ -212,29 +231,25 @@
# Extract last page number from Link header
last_page = 1
if "Link" in response.headers:
-                        link_header = response.headers["Link"]
-                        links = requests.utils.parse_header_links(link_header)
-                        for link in links:
-                            if link["rel"] == "last":
-                                last_page = int(link["url"].split("page=")[-1])
-                                break
-
-                    # Add first page's data
+                        links = self._parse_link_header(response.headers["Link"])
+                        if "last" in links:
+                            last_url = links["last"]
+                            match = re.search(r"[?&]page=(\d+)", last_url)
+                            if match:
+                                last_page = int(match.group(1))

first_page_data = await response.json()
all_installations.extend(first_page_data)

logger.info(f"Total pages to fetch: {last_page}")

-                # Generate remaining page URLs (skip page 1 since we already have it)
-                page_urls = [f"{base_url}?page={page}" for page in range(2, last_page + 1)]
+                # Generate remaining page URLs (skip page 1)
+                page_urls = [f"{base_url}?page={page}&per_page=100" for page in range(2, last_page + 1)]

# Process URLs in batches of 10
async def fetch_page(url):
try:
async with session.get(url, headers=headers) as response:
if response.status == 200:
installations = await response.json()
logger.info(f"Fetched {len(installations)} installations from {url}")
return installations
else:
error_text = await response.text()
@@ -252,72 +267,90 @@ async def fetch_page(url):
for installations in batch_results:
all_installations.extend(installations)

logger.info(f"Final number of installations collected: {len(all_installations)}")

-                # Filter installations: user's personal installation + org installations where user is a member
-                user_installations = []
-                for installation in all_installations:
-                    account = installation["account"]
-                    account_login = account["login"].lower()
-                    account_type = account["type"]  # 'User' or 'Organization'
-
-                    if account_type == "User" and account_login == github_username.lower():
-                        user_installations.append(installation)
-                    elif account_type == "Organization" and account_login in org_logins:
-                        user_installations.append(installation)
-
-
-                repos = []
-                for installation in user_installations:
-                    app_auth = auth.get_installation_auth(installation["id"])
-                    github = Github(auth=app_auth)
-                    repos_url = installation["repositories_url"]
-
-                    # Handle pagination for repositories as well
-                    next_repos_url = repos_url
-                    while next_repos_url:
-                        repos_response = requests.get(
-                            next_repos_url,
-                            headers={"Authorization": f"Bearer {app_auth.token}"}
-                        )
-
-                        if repos_response.status_code == 200:
-                            repos.extend(repos_response.json().get("repositories", []))
-
-                            # Check for next page in Link header
-                            next_repos_url = None
-                            if "links" in repos_response.__dict__:
-                                for link in repos_response.links.values():
-                                    if link.get("rel") == "next":
-                                        next_repos_url = link.get("url")
-                                        break
-                        else:
-                            logger.error(
-                                f"Failed to fetch repositories for installation ID {installation['id']}. Response: {repos_response.text}"
-                            )
-
-                # Remove duplicate repositories if any
-                unique_repos = {repo["id"]: repo for repo in repos}.values()
-
-                repo_list = [
-                    {
-                        "id": repo["id"],
-                        "name": repo["name"],
-                        "full_name": repo["full_name"],
-                        "private": repo["private"],
-                        "url": repo["html_url"],
-                        "owner": repo["owner"]["login"],
-                    }
-                    for repo in unique_repos
-                ]
-
-                return {"repositories": repo_list}
+                # Filter installations
+                user_installations = []
+                for installation in all_installations:
+                    account = installation["account"]
+                    account_login = account["login"].lower()
+                    account_type = account["type"]
+
+                    if account_type == "User" and account_login == github_username.lower():
+                        user_installations.append(installation)
+                    elif account_type == "Organization" and account_login in org_logins:
+                        user_installations.append(installation)
+
+                # Fetch repositories for each installation
+                repos = []
+                for installation in user_installations:
+                    app_auth = auth.get_installation_auth(installation["id"])
+                    repos_url = installation["repositories_url"]
+                    github = Github(auth=app_auth)  # do not remove this line
+                    auth_headers = {"Authorization": f"Bearer {app_auth.token}"}
+
+                    async with session.get(f"{repos_url}?per_page=100", headers=auth_headers) as response:
+                        if response.status != 200:
+                            logger.error(
+                                f"Failed to fetch repositories for installation ID {installation['id']}. Response: {await response.text()}"
+                            )
+                            continue
+
+                        first_page_data = await response.json()
+                        repos.extend(first_page_data.get("repositories", []))
+
+                        # Get last page from Link header
+                        last_page = 1
+                        if "Link" in response.headers:
+                            links = self._parse_link_header(response.headers["Link"])
+                            if "last" in links:
+                                last_url = links["last"]
+                                match = re.search(r"[?&]page=(\d+)", last_url)
+                                if match:
+                                    last_page = int(match.group(1))
+
+                    if last_page > 1:
+                        # Generate remaining page URLs (skip page 1)
+                        page_urls = [f"{repos_url}?page={page}&per_page=100" for page in range(2, last_page + 1)]
+
+                        # Process URLs in batches of 10
+                        for i in range(0, len(page_urls), 10):
+                            batch = page_urls[i:i + 10]
+                            tasks = [session.get(url, headers=auth_headers) for url in batch]
+                            responses = await asyncio.gather(*tasks)
+
+                            for response in responses:
+                                async with response:
+                                    if response.status == 200:
+                                        page_data = await response.json()
+                                        repos.extend(page_data.get("repositories", []))
+                                    else:
+                                        logger.error(
+                                            f"Failed to fetch repositories page. Response: {await response.text()}"
+                                        )
+
+                # Remove duplicate repositories
+                unique_repos = {repo["id"]: repo for repo in repos}.values()
+                repo_list = [
+                    {
+                        "id": repo["id"],
+                        "name": repo["name"],
+                        "full_name": repo["full_name"],
+                        "private": repo["private"],
+                        "url": repo["html_url"],
+                        "owner": repo["owner"]["login"],
+                    }
+                    for repo in unique_repos
+                ]
+
+                return {"repositories": repo_list}

except Exception as e:
logger.error(f"Failed to fetch repositories: {str(e)}", exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to fetch repositories: {str(e)}"
)
finally:
total_duration = time.time() - start_time # Calculate total duration
logger.info(f"get_repos_for_user executed in {total_duration:.2f} seconds") # Log total duration

async def get_combined_user_repos(self, user_id: str):
subquery = (
