708 lines
25 KiB
Python
Executable file
708 lines
25 KiB
Python
Executable file
#!/usr/bin/env python3.11
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
import ast
|
|
import contextlib
|
|
import enum
|
|
import functools
|
|
import importlib
|
|
import importlib.util
|
|
import importlib.resources
|
|
import io
|
|
import math
|
|
import os
|
|
import os.path
|
|
import pickle
|
|
import pwd
|
|
import shlex
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import traceback
|
|
|
|
import fill3
|
|
import lscolors
|
|
import pygments
|
|
import pygments.lexers
|
|
import pygments.styles
|
|
import termstr
|
|
import toml
|
|
|
|
import eris
|
|
|
|
|
|
# Interpreter version used when a tool has to be launched as "pythonX.Y ...".
PYTHON_VERSION = "3.11"
# Name of the interpreter executable derived from the version above.
PYTHON_EXECUTABLE = f"python{PYTHON_VERSION}"
# Directory (relative path) under which pickled tool results are stored.
CACHE_PATH = ".eris"
|
|
|
|
|
|
# Fall back to the "native" pygments style unless the caller's environment
# already chose one.
os.environ.setdefault("PYGMENT_STYLE", "native")
|
|
|
|
|
|
class Status(enum.IntEnum):
    """Outcome (or progress state) of running one tool on one path."""

    # States a finished run can end in.
    ok = 1
    problem = 2
    error = 3
    not_applicable = 4
    # States of a run that has not finished yet.
    running = 5
    pending = 6
    # The tool exceeded its time limit.
    timed_out = 7
|
|
|
|
|
|
# Colour used to draw each status.  Status.pending has no entry: it is drawn
# as a "." character instead of a coloured cell (see the end of this block).
_STATUS_COLORS = {
    Status.ok: termstr.Color.green,
    Status.problem: termstr.Color.dark_green,
    Status.not_applicable: termstr.Color.grey_80,
    Status.running: termstr.Color.lime,
    Status.error: termstr.Color.red,
    Status.timed_out: termstr.Color.purple,
}
# Human readable label for every status, in display order.
STATUS_MEANINGS = [
    (Status.ok, "Ok"),
    (Status.problem, "Problem"),
    (Status.not_applicable, "Not applicable"),
    (Status.running, "Running"),
    (Status.timed_out, "Timed out"),
    (Status.pending, "Pending"),
    (Status.error, "Error"),
]
# One-character terminal rendering of each status: a space on a coloured
# background for the coloured statuses...
STATUS_TO_TERMSTR = {
    status: termstr.TermStr(" ", termstr.CharStyle(bg_color=color))
    for status, color in _STATUS_COLORS.items()
}
# ...and a grey dot for pending.
STATUS_TO_TERMSTR[Status.pending] = termstr.TermStr(".", fg_color=termstr.Color.grey_100)
|
|
|
|
|
|
# Value passed as the subprocess "timeout" argument by the tools in this
# module that limit their run time.
TIMEOUT = 60
|
|
|
|
|
|
def _printable(text):
|
|
return "".join(char if ord(char) > 31 or char in ["\n", "\t"] else "#" for char in text)
|
|
|
|
|
|
def _fix_input(input_):
    """Make tool output displayable.

    Control characters are masked with _printable() and tabs are expanded to
    4-column tab stops.
    """
    masked = _printable(input_)
    return masked.expandtabs(tabsize=4)
|
|
|
|
|
|
def _do_command(command, **kwargs):
    """Run *command* and return (stdout, stderr, returncode).

    Both output streams are captured as text and passed through _fix_input().
    Extra keyword arguments are handed to subprocess.run() unchanged.
    """
    finished = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        **kwargs,
    )
    clean_stdout = _fix_input(finished.stdout)
    clean_stderr = _fix_input(finished.stderr)
    return clean_stdout, clean_stderr, finished.returncode
|
|
|
|
|
|
def _run_command(command, error_status=None, has_color=False, timeout=None, **kwargs):
    """Run *command* and return (status, output).

    Parameters:
        command: argument list passed to subprocess.run().
        error_status: Status returned when the exit code is non-zero;
            defaults to Status.problem.
        has_color: when true, the output is parsed with
            termstr.TermStr.from_term() instead of being cleaned by
            _fix_input().
        timeout: forwarded to subprocess.run(); subprocess.TimeoutExpired
            propagates to the caller.
        **kwargs: extra subprocess.run() arguments (e.g. cwd, env).

    Returns:
        (Status.ok or error_status, stdout followed by stderr).
    """
    error_status = Status.problem if error_status is None else error_status
    if has_color:
        process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                 text=True, timeout=timeout, **kwargs)
        stdout, stderr, returncode = (termstr.TermStr.from_term(process.stdout),
                                      termstr.TermStr.from_term(process.stderr),
                                      process.returncode)
    else:
        # Forward **kwargs on this path as well; previously they were only
        # honoured when has_color was true and silently dropped otherwise.
        stdout, stderr, returncode = _do_command(command, timeout=timeout, **kwargs)
    result_status = Status.ok if returncode == 0 else error_status
    return result_status, (stdout + stderr)
