forked from fedora-llvm-team/llvm-snapshots
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Turn off formatting: See psf/black#4268
- Loading branch information
Showing
14 changed files
with
499 additions
and
347 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
""" | ||
FileAccessMixin | ||
""" | ||
|
||
import typing | ||
import pathlib | ||
import requests | ||
import config_mixin | ||
|
||
|
||
class FileAccessMixin(config_mixin.ConfigMixin): | ||
"""Any class that needs to access to the filesystem can derive from this class""" | ||
|
||
def __init__(self, github_token: typing.Optional[str] = None, **kwargs): | ||
""" | ||
Initializes the mixin. | ||
""" | ||
super().__init__(**kwargs) | ||
|
||
@classmethod | ||
def _run_cmd(cls, cmd: str, timeout_secs: int = 5) -> tuple[int, str, str]: | ||
"""Runs the given command and returns the output (stdout and stderr) if any. | ||
Args: | ||
cmd (str): Command to run, e.g. "ls -lha ." | ||
Returns: | ||
tuple[int, str, str]: The command exit code and it's stdout and sterr | ||
""" | ||
import shlex, subprocess | ||
|
||
proc = subprocess.run( | ||
shlex.split(cmd), timeout=timeout_secs, capture_output=True | ||
) | ||
return proc.returncode, proc.stdout.decode(), proc.stderr.decode() | ||
|
||
@classmethod | ||
@typing.overload | ||
def write_to_temp_file(cls, content: bytes) -> pathlib.Path: ... | ||
|
||
@classmethod | ||
@typing.overload | ||
def write_to_temp_file(cls, text: str) -> pathlib.Path: ... | ||
|
||
@classmethod | ||
def write_to_temp_file(cls, content: str | bytes) -> pathlib.Path: | ||
"""Creates a named temporary file that isn't deleted and writes content to it. | ||
Args: | ||
content (str|bytes): String or bytes to be written to the file | ||
Raises: | ||
ValueError: If the content has an unsupported type | ||
Returns: | ||
pathlib.Path: Path object of the temporary file created | ||
Example: Write a string to a temporary file | ||
>>> p = FileAccessMixin.write_to_temp_file("foo") | ||
>>> data = p.read_text() | ||
>>> print(data) | ||
foo | ||
Example: Write bytes to a temporary file | ||
>>> p = FileAccessMixin.write_to_temp_file("bar".encode()) | ||
>>> print(p.read_text()) | ||
bar | ||
Example: Write unsupported content to temp file | ||
>>> p = FileAccessMixin.write_to_temp_file(123) | ||
Traceback (most recent call last): | ||
ValueError: unsupported content type to write to temporary file | ||
""" | ||
import tempfile | ||
|
||
f = tempfile.NamedTemporaryFile(delete_on_close=False, delete=False) | ||
p = pathlib.Path(f.name) | ||
if isinstance(content, bytes): | ||
p.write_bytes(content) | ||
elif isinstance(content, str): | ||
p.write_text(content) | ||
else: | ||
raise ValueError("unsupported content type to write to temporary file") | ||
return p | ||
|
||
@classmethod | ||
def wrap_file_in_md_code_fence(cls, context_file: pathlib.Path) -> None: | ||
"""Surround context file with markdown code fence and shorten it appropriately. | ||
Args: | ||
context_file (pathlib.Path): A file path object pointing to a temporary build log for example | ||
Example: | ||
>>> p = FileAccessMixin.write_to_temp_file("0123456789") | ||
>>> print(p.read_text()) | ||
0123456789 | ||
""" | ||
cls.shorten_file(context_file) | ||
with open(context_file, "r") as original: | ||
data = original.read() | ||
with open(context_file, "w") as modified: | ||
modified.write(f"\n```\n{data}\n```\n") | ||
|
||
@classmethod | ||
def read_url_response_into_file(cls, url: str) -> pathlib.Path: | ||
"""Fetch the given URL and store it in a temporary file whose name is returned. | ||
Args: | ||
url (str): URL to GET | ||
Returns: | ||
pathlib.Path: Path object of the temporary file to which the GET response was written to. | ||
""" | ||
response = requests.get(url) | ||
return cls.write_to_temp_file(response.content) | ||
|
||
@classmethod | ||
def grep_file( | ||
cls, | ||
pattern: str, | ||
filepath: typing.Union[str, pathlib.Path], | ||
lines_before: typing.Optional[int] = 0, | ||
lines_after: typing.Optional[int] = 0, | ||
case_insensitive: typing.Optional[bool] = True, | ||
extra_args: typing.Optional[str] = None, | ||
grep_bin: typing.Optional[str] = "grep", | ||
) -> tuple[int, str, str]: | ||
"""Runs the grep binary on the filepath and includes lines before and after repsectively. | ||
Args: | ||
pattern (str): The pattern to find | ||
filepath (typing.Union[str,pathlib.Path]): The file to search for the pattern | ||
lines_before (typing.Optional[int], optional): Includes N-lines before a given match. Defaults to 0. | ||
lines_after (typing.Optional[int], optional): Includes N-lines after a given match. Defaults to 0. | ||
case_insensitive (typing.Optional[bool], optional): Ignores cases. Defaults to True. | ||
extra_args (typing.Optiona[str], optional): A string of grep extra arguments (e.g. "-P"). Defaults to None. | ||
Raises: | ||
ValueError: If the pattern is empty | ||
ValueError: If the lines_before is negative | ||
ValueError: If the lines_after is negative | ||
Returns: | ||
tuple[int, str, str]: return code, stdout, stderr | ||
""" | ||
if pattern is None or pattern == "": | ||
raise ValueError(f"pattern is invalid:{pattern}") | ||
|
||
if lines_before is None or lines_before < 0: | ||
raise ValueError(f"lines_before must be zero or a positive integer") | ||
|
||
if lines_after is None or lines_after < 0: | ||
raise ValueError(f"lines_after must be zero or a positive integer") | ||
|
||
opts = [] | ||
if case_insensitive: | ||
opts.append("-i") | ||
if lines_before > 0: | ||
opts.append(f"--before-context={lines_before}") | ||
|
||
if lines_after > 0: | ||
opts.append(f"--after-context={lines_after}") | ||
|
||
if isinstance(filepath, pathlib.Path): | ||
filepath = filepath.resolve() | ||
|
||
if extra_args is None: | ||
extra_args = "" | ||
|
||
cmd = f"{grep_bin} {" ".join(opts)} {extra_args} '{pattern}' {filepath}" | ||
return cls._run_cmd(cmd) | ||
|
||
@classmethod | ||
def grep_url(cls, url, **kw_args) -> tuple[str, pathlib.Path]: | ||
"""GETs the given url and passes all other parameters on to grep_file | ||
See grep_file for knowing what arguments are accepted for kw_args. | ||
Args: | ||
url (_type_): URL to get | ||
Returns: | ||
str: Potential matches | ||
""" | ||
file = cls.read_url_response_into_file(url=url) | ||
return cls.grep_file(filepath=file, **kw_args), file | ||
|
||
@classmethod | ||
def gunzip(cls, f: tuple[str, pathlib.Path]) -> pathlib.Path: | ||
# Unzip log file on the fly if we need to | ||
if str(f).endswith(".gz"): | ||
unzipped_file = str(f).removesuffix(".gz") | ||
retcode, stdout, stderr = cls._run_cmd(cmd=f"gunzip -kf {f}") | ||
if retcode != 0: | ||
raise Exception(f"Failed to gunzip build log '{f}': {stderr}") | ||
f = unzipped_file | ||
return pathlib.Path(str(f)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.