|
|
|
|
|
|
def deps(**kwargs):
    """Decorator factory that stores each keyword argument as an attribute.

    The decorated function itself is returned (it is not wrapped), with one
    attribute set per keyword, e.g. ``deps``, ``url`` or ``executables``.
    """
    def decorating_func(func):
        for attribute_name, attribute_value in kwargs.items():
            setattr(func, attribute_name, attribute_value)
        return func

    return decorating_func
|
|
|
|
|
|
def _syntax_highlight(text, lexer, style):
    """Return *text* as a termstr coloured according to *lexer* and *style*.

    *lexer* is a pygments lexer and *style* a pygments style class.  The
    result is laid out through fill3.Text using a pad character that carries
    the style's background colour, and the resulting lines are joined with
    newlines.
    """
    @functools.lru_cache(maxsize=500)
    def _rgb_from_hex(hex_rgb):
        # "#rrggbb" or "rrggbb" -> (r, g, b)
        digits = hex_rgb[1:] if hex_rgb.startswith("#") else hex_rgb
        return tuple(int("0x" + digits[start:start+2], base=16) for start in [0, 2, 4])

    @functools.lru_cache(maxsize=500)
    def _style_for(token_type, fallback_bg_color, fallback_style):
        # Translate the pygments style of one token type into a CharStyle.
        try:
            token_style = style.style_for_token(token_type)
        except KeyError:
            return fallback_style
        if token_style["color"] is None:
            fg_color = termstr.Color.black
        else:
            fg_color = _rgb_from_hex(token_style["color"])
        if token_style["bgcolor"] is None:
            bg_color = fallback_bg_color
        else:
            bg_color = _rgb_from_hex(token_style["bgcolor"])
        return termstr.CharStyle(fg_color, bg_color, token_style["bold"],
                                 token_style["italic"], token_style["underline"])

    background = _rgb_from_hex(style.background_color)
    plain_style = termstr.CharStyle(bg_color=background)
    pieces = []
    for token_type, token_text in pygments.lex(text, lexer):
        token_char_style = _style_for(token_type, background, plain_style)
        pieces.append(termstr.TermStr(token_text, token_char_style))
    highlighted = termstr.join("", pieces)
    text_widget = fill3.Text(highlighted, pad_char=termstr.TermStr(" ", bg_color=background))
    return termstr.join("\n", text_widget.text)
|
|
|
|
|
|
def _syntax_highlight_using_path(text, path):
    """Highlight *text* with the lexer pygments selects for *path*.

    The style is the one named by the PYGMENT_STYLE environment variable.
    Whatever get_lexer_for_filename() raises when no lexer matches
    propagates to the caller.
    """
    style_name = os.environ["PYGMENT_STYLE"]
    lexer = pygments.lexers.get_lexer_for_filename(path, text, stripnl=False)
    return _syntax_highlight(text, lexer, pygments.styles.get_style_by_name(style_name))
|
|
|
|
|
|
def linguist(path):
    """Run the "linguist" command on *path*.

    Status.ok is passed as the error status, so the result is Status.ok
    whatever the exit code.
    """
    # Dep: ruby?, ruby-dev, libicu-dev, cmake, "gem install github-linguist"
    command = ["linguist", path]
    return _run_command(command, error_status=Status.ok)
|
|
|
|
|
|
def _permissions_in_octal(permissions):
|
|
result = []
|
|
for part_index in range(3):
|
|
index = part_index * 3 + 1
|
|
part = permissions[index:index+3]
|
|
digit = sum(2 ** (2 - index) for index, element in enumerate(part) if element != "-")
|
|
result.append(str(digit))
|
|
return "".join(result)
|
|
|
|
|
|
def _pretty_bytes(bytes):
|
|
if bytes == 0:
|
|
return "0 B"
|
|
units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
|
|
unit_index = int(math.floor(math.log(bytes, 1024)))
|
|
power = math.pow(1024, unit_index)
|
|
conversion = round(bytes / power, 2)
|
|
return f"{conversion} {units[unit_index]}"
|
|
|
|
|
|
@deps(deps={"file", "coreutils"}, url="https://github.com/ahamilton/eris", executables={"file"})
def metadata(path):
    """Return (Status.ok, report) describing the file at *path*.

    The report lists size, permissions, timestamps, owner, group, hardlink
    count, whether the path is a symlink, and the mime/file type reported by
    the "file" command.  os.stat() is used, so the stat values describe the
    target of a symlink.
    """
    # Imported here because the module header only imports pwd.
    import grp

    def detail(value, unit):
        # Greyed out "(value unit)" suffix shown after the main value.
        result = f" ({value})" if unit is None else f" ({value} {unit})"
        return termstr.TermStr(result, fg_color=termstr.Color.grey_100)

    def name_or_id(lookup, id_, attribute):
        # Ids without a database entry (KeyError) are shown as the number.
        try:
            return getattr(lookup(id_), attribute)
        except KeyError:
            return str(id_)

    is_symlink = "yes" if os.path.islink(path) else "no"
    stat_result = os.stat(path)
    permissions = stat.filemode(stat_result.st_mode)
    hardlinks = str(stat_result.st_nlink)
    # The group name must come from the group database; looking the gid up
    # with pwd.getpwuid() returned the *user* that happens to share the id.
    group = [name_or_id(grp.getgrgid, stat_result.st_gid, "gr_name"),
             detail(stat_result.st_gid, "gid")]
    owner = [name_or_id(pwd.getpwuid, stat_result.st_uid, "pw_name"),
             detail(stat_result.st_uid, "uid")]
    modified, created, access = [
        [time.asctime(time.gmtime(seconds)), detail(int(seconds), "secs")]
        for seconds in (stat_result.st_mtime, stat_result.st_ctime, stat_result.st_atime)]
    size = [_pretty_bytes(stat_result.st_size), detail(stat_result.st_size, "bytes")]
    stdout, *rest = _do_command(["file", "--dereference", "--brief", "--uncompress", "--mime",
                                 path])
    mime_type = stdout
    stdout, *rest = _do_command(["file", "--dereference", "--brief", "--uncompress", path])
    file_type = stdout
    permissions_value = [permissions, detail(_permissions_in_octal(permissions), None)]
    text = []
    # A None entry produces a blank separator line in the report.
    for line in [("size", size), ("permissions", permissions_value), None,
                 ("modified time", modified), ("creation time", created),
                 ("access time", access), None, ("owner", owner), ("group", group), None,
                 ("hardlinks", hardlinks), ("symlink", is_symlink), None,
                 ("mime type", mime_type.strip()), ("file type", file_type.strip())]:
        if line is None:
            text.append("\n")
        else:
            name, value = line
            name = termstr.TermStr(name + ":", fg_color=termstr.Color.blue).ljust(16)
            text.append(name + termstr.join("", value) + "\n")
    return Status.ok, termstr.join("", text)
|
|
|
|
|
|
@deps(deps={"python3-pygments"}, url="http://pygments.org/")
def contents(path):
    """Return the text of *path*, syntax highlighted when pygments can.

    Files that cannot be decoded give (Status.not_applicable, "Not unicode").
    When pygments has no lexer for the path the cleaned text is returned
    unhighlighted.
    """
    with open(path) as file_:
        try:
            # Read a short head first, then the remainder.
            # NOTE(review): presumably so undecodable files fail early - confirm.
            beginning = file_.read(200)
            remainder = file_.read()
        except UnicodeDecodeError:
            return Status.not_applicable, "Not unicode"
    text = _fix_input(beginning + remainder)
    with contextlib.suppress(pygments.util.ClassNotFound):
        text = _syntax_highlight_using_path(text, path)
    return Status.ok, text
|
|
|
|
|
|
def _has_shebang_line(path):
|
|
with open(path, "rb") as file_:
|
|
return file_.read(2) == b"#!"
|
|
|
|
|
|
def _is_python_test_file(path):
|
|
path = str(os.path.basename(path))
|
|
return path.endswith("_test.py") or path.startswith("test_")
|
|
|
|
|
|
@deps(url="https://docs.python.org/3/library/unittest.html")
def python_unittests(path):
    """Run *path* as a test script when its name marks it as a test file.

    Files with a shebang line are executed directly, others through
    PYTHON_EXECUTABLE.  Returns Status.ok for exit code 0, Status.problem
    otherwise, and Status.not_applicable for non-test files.
    """
    if not _is_python_test_file(path):
        return Status.not_applicable, "No tests."
    if _has_shebang_line(path):
        command = [path]
    else:
        command = [PYTHON_EXECUTABLE, path]
    stdout, stderr, returncode = _do_command(command, timeout=TIMEOUT)
    status = Status.problem if returncode != 0 else Status.ok
    return status, (stdout + "\n" + stderr)
|
|
|
|
|
|
@deps(deps={"python3-pytest", "python3-pytest-cov"},
      url="https://docs.pytest.org/en/latest/", executables={"pytest"})
def pytest(path):
    """Run pytest (with coverage and doctests enabled) on *path*.

    The coverage data file is redirected into a temporary directory through
    the COVERAGE_FILE environment variable.  Exit code 5 is reported as
    Status.not_applicable, 0 as Status.ok and anything else as
    Status.problem.
    """
    command = [PYTHON_EXECUTABLE, "-m", "pytest", "--cov=.",
               "--doctest-modules", "--color=yes", path]
    with tempfile.TemporaryDirectory() as temp_dir:
        env = os.environ.copy()
        env["COVERAGE_FILE"] = os.path.join(temp_dir, "coverage")
        process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                 text=True, timeout=TIMEOUT, env=env)
    stdout = termstr.TermStr.from_term(process.stdout)
    stderr = termstr.TermStr.from_term(process.stderr)
    returncode = process.returncode
    if returncode == 0:
        status = Status.ok
    elif returncode == 5:
        status = Status.not_applicable
    else:
        status = Status.problem
    return status, (stdout + stderr)
|
|
|
|
|
|
@deps(deps={"python3-mypy"}, url="http://mypy-lang.org/")
def mypy(path):
    """Type check *path* with mypy, ignoring missing imports.

    Only mypy's standard output is returned; Status.problem is reported for
    any non-zero exit code.
    """
    command = [PYTHON_EXECUTABLE, "-m", "mypy", "--ignore-missing-imports", path]
    stdout, stderr, returncode = _do_command(command, timeout=TIMEOUT)
    if returncode == 0:
        return Status.ok, stdout
    return Status.problem, stdout
|
|
|
|
|
|
def _colorize_coverage_report(lines):
    """Colour the lines of an annotated coverage report.

    *lines* are lines from a "coverage annotate" output file.  Lines whose
    two-character prefix is "> " are drawn green and lines prefixed "! "
    grey; every other line keeps the default foreground colour.

    Returns the coloured lines joined into a single termstr.
    """
    line_color = {"> ": termstr.Color.green, "! ": termstr.Color.grey_150}
    # Use .get() so that any prefix missing from the table (blank lines,
    # other markers) falls back to no colour instead of raising KeyError.
    return termstr.join("", [termstr.TermStr(line, fg_color=line_color.get(line[:2]))
                             for line in lines])
|
|
|
|
|
|
@deps(deps={"python3-coverage"}, url="https://coverage.readthedocs.io/")
def python_coverage(path):
    """Show the annotated coverage of *path* based on an existing ".coverage".

    Not applicable when there is no ".coverage" file in the current
    directory or when *path* was modified after it was written.  The status
    is Status.problem when "coverage annotate" fails or when any line of the
    annotated file starts with "! ".
    """
    coverage_path = ".coverage"
    if not os.path.exists(coverage_path):
        return Status.not_applicable, f'No "{coverage_path}" file.'
    source_mtime = os.stat(path).st_mtime
    if source_mtime > os.stat(coverage_path).st_mtime:
        return (Status.not_applicable,
                f'File has been modified since "{coverage_path}" file was generated.')
    path = os.path.normpath(path)
    with tempfile.TemporaryDirectory() as temp_dir:
        command = [PYTHON_EXECUTABLE, "-m", "coverage", "annotate",
                   "--directory", temp_dir, path]
        stdout, stderr, returncode = _do_command(command)
        if returncode != 0:
            return Status.problem, stdout
        # The first entry listed in the temporary directory is taken as the
        # annotated file.
        cover_filename = os.listdir(temp_dir)[0]
        with open(os.path.join(temp_dir, cover_filename), "r") as cover_file:
            lines = cover_file.read().splitlines(keepends=True)
    has_failed_lines = any(line.startswith("! ") for line in lines)
    status = Status.problem if has_failed_lines else Status.ok
    return status, _colorize_coverage_report(lines)
|
|
|
|
|
|
def _function_body_lines(python_source):
|
|
ranges = []
|
|
|
|
class FuncNodeVisitor(ast.NodeVisitor):
|
|
|
|
def _line_range(self, body):
|
|
return body[0].lineno - 1, body[-1].end_lineno
|
|
|
|
def visit_FunctionDef(self, node):
|
|
ranges.append(self._line_range(node.body))
|
|
visit_AsyncFunctionDef = visit_FunctionDef
|
|
tree = ast.parse(python_source)
|
|
FuncNodeVisitor().visit(tree)
|
|
return ranges
|
|
|
|
|
|
@deps(url="https://github.com/ahamilton/eris")
def python_gut(path):
    """Show the module at *path* with all function bodies removed.

    What remains (signatures, class statements, module level code) is
    tab-expanded and syntax highlighted.
    """
    with open(path) as module_file:
        python_source = module_file.read()
    lines = python_source.splitlines(keepends=True)
    # Ranges refer to the original line numbering, so shift each one by the
    # number of lines already removed.
    removed_so_far = 0
    for body_start, body_end in _function_body_lines(python_source):
        del lines[body_start - removed_so_far:body_end - removed_so_far]
        removed_so_far += body_end - body_start
    gutted_source = "".join(lines).expandtabs(tabsize=4)
    return Status.ok, _syntax_highlight_using_path(gutted_source, path)
|
|
|
|
|
|
def _get_mccabe_line_score(line):
|
|
position, function_name, score = line.split()
|
|
return int(score)
|
|
|
|
|
|
def _colorize_mccabe(text):
    """Return mccabe output with lines scoring above 10 drawn in yellow."""
    rendered_lines = []
    for line in text.splitlines(keepends=True):
        if _get_mccabe_line_score(line) > 10:
            rendered_lines.append(termstr.TermStr(line, fg_color=termstr.Color.yellow))
        else:
            rendered_lines.append(line)
    return termstr.join("", rendered_lines)
|
|
|
|
|
|
@deps(deps={"python3-mccabe"}, url="https://pypi.org/project/mccabe/")
def python_mccabe(path):
    """Report the mccabe complexity of the functions in *path*.

    The status is Status.problem when the highest score exceeds 10.
    """
    stdout, *rest = _do_command([PYTHON_EXECUTABLE, "-m", "mccabe", path])
    try:
        max_score = max(_get_mccabe_line_score(line) for line in stdout.splitlines())
    except ValueError:  # When there are no lines
        max_score = 0
    status = Status.ok if max_score <= 10 else Status.problem
    return status, _colorize_mccabe(stdout)
|
|
|
|
|
|
# FIX: Reenable when pydisasm is not causing problems
|
|
# @deps(deps={"python3-xdis"}, executables={"pydisasm"},
|
|
# url="https://pypi.python.org/pypi/xdis")
|
|
# def pydisasm(path):
|
|
# return _run_command(["pydisasm", path], Status.ok,
|
|
# Status.not_applicable)
|
|
|
|
|
|
@deps(deps={"perltidy"}, url="http://perltidy.sourceforge.net/", executables={"perltidy"})
def perltidy(path):
    """Show *path* as reformatted by "perltidy -st", syntax highlighted.

    The status is always Status.ok; the exit code is ignored.
    """
    tidied_source, *rest = _do_command(["perltidy", "-st", path])
    return Status.ok, _syntax_highlight_using_path(tidied_source, path)
|
|
|
|
|
|
@deps(deps={"tidy"}, url="https://www.html-tidy.org/", executables={"tidy"})
def html_syntax(path):
    """Check the HTML in *path* with "tidy".

    Returns Status.ok when tidy exits with 0 and Status.problem otherwise,
    together with tidy's cleaned-up standard error output.

    Raises OSError when *path* cannot be opened.
    """
    # Stop tidy from modifying the input path by feeding the file on stdin.
    # The file is opened here rather than through a "cat ... | tidy" shell
    # pipeline, which avoids spawning a shell and an extra process.
    with open(path, "rb") as html_file:
        tidy_process = subprocess.run(["tidy"], stdin=html_file,
                                      capture_output=True, text=True)
    status = Status.ok if tidy_process.returncode == 0 else Status.problem
    return status, _fix_input(tidy_process.stderr)
|
|
|
|
|
|
@deps(deps={"pandoc"}, url="https://pandoc.org/", executables={"pandoc"})
def pandoc(path):
    """Convert *path* to HTML with pandoc and render it with the elinks tool.

    The HTML is written to a temporary directory that is removed once the
    elinks tool has returned its result.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        html_path = os.path.join(temp_dir, "temp.html")
        command = ["pandoc", "-t", "html", "-o", html_path, path]
        _do_command(command, timeout=TIMEOUT)
        return elinks(html_path)
|
|
|
|
|
|
# Maximum width, in pixels, of an image before it is rendered as text.
MAX_IMAGE_SIZE = 200
|
|
|
|
|
|
def _resize_image(image, new_width):
    """Return *image* scaled to *new_width*, keeping its aspect ratio.

    The image is resampled with the LANCZOS filter.
    """
    import PIL.Image  # Here to avoid 'Segmentation Fault' in install-all-tools
    scale = new_width / image.width
    new_size = (int(image.width * scale), int(image.height * scale))
    return image.resize(new_size, PIL.Image.LANCZOS)
|
|
|
|
|
|
def _image_to_text(image):
    """Render *image* as text using upper-half-block characters.

    Each character cell shows two vertically stacked pixels: the upper pixel
    as the foreground colour and the lower pixel as the background colour.
    An image with an odd height gets a final row of None pixels.
    """
    width = image.width
    half_blocks = "▀" * width
    pixels = list(image.getdata())
    rows = [pixels[row_index*width:(row_index+1)*width] for row_index in range(image.height)]
    if image.height % 2 == 1:
        rows.append([None] * width)
    text_lines = []
    # Pair every even row with the odd row below it.
    for top_row, bottom_row in zip(rows[0::2], rows[1::2]):
        cell_styles = tuple(termstr.CharStyle(fg_color=top_pixel, bg_color=bottom_pixel)
                            for top_pixel, bottom_pixel in zip(top_row, bottom_row))
        text_lines.append(termstr.TermStr(half_blocks, cell_styles))
    return termstr.join("\n", text_lines)
|
|
|
|
|
|
@deps(deps={"python3-pillow"}, url="http://python-pillow.github.io/")
def pil(path):
    """Render the image file at *path* as coloured text.

    Images wider than MAX_IMAGE_SIZE are scaled down to that width first.
    """
    import PIL.Image
    with open(path, "rb") as image_file, \
            PIL.Image.open(image_file).convert("RGB") as image:
        is_too_wide = image.width > MAX_IMAGE_SIZE
        if is_too_wide:
            image = _resize_image(image, MAX_IMAGE_SIZE)
        return Status.ok, _image_to_text(image)
|
|
|
|
|
|
@deps(deps={"python3-cairosvg"}, url="https://cairosvg.org/")
def cairosvg(path):
    """Render the SVG file at *path* as coloured text.

    The SVG is rasterised to a PNG that is MAX_IMAGE_SIZE pixels wide and
    then drawn with _image_to_text().
    """
    import cairosvg as svg_renderer
    import PIL.Image
    png_bytes = svg_renderer.svg2png(url=path, output_width=MAX_IMAGE_SIZE)
    with io.BytesIO(png_bytes) as png_file, \
            PIL.Image.open(png_file).convert("RGB") as image:
        return Status.ok, _image_to_text(image)
|
|
|
|
|
|
@deps(deps={"golang-go"},
      url="https://github.com/golang/go", executables={"go"})
def godoc(path):
    """Show the "go doc" output for the Go file at *path*.

    The file is symlinked as "file.go" into an otherwise empty temporary
    directory and "go doc ." is run there.  The result is
    Status.not_applicable when the command fails or prints nothing on
    standard output.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        symlink_path = os.path.join(temp_dir, "file.go")
        os.symlink(os.path.abspath(path), symlink_path)
        stdout, stderr, returncode = _do_command(["go", "doc", "."], cwd=temp_dir)
        os.remove(symlink_path)
    produced_docs = returncode == 0 and stdout.strip() != ""
    status = Status.ok if produced_docs else Status.not_applicable
    return status, stdout + stderr
|
|
|
|
|
|
@deps(deps={"git"}, url="https://git-scm.com/docs/git-log",
      executables={"git"})
def git_log(path):
    """Show the coloured "git log" (following renames, with stats) of *path*.

    Not applicable when git fails or when it produces no output at all.
    """
    command = ["git", "log", "--find-renames", "--follow", "--stat", "--color", path]
    status, output = _run_command(command, error_status=Status.not_applicable,
                                  has_color=True)
    if output.data == "":
        return Status.not_applicable, ""
    return status, output
|
|
|
|
|
|
def make_tool_function(dependencies, command, url=None, error_status=None,
                       has_color=False, timeout=None):
    """Build a tool function that runs *command* with a path appended.

    Parameters:
        dependencies: sequence of dependency names; the first one doubles as
            the url when *url* is not given.
        command: command line as a single string; it is split on whitespace
            and its first word is recorded as the required executable.
        url: url recorded on the tool.
        error_status: name of a Status member to report on failure, or None.
        has_color, timeout: passed on to _run_command().

    Returns:
        A function taking a path, carrying ``deps``, ``url``,
        ``executables`` and ``command`` attributes.
    """
    tool_url = dependencies[0] if url is None else url
    command_parts = command.split()
    if error_status is not None:
        error_status = Status[error_status]

    @deps(deps=set(dependencies), url=tool_url, executables={command_parts[0]})
    def func(path):
        return _run_command(command_parts + [path], error_status, has_color, timeout)

    func.command = command
    return func
|
|
|
|
|
|
elinks = git_diff = git_blame = None  # For linters.


# Load the tool definitions: from the file named by ERIS_CONFIG when that
# variable is set, otherwise from the tools.toml packaged with eris.
if "ERIS_CONFIG" not in os.environ:
    with importlib.resources.open_text(eris, "tools.toml") as tools_toml_file:
        tools_toml = toml.load(tools_toml_file)
else:
    tools_toml = toml.load(os.environ["ERIS_CONFIG"])
# The extension table is kept apart; every remaining entry describes a tool.
tools_for_extensions = tools_toml.pop("tools_for_extensions")
for tool_name, tool_toml in tools_toml.items():
    tool_func = make_tool_function(**tool_toml)
    tool_func.__name__ = tool_name
    tool_func.__qualname__ = tool_name
    # Publish the tool as a module level function named after its entry.
    globals()[tool_name] = tool_func
|
|
|
|
#############################
|
|
|
|
|
|
def log_error(message=None):
    """Append *message* to /tmp/eris.log.

    Without a message the traceback of the exception currently being handled
    is written instead.  A newline is added after an explicit message.
    """
    if message is None:
        message = traceback.format_exc()
    else:
        message = message + "\n"
    with open("/tmp/eris.log", "a") as log_file:
        log_file.write(message)
|
|
|
|
|
|
def lru_cache_with_eviction(maxsize=128, typed=False):
    """Like functools.lru_cache, but single entries can be invalidated.

    Every distinct call signature has a version number which is passed to
    the underlying lru_cache as an extra leading argument.  Calling
    ``wrapper.evict(*args, **kwds)`` bumps the version of that signature, so
    the next call misses the cache and recomputes.

    The wrapper also exposes ``versions`` (the version table) and
    ``cache_info`` (from the underlying lru_cache).
    """
    versions = {}
    make_key = functools._make_key

    def evict(*args, **kwds):
        # Only signatures that have been called before have a version.
        signature = make_key(args, kwds, typed)
        if signature in versions:
            versions[signature] += 1

    def decorating_function(user_function):

        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def call_without_version(*args, **kwds):
            # Drop the leading version number before calling the function.
            return user_function(*args[1:], **kwds)

        def add_version(*args, **kwds):
            signature = make_key(args, kwds, typed)
            current_version = versions.setdefault(signature, 0)
            return call_without_version(current_version, *args, **kwds)

        add_version.versions = versions
        add_version.cache_info = call_without_version.cache_info
        add_version.evict = evict
        return functools.update_wrapper(add_version, user_function)

    return decorating_function
|
|
|
|
|
|
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL, open=open):
    """Pickle *object_* to *path* without ever leaving a partial file there.

    The data is first written to ``path + ".tmp"`` and only renamed onto
    *path* once the dump has completed.

    Parameters:
        object_: the object to pickle.
        path: destination path (a string).
        protocol: pickle protocol to use.
        open: callable used to open the temporary file for writing, e.g. the
            ``open`` function of a compression module.

    An OSError or KeyboardInterrupt during the dump is deliberately not
    propagated: the temporary file is removed and *path* is left untouched.
    """
    tmp_path = path + ".tmp"
    try:
        with open(tmp_path, "wb") as file_:
            pickle.dump(object_, file_, protocol=protocol)
    except (OSError, KeyboardInterrupt):
        # The temporary file does not exist when open() itself failed;
        # removing it unconditionally raised FileNotFoundError out of this
        # handler.
        with contextlib.suppress(FileNotFoundError):
            os.remove(tmp_path)
    else:
        os.rename(tmp_path, path)
|
|
|
|
|
|
@functools.cache
def compression_open_func(compression):
    """Return the ``open`` function for the named compression.

    "none" maps to the builtin open(); any other name is imported as a
    module and that module's ``open`` attribute is returned.
    """
    if compression == "none":
        return open
    compression_module = importlib.import_module(compression)
    return compression_module.open
|
|
|
|
|
|
class Result:
    """The outcome of running one tool on one path.

    The status lives on the instance, while the result value itself is
    pickled to a file under CACHE_PATH and loaded on demand through the
    ``result`` property.
    """

    # Statuses of a run that has finished, one way or another.
    COMPLETED_STATUSES = {Status.ok, Status.problem, Status.error,
                          Status.not_applicable, Status.timed_out}

    def __init__(self, path, tool):
        self.path = path
        self.tool = tool
        # Name of the compression used for the pickle; None until it is set
        # from outside this class.
        self.compression = None
        self.scroll_position = (0, 0)
        self.status = Status.pending
        self.is_highlighted = False

    def pickle_path(self):
        """Return the path of the file holding the pickled result."""
        filename = "-".join([self.path, self.tool.__name__])
        return os.path.join(CACHE_PATH, filename)

    @property
    @lru_cache_with_eviction(maxsize=50)
    def result(self):
        """The stored result value, or a "?" placeholder when unavailable.

        The placeholder is returned while the result is pending, while no
        compression has been set, or when the pickle file does not exist.
        """
        placeholder = fill3.Text("?")
        is_pending = self.status == Status.pending
        if is_pending or self.compression is None:
            return placeholder
        try:
            with compression_open_func(self.compression)(self.pickle_path(), "rb") as pickle_file:
                return pickle.load(pickle_file)
        except FileNotFoundError:
            return placeholder

    @result.setter
    def result(self, value):
        target_path = self.pickle_path()
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        dump_pickle_safe(value, self.pickle_path(),
                         open=compression_open_func(self.compression))
        # Make sure the next read does not return the stale cached value.
        Result.result.fget.evict(self)

    def set_status(self, status):
        """Set the status and clear the appearance cache of ``self.entry``.

        NOTE(review): ``entry`` is not assigned anywhere in this class; it is
        presumably attached by the owner of the result - confirm.
        """
        self.status = status
        self.entry.appearance_cache = None

    @property
    def is_completed(self):
        """True when the status is one of COMPLETED_STATUSES."""
        return self.status in Result.COMPLETED_STATUSES

    async def run(self, log, runner):
        """Run the tool through *runner*, logging the start and the finish."""
        tool_name = tool_name_colored(self.tool, self.path)
        path = lscolors.path_colored(self.path)
        log.log_message(["Running ", tool_name, " on ", path, "…"])
        self.set_status(Status.running)
        fill3.APPEARANCE_CHANGED_EVENT.set()
        start_time = time.time()
        new_status = await runner.run_tool(self.path, self.tool)
        Result.result.fget.evict(self)
        end_time = time.time()
        self.set_status(new_status)
        fill3.APPEARANCE_CHANGED_EVENT.set()
        elapsed = round(end_time - start_time, 2)
        log.log_message(["Finished running ", tool_name, " on ", path, ". ",
                         STATUS_TO_TERMSTR[new_status], f" {elapsed} secs"])

    def reset(self):
        """Put the result back into the pending state."""
        self.set_status(Status.pending)

    def _get_cursor(self):
        # A bold white "+" drawn on the colour of the current status.
        status_color = _STATUS_COLORS.get(self.status, None)
        cursor_style = termstr.CharStyle(
            fg_color=termstr.Color.white, bg_color=status_color, is_bold=True)
        return termstr.TermStr("+", cursor_style)

    def appearance(self):
        """Return the one-line appearance: the cursor or the status cell."""
        if self.is_highlighted:
            return [self._get_cursor()]
        return [STATUS_TO_TERMSTR[self.status]]

    def get_pages_dir(self):
        """Return the path of the ".pages" directory next to the pickle."""
        return self.pickle_path() + ".pages"

    def delete(self):
        """Remove the pickle file and the pages directory, if present."""
        with contextlib.suppress(FileNotFoundError):
            os.remove(self.pickle_path())
        with contextlib.suppress(FileNotFoundError):
            shutil.rmtree(self.get_pages_dir())
        Result.result.fget.evict(self)

    def as_html(self):
        """Return (html, styles) for a link showing the status cell."""
        status_cell = termstr.TermStr(STATUS_TO_TERMSTR[self.status])
        html, styles = status_cell.as_html()
        link = (f'<a title="{self.tool.__name__}" href="{self.path}/{self.tool.__name__}" '
                f'target="listing">{html}</a>')
        return (link, styles)
|
|
|
|
|
|
def generic_tools():
    """Return a new list of the tools that apply to every path."""
    tools = [contents, metadata]
    return tools
|
|
|
|
|
|
# List of (extensions, tool functions) pairs: the tool names from the
# configuration are resolved to the module level functions of the same name.
TOOLS_FOR_EXTENSIONS = []
for extensions, tool_names in tools_for_extensions:
    TOOLS_FOR_EXTENSIONS += [
        (extensions, [globals()[tool_name] for tool_name in tool_names])]
|
|
|
|
|
|
@functools.cache
def _tools_for_extension():
    """Return a dict mapping each extension to its list of tools.

    The dict is built once from TOOLS_FOR_EXTENSIONS and then cached.  When
    an extension is listed more than once the last entry wins.
    """
    return {extension: tools
            for extensions, tools in TOOLS_FOR_EXTENSIONS
            for extension in extensions}
|
|
|
|
|
|
def tools_all():
    """Return the set of all tools: generic, git and extension specific."""
    tools_ = set(generic_tools())
    tools_.update([git_diff, git_blame, git_log])
    for tool_list in _tools_for_extension().values():
        tools_.update(tool_list)
    return tools_
|
|
|
|
|
|
def tool_dependencies(tool):
    """Return the ``deps`` attribute of *tool*, or an empty set without one."""
    return getattr(tool, "deps", set())
|
|
|
|
|
|
def dependencies():
    """Return the union of the dependencies of every tool."""
    return set().union(*(tool_dependencies(tool) for tool in tools_all()))
|
|
|
|
|
|
def splitext(path):
    """Split *path* into (root, extension) like os.path.splitext().

    Unlike os.path.splitext(), the compound extensions ".tar.gz" and
    ".tar.bz2" are kept in one piece.
    """
    root, ext = os.path.splitext(path)
    if "." not in root:
        return root, ext
    compound_ext = next(
        (candidate for candidate in (".tar.gz", ".tar.bz2") if path.endswith(candidate)),
        None)
    if compound_ext is None:
        return root, ext
    cut = len(compound_ext)
    return path[:-cut], path[-cut:]
|
|
|
|
|
|
@functools.cache
def is_tool_available(tool):
    """Return whether what *tool* needs to run is installed.

    Tools whose command starts with "<PYTHON_EXECUTABLE> -m " are available
    when the named module can be found.  Other tools are available when all
    of their ``executables`` are on the PATH; tools without that attribute
    are always available.
    """
    python_module_prefix = f"{PYTHON_EXECUTABLE} -m "
    if hasattr(tool, "command") and tool.command.startswith(python_module_prefix):
        # The module name is the third word of the command.
        module_name = tool.command.split()[2]
        try:
            module_spec = importlib.util.find_spec(module_name)
        except ModuleNotFoundError:
            return False
        return module_spec is not None
    try:
        return all(shutil.which(executable) for executable in tool.executables)
    except AttributeError:
        return True
|
|
|
|
|
|
def tools_for_path(path):
    """Return the available tools that apply to *path*.

    These are the generic tools, the git tools when the current directory
    contains ".git", and the tools registered for the path's extension.
    """
    if os.path.exists(".git"):
        git_tools = [git_diff, git_blame, git_log]
    else:
        git_tools = []
    root, ext = splitext(path)
    if ext == "":
        extra_tools = []
    else:
        # The extension table is keyed without the leading dot.
        extra_tools = _tools_for_extension().get(ext[1:], [])
    candidates = [*generic_tools(), *git_tools, *extra_tools]
    return [tool for tool in candidates if is_tool_available(tool)]
|
|
|
|
|
|
def run_tool_no_error(path, tool):
    """Run *tool* on *path*, turning exceptions into (status, text) results.

    A timeout gives Status.timed_out, undecodable output gives
    Status.not_applicable, and any other exception gives Status.error with
    the syntax highlighted traceback as the text.
    """
    try:
        outcome = tool(path)
    except subprocess.TimeoutExpired:
        outcome = Status.timed_out, "Timed out"
    except UnicodeDecodeError:
        outcome = Status.not_applicable, "Result not in UTF-8"
    except Exception:
        traceback_text = traceback.format_exc()
        traceback_lexer = pygments.lexers.PythonTracebackLexer()
        style = pygments.styles.get_style_by_name(os.environ["PYGMENT_STYLE"])
        outcome = Status.error, _syntax_highlight(traceback_text, traceback_lexer, style)
    return outcome
|
|
|
|
|
|
@functools.lru_cache(maxsize=100)
def tool_name_colored(tool, path):
    """Return the name of *tool* as a styled termstr.

    Generic tools are shown in bold; other tools use the style lscolors
    gives to *path*.
    """
    if tool in generic_tools():
        char_style = termstr.CharStyle(is_bold=True)
    else:
        char_style = lscolors._charstyle_of_path(path)
    return termstr.TermStr(tool.__name__, char_style)
|
|
|
|
|
|
if __name__ == "__main__":
    # Let tools be tested individually...
    #   ./tools.py <tool> <path>
    #   e.g. ./tools.py contents tools.toml
    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <tool> <path>")
    tool_name, path = sys.argv[1:3]
    valid_tools = tools_for_path(path)
    # Look the tool up explicitly and validate with a real check: an assert
    # would be stripped under "python -O", and an unknown name used to
    # surface as a bare KeyError.
    tool = globals().get(tool_name)
    if tool is None or tool not in valid_tools:
        valid_names = ", ".join(sorted(valid_tool.__name__ for valid_tool in valid_tools))
        sys.exit(f'"{tool_name}" is not a valid tool for "{path}". '
                 f'Valid tools: {valid_names}')
    status, text = run_tool_no_error(path, tool)
    print(text)
    sys.exit(0 if status == Status.ok else 1)
|