This is a workaround; the underlying problem of not being able to produce very small appearances is still there. For now the main appearance won't shrink below a width of 10 or a height of 20, but that's okay since the interface is already impractical at that point. The minimum appearance is then cropped to fit the window.
1009 lines
37 KiB
Python
Executable file
1009 lines
37 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved.
|
|
# Licensed under the Artistic License 2.0.
|
|
|
|
"""\
|
|
Vigil maintains a set of reports for each file in a directory tree.
|
|
Different types of reports are produced for different types of file.
|
|
|
|
The state of each report is summarised by a status indicator, and a report is
|
|
viewed by selecting this status indicator with the cursor. The types of status
|
|
are listed below.
|
|
|
|
Reports are recalculated whenever files are changed, so that they are always up
|
|
to date.
|
|
|
|
The reports are cached in a directory ".vigil" under the target directory.
|
|
|
|
|
|
Usage: vigil <root_path>
|
|
|
|
e.g. # vigil my_project
|
|
|
|
Keys:
|
|
*h - Show the help screen. (toggle)
|
|
*d, *c, *j, *k or arrow keys or mouse click - Move the cursor.
|
|
*D, *C, *J, *K or page up, page down, home, end or the mouse wheel -
|
|
Scroll the result pane.
|
|
*t - Turn the result pane to portrait or landscape orientation. (toggle)
|
|
*l - Show the activity log. (toggle)
|
|
*n - Move to the next issue.
|
|
*N - Move to the next issue of the current tool.
|
|
*o - Order files by type, or by directory location. (toggle)
|
|
*p - Pause work. (toggle)
|
|
*s - Change the appearance of result statuses. (toggle)
|
|
*q - Quit.
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
import collections
|
|
import contextlib
|
|
import functools
|
|
import gzip
|
|
import multiprocessing
|
|
import os
|
|
import pickle
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
import pyinotify
|
|
|
|
import fill3
|
|
import sandbox_fs
|
|
import terminal
|
|
import termstr
|
|
import tools
|
|
|
|
|
|
# The log file sits in the directory the program was started from; the path
# is fixed when this module is imported.
_LOG_PATH = os.path.join(os.getcwd(), "vigil.log")


def _log_error(message=None):
    """Append an error report to the log file at _LOG_PATH.

    With no message, the formatted traceback of the exception currently
    being handled is written.  Otherwise the message is written, followed
    by a newline.
    """
    if message is None:
        text = traceback.format_exc()
    else:
        text = message + "\n"
    with open(_LOG_PATH, "a") as log_file:
        log_file.write(text)
|
|
|
|
|
|
# Name of the directory, under the target directory, in which the reports
# are cached.
_CACHE_PATH = ".vigil"
|
|
|
|
|
|
def lru_cache_with_eviction(maxsize=128, typed=False):
    """Return a decorator like functools.lru_cache with per-call eviction.

    The decorated function gains three attributes:

    evict(*args, **kwds) -- forget the cached value for exactly these
        arguments.  Does nothing if the function was never called with them.
    cache_info -- the cache_info of the underlying lru_cache.
    versions -- the dictionary mapping each call key to its version number.

    Eviction works by passing a version number as a hidden first argument
    to the cached function.  Evicting bumps the version, so the next call
    misses the cache and the old entry is left to age out.
    """
    versions = {}
    key_for_call = functools._make_key

    def evict(*args, **kwds):
        call_key = key_for_call(args, kwds, typed)
        if call_key in versions:
            versions[call_key] += 1

    def decorating_function(user_function):

        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def call_without_version(*versioned_args, **kwds):
            # The first positional argument is only the version number.
            return user_function(*versioned_args[1:], **kwds)

        def call_with_version(*args, **kwds):
            call_key = key_for_call(args, kwds, typed)
            version = versions.setdefault(call_key, 0)
            return call_without_version(version, *args, **kwds)
        call_with_version.versions = versions
        call_with_version.cache_info = call_without_version.cache_info
        call_with_version.evict = evict
        return functools.update_wrapper(call_with_version, user_function)
    return decorating_function
|
|
|
|
|
|
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
                     open=open):
    """Pickle object_ to path without ever leaving a partial file at path.

    The pickle is first written to path + ".tmp" and only moved over path
    once it has been written completely.

    object_ -- the object to pickle.
    path -- the destination file.
    protocol -- the pickle protocol to use.
    open -- the function used to open the temporary file for writing,
        e.g. gzip.open to store the pickle compressed.

    If writing fails with an OSError, or is interrupted by the user, the
    temporary file is removed and path is left as it was.  The failure is
    deliberately not propagated.
    """
    tmp_path = path + ".tmp"
    try:
        with open(tmp_path, "wb") as file_:
            pickle.dump(object_, file_, protocol=protocol)
    except (OSError, KeyboardInterrupt):
        # If open() itself failed there is no temporary file to remove, and
        # the cleanup must not raise a new error in place of the one being
        # handled.
        with contextlib.suppress(FileNotFoundError):
            os.remove(tmp_path)
    else:
        # os.replace overwrites an existing destination on every platform.
        os.replace(tmp_path, path)
|
|
|
|
|
|
def status_to_str(status, is_status_simple):
    """Return the displayable form of a status.

    An integer status is looked up in one of the two status tables of the
    tools module; is_status_simple selects the simple table.  Any other
    status is returned unchanged.
    """
    if not isinstance(status, int):
        return status
    if is_status_simple:
        lookup = tools._STATUS_TO_TERMSTR_SIMPLE
    else:
        lookup = tools._STATUS_TO_TERMSTR
    return lookup[status]
|
|
|
|
|
|
class Result:
    """The report of one tool on one file, with its status.

    The report itself is stored as a pickle under _CACHE_PATH and is read
    back through the 'result' property.  A Result starts out as a pending
    placeholder until run() has been called on it.

    The 'entry' attribute is not set here; Entry.__init__ assigns it.
    """

    def __init__(self, path, tool, is_stored_compressed=True):
        self.path = path
        self.tool = tool
        if is_stored_compressed:
            self._open_func = gzip.open
        else:
            self._open_func = open
        pickle_name = path + "-" + tool.__name__
        self.pickle_path = os.path.join(_CACHE_PATH, pickle_name)
        self.scroll_position = (0, 0)
        self.is_completed = False
        self.is_placeholder = True
        self.status = tools.Status.pending

    @property
    @lru_cache_with_eviction(maxsize=50)
    def result(self):
        """The stored report, or a "?" label when there is none to show.

        Loaded reports are cached per instance; the cached value is evicted
        whenever a new report is stored or the tool is rerun.
        """
        unknown_label = fill3.Text("?")
        if not self.is_placeholder:
            try:
                with self._open_func(self.pickle_path, "rb") as pickle_file:
                    return pickle.load(pickle_file)
            except FileNotFoundError:
                pass
        return unknown_label

    @result.setter
    def result(self, value):
        directory = os.path.dirname(self.pickle_path)
        os.makedirs(directory, exist_ok=True)
        dump_pickle_safe(value, self.pickle_path, open=self._open_func)
        Result.result.fget.evict(self)

    def set_status(self, status):
        """Change the status and invalidate the owning entry's appearance."""
        self.status = status
        self.entry.appearance_cache = None

    def run(self, log, appearance_changed_event, worker, runner):
        """Run the tool on the file using worker, logging start and finish.

        The status is 'running' while the worker is busy and then becomes
        whatever status the worker returns.  If the runner was created
        already paused it is paused as soon as the status is 'running'.
        """
        self.is_placeholder = False
        tool_name = tools._tool_name_colored(self.tool, self.path)
        path_colored = tools._path_colored(self.path)
        start_message = ["Running ", tool_name, " on ", path_colored, "..."]
        log.log_message(start_message)
        self.set_status(tools.Status.running)
        if runner.is_already_paused:
            runner.is_already_paused = False
            runner.pause()
        appearance_changed_event.set()
        start_time = time.time()
        new_status = worker.run_tool(self.path, self.tool)
        # Any report cached before this run is now out of date.
        Result.result.fget.evict(self)
        end_time = time.time()
        self.set_status(new_status)
        appearance_changed_event.set()
        self.is_completed = True
        status_text = status_to_str(new_status,
                                    self.entry.summary.is_status_simple)
        duration_text = " %s secs" % round(end_time - start_time, 2)
        log.log_message(["Finished running ", tool_name, " on ",
                         path_colored, ". ", status_text, duration_text])

    def reset(self):
        """Turn this result back into a pending placeholder."""
        self.is_placeholder = True
        self.set_status(tools.Status.pending)

    def appearance_min(self):
        """Return the status indicator as a one line appearance."""
        is_simple = self.entry.summary.is_status_simple
        return [status_to_str(self.status, is_simple)]
|
|
|
|
|
|
import worker # Avoid a circular import. worker.py needs the Result class.
|
|
|
|
|
|
def reverse_style(style):
    """Return a CharStyle like style but with its two colors exchanged.

    The background color is passed where the foreground color would go and
    vice versa; boldness and underlining are kept.
    """
    first_color, second_color = style.bg_color, style.fg_color
    return termstr.CharStyle(first_color, second_color, style.is_bold,
                             style.is_underlined)
|
|
|
|
|
|
class Entry(collections.UserList):
    """A row of the summary: a file's path followed by its results.

    path -- the path of the file.
    results -- the results in the row.
    summary -- the Summary the row belongs to.
    highlighted -- index of the result to draw as the cursor, or None.
    set_results -- whether to make this entry the 'entry' of each result.
    """

    def __init__(self, path, results, summary, highlighted=None,
                 set_results=True):
        super().__init__(results)
        self.path = path
        self.summary = summary
        self.highlighted = highlighted
        self.widgets = self.data
        if set_results:
            # FIX: this is missed for entries appended later
            for result in results:
                result.entry = self
        self.widget = fill3.Row(results)
        self.appearance_cache = None

    def appearance_min(self):
        """Return the row's appearance: the path, padding, then statuses.

        The appearance is cached in appearance_cache until that is reset
        to None.
        """
        # The cache is read once into a local because appearance_cache can
        # become None at any time.
        cached = self.appearance_cache
        if cached is not None:
            return cached
        if self.highlighted is not None:
            if self.summary.is_status_simple:
                cursor = fill3.Text("●")
            else:
                cursor = fill3.Style(self.widget[self.highlighted],
                                     reverse_style)
            self.widget[self.highlighted] = cursor
        rendered = self.widget.appearance_min()
        colored_path = tools._path_colored(self.path)
        gap_width = self.summary._max_path_length - len(colored_path) + 1
        rendered[0] = colored_path + " " * gap_width + rendered[0]
        self.appearance_cache = rendered
        return rendered
|
|
|
|
|
|
def is_filename_excluded(filename):
    """Return True if filename is hidden, that is, starts with a dot."""
    hidden_prefix = "."
    return filename.startswith(hidden_prefix)
|
|
|
|
|
|
def codebase_files(path, skip_hidden_directories=True):
    """Yield the path of every non-hidden file in the tree rooted at path.

    Files whose name starts with a dot are never yielded.  Unless
    skip_hidden_directories is false, directories whose name starts with a
    dot are not descended into either.
    """
    for directory, subdirectories, names in os.walk(path):
        if skip_hidden_directories:
            # os.walk only descends into the directories still listed, so
            # the list is pruned in place.
            subdirectories[:] = [name for name in subdirectories
                                 if not is_filename_excluded(name)]
        yield from (os.path.join(directory, name) for name in names
                    if not is_filename_excluded(name))
|
|
|
|
|
|
def fix_paths(root_path, paths):
    """Return the paths rewritten relative to root_path, as a list.

    Each returned path starts with the current directory component, e.g.
    "./src/main.py".
    """
    relative_paths = []
    for path in paths:
        relative = os.path.relpath(path, root_path)
        relative_paths.append(os.path.join(".", relative))
    return relative_paths
|
|
|
|
|
|
def change_background(str_, new_background):
    """Return str_ as a TermStr with its black background recolored.

    Every character style whose background color is black gets
    new_background instead.  Other backgrounds are left alone.
    """

    def recolor(style):
        if style.bg_color == termstr.Color.black:
            background = new_background
        else:
            background = style.bg_color
        return termstr.CharStyle(style.fg_color, background, style.is_bold,
                                 style.is_underlined)
    term_str = termstr.TermStr(str_)
    return term_str.transform_style(recolor)
|
|
|
|
|
|
class Summary:
    """The grid of results: one Entry per file, one Result per tool.

    Also keeps the cursor position within the grid, and hands out the
    placeholder results that still have to be calculated, nearest to the
    cursor first.
    """

    def __init__(self, root_path, jobs_added_event):
        self._root_path = root_path
        # Set whenever syncing creates results that need to be run.
        self._jobs_added_event = jobs_added_event
        self._view_widget = fill3.View.from_widget(self)
        # Only accessed through the _cursor_position property below.
        self.__cursor_position = (0, 0)
        self.closest_placeholder_generator = None
        self._lock = threading.Lock()
        # Maps ((path, ctime), tool name, tool bytecode) to a Result.
        self._cache = {}
        self.is_status_simple = False
        self.is_directory_sort = True
        self._max_width = None
        self._max_path_length = None
        self._all_results = set()
        self.sync_with_filesystem()

    @property
    def _cursor_position(self):
        """The (x, y) position of the cursor, as last set."""
        return self.__cursor_position

    @_cursor_position.setter
    def _cursor_position(self, new_position):
        if new_position != self.__cursor_position:
            self.__cursor_position = new_position
            # The spiral of placeholders must restart from the new position.
            self.closest_placeholder_generator = None

    def sync_with_filesystem(self, sync_paths=True):
        """Rebuild the rows of the summary.

        If sync_paths is true the list of files is read again from the
        filesystem and sorted, otherwise the existing list is reused.

        Results already in the cache for the same path, ctime, tool name
        and tool bytecode are reused.  Anything else gets a new Result, and
        the jobs-added event is set.  The pickle files of results that are
        no longer part of the summary are removed.  The cursor stays on
        the file it was on, if that file still exists.
        """
        x, y = self._cursor_position
        try:
            old_path = self.get_selection().path
        except AttributeError:
            # On the first sync there is no column yet.
            old_path = None
        new_column = fill3.Column([])
        new_cache = {}
        if sync_paths:
            paths = fix_paths(self._root_path,
                              codebase_files(self._root_path))
            self._paths = paths
            # NOTE(review): sort() itself ends by calling
            # sync_with_filesystem(sync_paths=False), so the rebuild below
            # appears to be done a second time -- confirm this is intended.
            self.sort(self.is_directory_sort)
        else:
            paths = self._paths
        jobs_added = False
        new_cursor_position = (0, 0)
        row_index = 0
        result_total, completed_total = 0, 0
        all_results = set()
        for path in paths:
            full_path = os.path.join(self._root_path, path)
            try:
                key = (path, os.stat(full_path).st_ctime)
            except FileNotFoundError:
                # The file disappeared since the paths were listed.
                continue
            if path == old_path:
                new_cursor_position = (x, row_index)
            row = []
            for tool in tools.tools_for_path(path):
                cache_key = (key, tool.__name__, tool.__code__.co_code)
                if cache_key in self._cache:
                    result = self._cache[cache_key]
                    result.tool = tool
                else:
                    result = Result(path, tool)
                    jobs_added = True
                all_results.add(result)
                if result.is_completed:
                    completed_total += 1
                new_cache[cache_key] = result
                row.append(result)
            new_column.append(Entry(path, row, self))
            row_index += 1
            result_total += len(row)
        # NOTE(review): both max() calls raise ValueError when there are no
        # rows or no paths, e.g. for an empty directory tree.
        max_width = max(len(row) for row in new_column)
        max_path_length = max(len(path) for path in paths) - len("./")
        deleted_results = self._all_results - all_results
        # The new state is installed with a single assignment statement.
        self._column, self._cache, self._cursor_position, self.result_total, \
            self.completed_total, self._max_width, self._max_path_length, \
            self.closest_placeholder_generator, self._all_results = (
                new_column, new_cache, new_cursor_position, result_total,
                completed_total, max_width, max_path_length, None, all_results)
        if jobs_added:
            self._jobs_added_event.set()
        for result in deleted_results:
            with contextlib.suppress(FileNotFoundError):
                os.remove(result.pickle_path)

    def placeholder_spiral(self):
        """Yield the placeholder results, working outwards from the cursor.

        The result under the cursor is considered first, then the grid is
        walked in laps of diagonal moves around it.  Positions that raise
        IndexError are skipped.
        """
        x, y = self.cursor_position()
        result = self._column[y][x]
        if result.is_placeholder:
            yield result
        for lap in range(max(len(self._column), self._max_width)):
            y -= 1
            for dx, dy in [(1, 1), (-1, 1), (-1, -1), (1, -1)]:
                for move in range(lap + 1):
                    x += dx
                    y += dy
                    try:
                        # NOTE(review): presumably negative x or y index
                        # from the end rather than raise IndexError --
                        # confirm against the fill3 containers.
                        result = self._column[y][x]
                    except IndexError:
                        continue
                    if result.is_placeholder:
                        yield result

    def get_closest_placeholder(self):
        """Return the next placeholder result from the current spiral.

        A new spiral is started if there is no current one.  StopIteration
        is raised when the spiral has no more placeholders.
        """
        with self._lock:
            try:
                return self.closest_placeholder_generator.send(None)
            except AttributeError:
                # closest_placeholder_generator is None.
                self.closest_placeholder_generator = self.placeholder_spiral()
                return self.closest_placeholder_generator.send(None)

    def appearance_dimensions(self):
        """Return the (width, height) of the whole summary."""
        status_width = 1 if self.is_status_simple else 2
        width = self._max_path_length + 1 + status_width * self._max_width
        return width, len(self._column)

    def appearance_interval(self, interval):
        """Return the appearance of the rows in interval, with the cursor.

        The cursor's row is replaced, in a copy of the column, by an Entry
        that highlights the selected result.
        """
        start_y, end_y = interval
        x, y = self.cursor_position()
        rows = fill3.Column(self._column.widgets)
        rows[y] = Entry(rows[y].path, rows[y].widgets, self, highlighted=x,
                        set_results=False)
        return rows.appearance_interval(interval)

    def appearance(self, dimensions):
        """Return the appearance, scrolled so that the cursor is visible.

        The view moves a whole page at a time, and the cursor's row is
        given a grey background.
        """
        width, height = dimensions
        x, y = self.cursor_position()
        status_width = 1 if self.is_status_simple else 2
        screen_x, screen_y = self._max_path_length + 1 + x * status_width, y
        width, height = width - 1, height - 1  # Minus one for the scrollbars
        scroll_y = (screen_y // height) * height
        self._view_widget.position = ((screen_x // width) * width, scroll_y)
        appearance = self._view_widget.appearance(dimensions)
        appearance[screen_y - scroll_y] = change_background(
            appearance[screen_y - scroll_y], termstr.Color.grey_50)
        return appearance

    def cursor_position(self):
        """Return the cursor's (x, y), with x limited to the row's length."""
        x, y = self._cursor_position
        return min(x, len(self._column[y])-1), y

    def get_selection(self):
        """Return the result under the cursor."""
        x, y = self.cursor_position()
        return self._column[y][x]

    def _move_cursor(self, dx, dy):
        """Move the cursor along one axis, wrapping around at the edges.

        One of dx and dy must be zero, otherwise ValueError is raised.
        """
        if dy == 0:
            x, y = self.cursor_position()
            self._cursor_position = ((x + dx) % len(self._column[y]), y)
        elif dx == 0:
            # The unclamped x is kept so the column is remembered when
            # passing through shorter rows.
            x, y = self._cursor_position
            self._cursor_position = (x, (y + dy) % len(self._column))
        else:
            raise ValueError

    def cursor_right(self):
        self._move_cursor(1, 0)

    def cursor_left(self):
        self._move_cursor(-1, 0)

    def cursor_up(self):
        self._move_cursor(0, -1)

    def cursor_down(self):
        self._move_cursor(0, 1)

    def _issue_generator(self):
        """Yield (result, (x, y)) for each result with the problem status.

        The rows are scanned from the cursor's row onwards, wrapping
        around, and the cursor's row is visited once more at the end.  On
        the first visit to the cursor's row the results at or before the
        cursor are passed over.
        """
        x, y = self.cursor_position()
        for index in range(len(self._column) + 1):
            row_index = (index + y) % len(self._column)
            row = self._column[row_index]
            for index_x, result in enumerate(row):
                if (result.status == tools.Status.problem and
                    not (row_index == y and index_x <= x and
                         index != len(self._column))):
                    yield result, (index_x, row_index)

    def move_to_next_issue(self):
        """Move the cursor to the next issue, if there is one."""
        with contextlib.suppress(StopIteration):
            issue, self._cursor_position = self._issue_generator().send(None)

    def move_to_next_issue_of_tool(self):
        """Move the cursor to the next issue found by the selected tool."""
        current_tool = self.get_selection().tool
        for issue, position in self._issue_generator():
            if issue.tool == current_tool:
                self._cursor_position = position
                return

    def toggle_status_style(self):
        """Switch between the simple and normal status indicators."""
        self.is_status_simple = not self.is_status_simple
        # Rebuilding the rows makes new entries, which have empty
        # appearance caches.
        self.sync_with_filesystem(sync_paths=False)

    def sort(self, is_directory_sort):
        """Sort the files and rebuild the rows.

        The files are ordered by directory, then type, then name if
        is_directory_sort is true, otherwise by type, then directory, then
        name.
        """
        def directory_sort(path):
            return (os.path.dirname(path), tools.splitext(path)[1],
                    os.path.basename(path))

        def type_sort(path):
            return (tools.splitext(path)[1], os.path.dirname(path),
                    os.path.basename(path))
        key_func = directory_sort if is_directory_sort else type_sort
        self._paths.sort(key=key_func)
        self.is_directory_sort = is_directory_sort
        self.sync_with_filesystem(sync_paths=False)
|
|
|
|
|
|
class Log:
    """The activity log: a column of timestamped messages."""

    GREY_BOLD_STYLE = termstr.CharStyle(termstr.Color.grey_100, is_bold=True)
    GREEN_STYLE = termstr.CharStyle(termstr.Color.green)

    def __init__(self, appearance_changed_event):
        # Set whenever a message is added, so the screen gets redrawn.
        self._appearance_changed_event = appearance_changed_event
        self.widget = fill3.Column([])
        self.portal = fill3.Portal(self.widget)
        self._appearance_cache = None

    def log_message(self, message, timestamp=None, char_style=None):
        """Add a message to the log.

        message -- the text, or a list of parts which are joined together.
            For a part that is a tuple, its second item is used.
        timestamp -- the text shown before the message.  Defaults to the
            current local time as HH:MM:SS.
        char_style -- if given, the style the whole message is shown in.
        """
        if isinstance(message, list):
            message = [part[1] if isinstance(part, tuple) else part
                       for part in message]
            message = fill3.join("", message)
        if char_style is not None:
            message = termstr.TermStr(message, char_style)
        timestamp = (time.strftime("%H:%M:%S", time.localtime())
                     if timestamp is None else timestamp)
        label = fill3.Text(termstr.TermStr(timestamp, Log.GREY_BOLD_STYLE) +
                           " " + message)
        self.widget.append(label)
        # Only the 200 most recent messages are kept.
        self.widget.widgets = self.widget[-200:]
        self._appearance_cache = None
        self._appearance_changed_event.set()

    def log_command(self, message, timestamp=None):
        """Add a message about a user command to the log, shown in green.

        timestamp has the same meaning as for log_message.
        """
        # The timestamp used to be accepted but silently dropped here.
        self.log_message(message, timestamp=timestamp,
                         char_style=Log.GREEN_STYLE)

    def appearance_min(self):
        """Return the appearance of all the messages, caching it."""
        appearance = self._appearance_cache
        if appearance is None:
            self._appearance_cache = appearance = self.widget.appearance_min()
        return appearance

    def appearance(self, dimensions):
        """Return the appearance scrolled to show the latest messages."""
        width, height = dimensions
        full_appearance = self.appearance_min()
        self.portal.position = (0, max(0, len(full_appearance) - height))
        return self.portal.appearance(dimensions)
|
|
|
|
|
|
def _highlight_chars(str_, style, marker="*"):
    """Return str_ with each marked character shown in style.

    A character is marked by a marker immediately before it.  The markers
    themselves are removed.
    """
    head, *marked_pieces = str_.split(marker)
    pieces = [head]
    for piece in marked_pieces:
        if piece != "":
            # Only the first character after the marker is highlighted.
            pieces.append(termstr.TermStr(piece[0], style) + piece[1:])
    return fill3.join("", pieces)
|
|
|
|
|
|
@functools.lru_cache()
def _get_help_text(is_status_simple=True):
    """Return the help text: the module's docstring and a list of statuses.

    The characters marked with "*" in the docstring are shown in green.
    Each status is shown in the simple or normal style, according to
    is_status_simple, next to its meaning.
    """
    lines = [_highlight_chars(__doc__, Log.GREEN_STYLE), "Statuses:"]
    for status, meaning in tools.STATUS_MEANINGS:
        indicator = status_to_str(status, is_status_simple)
        lines.append(" " + indicator + " " + meaning)
    return fill3.join("\n", lines)
|
|
|
|
|
|
class Help:
    """The help screen: the help text in a scrollable, bordered view."""

    def __init__(self, summary, screen):
        self.summary = summary
        self.screen = screen
        self.body = fill3.Placeholder()
        self.view = fill3.View.from_widget(self.body)
        self.widget = fill3.Border(self.view, title="Help")
        portal = self.view.portal
        # Maps a key to the function to call, with no arguments, for it.
        self.key_map = {
            "h": self.exit_help,
            "q": self.exit_help,
            "d": portal.scroll_up,
            "c": portal.scroll_down,
            "j": portal.scroll_left,
            "k": portal.scroll_right,
        }

    def exit_help(self):
        """Hide the help screen."""
        self.screen._is_help_visible = False

    def on_keypressed(self):
        """Read one character from stdin and act on it if it is a key."""
        key = sys.stdin.read(1)
        if key in self.key_map:
            self.key_map[key]()

    def appearance(self, dimensions):
        """Return the appearance of the help for the current status style."""
        help_text = _get_help_text(self.summary.is_status_simple)
        self.body.widget = fill3.Text(help_text)
        return self.widget.appearance(dimensions)
|
|
|
|
|
|
class Listing:
    """A wrapper around a view that remembers the size it was last drawn.

    last_dimensions is None until appearance has been called.
    """

    def __init__(self, view):
        self.last_dimensions = None
        self.view = view

    def appearance(self, dimensions):
        """Record dimensions, then return the view's appearance for them."""
        self.last_dimensions = dimensions
        view_appearance = self.view.appearance(dimensions)
        return view_appearance
|
|
|
|
|
|
class Screen:
    """The whole user interface: summary, listing, log and status bar.

    Handles the keyboard and mouse, and lays the widgets out according to
    whether the log is visible and whether the listing is in portrait or
    landscape orientation.

    The 'runners' attribute is not set here.  main() attaches the list of
    runners once the workers have been started.
    """

    def __init__(self, summary, log, appearance_changed_event, main_loop):
        self._summary = summary
        self._log = log
        self._appearance_changed_event = appearance_changed_event
        self._main_loop = main_loop
        self._is_listing_portrait = True
        self._is_log_visible = True
        self._is_help_visible = False
        self._is_paused = False
        self._make_widgets()
        self._make_keymap()

    def _partition(self, widgets, height):
        """Split height in two, the second part being the smaller.

        The smaller part is a quarter of the height, but at least 10.
        """
        smaller_height = max(height // 4, 10)
        return [height - smaller_height, smaller_height]

    def _partition_2(self, widgets, height):
        """Split height in two, the first part being the smaller.

        The smaller part is a quarter of the height, but at least 10.
        """
        smaller_height = max(height // 4, 10)
        return [smaller_height, height - smaller_height]

    def _make_widgets(self):
        """Create the widgets and the four layouts they can be shown in."""
        self._help_widget = Help(self._summary, self)
        root_path = os.path.basename(self._summary._root_path)
        summary = fill3.Border(self._summary, title="Summary of " + root_path)
        selected_widget = self._summary.get_selection()
        self._view = fill3.View.from_widget(selected_widget.result)
        self._listing = fill3.Border(Listing(self._view))
        log = fill3.Border(self._log, title="Log")
        port_log = fill3.Row([fill3.Column([summary, log], self._partition),
                              self._listing])
        land_log = fill3.Column([fill3.Row([summary, log]), self._listing],
                                self._partition_2)
        port_no_log = fill3.Row([summary, self._listing])
        land_no_log = fill3.Column([summary, self._listing], self._partition_2)
        # Indexed by [is the log visible][is the listing portrait].
        self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]

    def _make_keymap(self):
        """Build the mapping from each key to its action from _KEY_DATA."""
        key_map = {}
        for keys, action in self._KEY_DATA:
            for key in keys:
                key_map[key] = action
        self._key_map = key_map

    def toggle_help(self):
        self._is_help_visible = not self._is_help_visible

    def toggle_log(self):
        self._is_log_visible = not self._is_log_visible

    def toggle_window_orientation(self):
        self._is_listing_portrait = not self._is_listing_portrait

    def cursor_up(self):
        self._summary.cursor_up()

    def cursor_down(self):
        self._summary.cursor_down()

    def cursor_right(self):
        self._summary.cursor_right()

    def cursor_left(self):
        self._summary.cursor_left()

    def _move_listing(self, dx, dy):
        """Scroll the selected result by half the listing's size.

        dx and dy give the direction.  The scroll position never goes
        below zero in either direction.
        """
        listing_width, listing_height = self._listing.widget.last_dimensions
        selected_widget = self._summary.get_selection()
        x, y = selected_widget.scroll_position
        selected_widget.scroll_position = \
            (max(x + dx * (listing_width // 2), 0),
             max(y + dy * (listing_height // 2), 0))

    def listing_up(self):
        self._move_listing(0, -1)

    def listing_down(self):
        self._move_listing(0, 1)

    def listing_right(self):
        self._move_listing(1, 0)

    def listing_left(self):
        self._move_listing(-1, 0)

    def move_to_next_issue(self):
        self._summary.move_to_next_issue()

    def move_to_next_issue_of_tool(self):
        self._summary.move_to_next_issue_of_tool()

    def edit_file(self):
        """Open the selected file with emacsclient."""
        path = self._summary.get_selection().path
        path_colored = tools._path_colored(path)
        self._log.log_message("Editing " + path_colored + " in emacs.")
        # The editor's output is never read, so it is discarded rather than
        # collected in pipes that nothing would empty.
        subprocess.Popen(["emacsclient", path],
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def toggle_status_style(self):
        self._summary.toggle_status_style()

    def toggle_sort(self):
        """Switch between ordering the files by directory and by type."""
        new_sort = not self._summary.is_directory_sort
        sort_order = ("directory then type" if new_sort
                      else "type then directory")
        self._log.log_command("Ordering files by %s." % sort_order)
        self._summary.sort(new_sort)

    def toggle_pause(self):
        """Pause all the runners, or let them continue if already paused."""
        self._is_paused = not self._is_paused
        self._log.log_command("Paused work." if self._is_paused else
                              "Continuing work...")
        # main() attaches the runners only after the workers have started,
        # and sets them to None before the screen is cached, so there may be
        # none yet.  The runners are created with the current paused state,
        # so nothing more is needed in that case.
        runners = getattr(self, "runners", None) or []
        if self._is_paused:
            for runner in runners:
                runner.pause()
        else:
            for runner in runners:
                runner.continue_()

    def quit_(self):
        """Quit, by raising KeyboardInterrupt."""
        raise KeyboardInterrupt

    def on_mouse_event(self, event):
        """Act on a mouse press or drag event.

        The mouse wheel scrolls the listing.  Otherwise, if the position is
        over a result in the summary, the cursor is moved to that result.
        """
        if event[0] not in ["mouse press", "mouse drag"]:
            return
        if event[1] == 4:  # Mouse wheel up
            self.listing_up()
            self._appearance_changed_event.set()
            return
        if event[1] == 5:  # Mouse wheel down
            self.listing_down()
            self._appearance_changed_event.set()
            return
        x, y = event[2:4]
        border_width = 1
        view_width, view_height = \
            self._summary._view_widget.portal.last_dimensions
        if x < border_width or y < border_width or x > view_width or \
           y > view_height:
            return
        status_width = 1 if self._summary.is_status_simple else 2
        view_x, view_y = self._summary._view_widget.portal.position
        spacer = 1
        column_index = (x - self._summary._max_path_length - spacer -
                        border_width + view_x) // status_width
        row_index = y - border_width + view_y
        if row_index >= len(self._summary._column):
            return
        row = self._summary._column[row_index]
        if column_index < 0 or column_index >= len(row):
            return
        new_position = column_index, row_index
        if new_position != self._summary._cursor_position:
            self._summary._cursor_position = new_position
            self._appearance_changed_event.set()

    def on_keypressed(self, urwid_screen):
        """Handle the input that is waiting on urwid_screen.

        While the help is visible the input goes to the help widget
        instead.  Tuple events are handled as mouse events.  Other events
        are looked up in the key map, and unknown keys are ignored.
        """
        if self._is_help_visible:
            self._help_widget.on_keypressed()
            self._appearance_changed_event.set()
            return
        events = urwid_screen.get_input()
        for event in events:
            if isinstance(event, tuple):
                self.on_mouse_event(event)
                continue
            try:
                action = self._key_map[event]
            except KeyError:
                pass
            else:
                # The actions are plain functions from the class body.
                action(self)
                self._appearance_changed_event.set()

    _STATUS_BAR = _highlight_chars(
        " *help *quit *d,*c,*j,*k:navigate *turn *log *edit *next *pause"
        " *order *statuses", Log.GREEN_STYLE)

    @functools.lru_cache(maxsize=2)
    def _get_status_bar_appearance(self, width, is_directory_sort, is_paused,
                                   progress_bar_size):
        """Return the one line appearance of the status bar.

        The key hints are on the left, and the paused and ordering
        indicators on the right.  The first progress_bar_size characters
        are underlined to act as a progress bar.
        """
        ordering_text = "directory" if is_directory_sort else "type "
        paused_indicator = (termstr.TermStr("paused ").fg_color(
            termstr.Color.yellow) if is_paused else termstr.TermStr("running").
            fg_color(termstr.Color.light_blue))
        indicators = " " + paused_indicator + " order:%s " % ordering_text
        spacing = " " * (width - len(self._STATUS_BAR) - len(indicators))
        bar = (self._STATUS_BAR[:width - len(indicators)] + spacing +
               indicators)[:width]
        return [bar[:progress_bar_size].underline() + bar[progress_bar_size:]]

    def appearance(self, dimensions):
        """Return the appearance of the whole screen.

        The main appearance is never made smaller than 10 wide or 20 high.
        If dimensions is smaller than that, the result is resized to
        dimensions afterwards.
        """
        width, height = max(dimensions[0], 10), max(dimensions[1], 20)
        if self._is_help_visible:
            return self._help_widget.appearance(dimensions)
        widget = self._summary.get_selection()
        view = self._listing.widget.view
        view.position = widget.scroll_position
        view.widget = widget.result
        tool_name = tools._tool_name_colored(widget.tool, widget.path)
        self._listing.title = (
            tools._path_colored(widget.path) + " ─── " + tool_name + " " +
            status_to_str(widget.status, self._summary.is_status_simple))
        incomplete = self._summary.result_total - self._summary.completed_total
        progress_bar_size = max(0, width * incomplete //
                                self._summary.result_total)
        status_bar_appearance = self._get_status_bar_appearance(
            width, self._summary.is_directory_sort, self._is_paused,
            progress_bar_size)
        result = (self._layouts[self._is_log_visible]
                  [self._is_listing_portrait] .appearance(
                      (width, height-len(status_bar_appearance))) +
                  status_bar_appearance)
        return (result if (width, height) == dimensions
                else fill3.appearance_resize(result, dimensions))

    # Each item pairs a set of keys with the function to call, with the
    # screen as its only argument, when one of them is pressed.
    _KEY_DATA = [
        ({"t"}, toggle_window_orientation), ({"l"}, toggle_log),
        ({"h"}, toggle_help), ({"d", "up"}, cursor_up),
        ({"c", "down"}, cursor_down), ({"j", "left"}, cursor_left),
        ({"k", "right"}, cursor_right), ({"D", "page up"}, listing_up),
        ({"C", "page down"}, listing_down), ({"J", "home"}, listing_left),
        ({"K", "end"}, listing_right), ({"o"}, toggle_sort),
        ({"n"}, move_to_next_issue), ({"N"}, move_to_next_issue_of_tool),
        ({"e"}, edit_file), ({"s"}, toggle_status_style), ({"q"}, quit_),
        ({"p"}, toggle_pause)]
|
|
|
|
|
|
def get_cpu_temperature():
    """Return the temperature read from thermal zone 0, as an integer.

    The last four characters of the file's contents are dropped before the
    conversion -- presumably the three millidegree digits and the trailing
    newline, leaving whole degrees.
    """
    with open("/sys/class/thermal/thermal_zone0/temp", "r") as temp_file:
        reading = temp_file.read()
    return int(reading[:-4])
|
|
|
|
|
|
def regulate_temperature(log):
    """Block while the computer is too hot, logging when that happens.

    Nothing happens below a temperature of 72.  Otherwise the temperature
    is polled once a second until it is 66 or less.
    """
    if get_cpu_temperature() < 72:
        return
    log.log_message("The computer is too hot. Waiting to cool down...")
    while get_cpu_temperature() > 66:
        time.sleep(1)
    log.log_message("The computer has cooled down. Continuing...")
|
|
|
|
|
|
class Runner:
    """Runs the summary's placeholder results, one at a time, on a worker.

    result is the result currently or most recently being run, or None if
    none has been started yet.
    """

    def __init__(self, sandbox, is_already_paused, is_being_tested):
        self.result = None
        self.worker = worker.Worker(sandbox)
        self.is_already_paused = is_already_paused
        self.is_being_tested = is_being_tested

    def job_runner(self, summary, log, jobs_added_event,
                   appearance_changed_event):
        """Run jobs forever.  Intended to be the target of a thread.

        Waits for jobs_added_event, runs placeholders until there are none
        left, clears the event, and then waits again.
        """
        while True:
            jobs_added_event.wait()
            self._run_pending_jobs(summary, log, appearance_changed_event)
            jobs_added_event.clear()

    def _run_pending_jobs(self, summary, log, appearance_changed_event):
        """Run the closest placeholder repeatedly until there are none."""
        while True:
            # regulate_temperature(log) # My fan is broken
            try:
                self.result = summary.get_closest_placeholder()
            except StopIteration:
                log.log_message("All results are up to date.")
                return
            with contextlib.suppress(ValueError):  # Process was terminated
                self.result.run(log, appearance_changed_event, self.worker,
                                self)
                summary.completed_total += 1
            if self.is_being_tested and self.result.tool == tools.metadata:
                os.kill(os.getpid(), signal.SIGINT)

    def pause(self):
        """Pause the worker, if it is in the middle of running a result."""
        if self.result is None:
            return
        if self.result.status != tools.Status.running:
            return
        self.worker.pause()
        self.result.set_status(tools.Status.paused)

    def continue_(self):
        """Let the worker carry on, if its result is currently paused."""
        if self.result is None:
            return
        if self.result.status != tools.Status.paused:
            return
        self.result.set_status(tools.Status.running)
        self.worker.continue_()
|
|
|
|
|
|
def is_path_excluded(path):
    """Return True if any component of path starts with a dot."""
    for part in path.split(os.path.sep):
        if part.startswith("."):
            return True
    return False
|
|
|
|
|
|
def add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change,
                                  exclude_filter):
    """Have on_filesystem_change called when the tree at root_path changes.

    root_path is watched recursively with inotify, leaving out the paths
    for which exclude_filter is true.  The inotify file descriptor is added
    to mainloop as a reader.
    """
    watched_events = (pyinotify.IN_CREATE, pyinotify.IN_DELETE,
                      pyinotify.IN_CLOSE_WRITE, pyinotify.IN_ATTRIB,
                      pyinotify.IN_MOVED_FROM, pyinotify.IN_MOVED_TO)
    event_mask = 0
    for event_flag in watched_events:
        event_mask |= event_flag

    def ignore_event(event):
        return None
    watch_manager = pyinotify.WatchManager()
    watch_manager.add_watch(root_path, event_mask, rec=True, auto_add=True,
                            proc_fun=ignore_event,
                            exclude_filter=exclude_filter)
    notifier = pyinotify.Notifier(watch_manager)

    def on_inotify():
        time.sleep(0.1)  # A little time for more events
        notifier.read_events()
        notifier.process_events()
        on_filesystem_change()
    inotify_fd = watch_manager.get_fd()
    mainloop.add_reader(inotify_fd, on_inotify)
|
|
|
|
|
|
# Set to True by main() to make update_screen return the next time it is
# woken up.
_UPDATE_THREAD_STOPPED = False


def update_screen(main_widget, appearance_changed_event):
    """Redraw main_widget every time appearance_changed_event is set.

    Intended to be the target of a thread.  Returns once it is woken up
    while _UPDATE_THREAD_STOPPED is true.
    """

    def wait_for_change():
        appearance_changed_event.wait()
        appearance_changed_event.clear()
        return not _UPDATE_THREAD_STOPPED
    while wait_for_change():
        fill3.patch_screen(main_widget)
|
|
|
|
|
|
def main(root_path, is_being_tested=False):
    """Run the user interface for the directory tree at root_path.

    The screen is restored from the cached pickle if there is one,
    otherwise it is made afresh.  The filesystem is watched for changes,
    runner threads are started to calculate the results, and the event
    loop runs until interrupted.  The screen is then pickled into the cache
    for next time.

    The cache path is relative, so this expects to be called with the
    current directory set to the target directory, as the script's entry
    point does.

    is_being_tested is passed on to each Runner.
    """
    global _UPDATE_THREAD_STOPPED
    loop = asyncio.get_event_loop()
    jobs_added_event = threading.Event()
    appearance_changed_event = threading.Event()
    try:
        pickle_path = os.path.join(_CACHE_PATH, ".summary.pickle")
        with gzip.open(pickle_path, "rb") as file_:
            screen = pickle.load(file_)
    # NOTE(review): only a missing pickle is handled here; a damaged one
    # would presumably raise out of main -- confirm whether that is wanted.
    except FileNotFoundError:
        summary = Summary(root_path, jobs_added_event)
        log = Log(appearance_changed_event)
        screen = Screen(summary, log, appearance_changed_event, loop)
    else:
        # Put back the attributes that were set to None before pickling.
        screen._appearance_changed_event = appearance_changed_event
        screen._main_loop = loop
        summary = screen._summary
        summary._lock = threading.Lock()
        summary._jobs_added_event = jobs_added_event
        log = screen._log
        log._appearance_changed_event = appearance_changed_event
        summary.sync_with_filesystem()

    def on_filesystem_change():
        log.log_message("Filesystem changed.")
        summary.sync_with_filesystem()
        appearance_changed_event.set()
    add_watch_manager_to_mainloop(root_path, loop, on_filesystem_change,
                                  is_path_excluded)
    log.log_message("Program started.")
    jobs_added_event.set()
    runners = []
    sandbox_temp_dir = tempfile.mkdtemp()
    sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir)

    def start_runners():
        # Runs in its own thread, started below, so the interface can come
        # up without waiting for the sandbox and the workers.
        log.log_message("Making filesystem sandbox...")
        sandbox.mount()
        log.log_message("Sandbox made.")
        log.log_message("Starting workers...")
        # Two workers for each cpu.
        worker_total = multiprocessing.cpu_count() * 2
        for index in range(worker_total):
            runners.append(Runner(sandbox, screen._is_paused, is_being_tested))
        screen.runners = runners
        log.log_message("Workers started. (%s)" % worker_total)
        for runner in runners:
            args = (summary, log, jobs_added_event, appearance_changed_event)
            threading.Thread(target=runner.job_runner, args=args,
                             daemon=True).start()
    try:
        threading.Thread(target=start_runners, daemon=True).start()

        def on_window_resize(n, frame):
            appearance_changed_event.set()
        # Make sure the screen is drawn once at the start.
        appearance_changed_event.set()
        update_display_thread = threading.Thread(
            target=update_screen, args=(screen, appearance_changed_event),
            daemon=True)
        with terminal.hidden_cursor():
            with terminal.urwid_screen() as urwid_screen:
                loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
                update_display_thread.start()
                signal.signal(signal.SIGWINCH, on_window_resize)
                try:
                    loop.run_forever()
                except KeyboardInterrupt:
                    log.log_command("Exiting...")
                    time.sleep(0.05)
                # Wake the display thread up so that it sees the flag and
                # stops.
                _UPDATE_THREAD_STOPPED = True
                appearance_changed_event.set()
                update_display_thread.join()
        log.log_message("Program stopped.")
        # Results that were part way through go back to being placeholders.
        for runner in runners:
            runner.pause()
            if runner.result is not None:
                runner.result.reset()
        # Cannot pickle generators, locks, sockets or events.
        (summary.closest_placeholder_generator, summary._lock,
         summary._jobs_added_event, screen._appearance_changed_event,
         screen._main_loop, screen.runners,
         log._appearance_changed_event) = [None] * 7
        open_compressed = functools.partial(gzip.open, compresslevel=1)
        dump_pickle_safe(screen, pickle_path, open=open_compressed)
    finally:
        sandbox.umount()
        os.rmdir(sandbox_temp_dir)
|
|
|
|
|
|
@contextlib.contextmanager
def chdir(path):
    """Make path the current directory for the duration of a with block.

    The directory that was current beforehand is restored afterwards, even
    if the block raises an exception.
    """
    previous_directory = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous_directory)
|
|
|
|
|
|
def manage_cache(root_path):
    """Make sure root_path has a cache directory that is not out of date.

    The cache directory holds a timestamp file from when it was created.
    If this program's file has been modified since then, the whole cache is
    removed so that all the results get recalculated.  A cache directory
    without a timestamp file cannot be dated, so it is treated the same
    way.  A missing cache directory is created, with its timestamp file.
    """
    cache_path = os.path.join(root_path, _CACHE_PATH)
    timestamp_path = os.path.join(cache_path, ".creation-time")
    if os.path.exists(cache_path):
        program_mtime = os.stat(__file__).st_mtime
        try:
            is_stale = program_mtime > os.stat(timestamp_path).st_mtime
        except FileNotFoundError:
            # The directory exists without its timestamp file, e.g. because
            # it was recreated when a result was stored after the cache had
            # been deleted.
            is_stale = True
        if is_stale:
            print("Vigil has been updated, so clearing the cache and"
                  " recalculating all results...")
            shutil.rmtree(cache_path)
    if not os.path.exists(cache_path):
        os.mkdir(cache_path)
        open(timestamp_path, "w").close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Exactly one argument is expected: the root of the tree to watch.
    if len(sys.argv) != 2:
        print(_get_help_text())
    else:
        # Running the "true" command through sudo asks for the password up
        # front, with a prompt that explains why it is needed.
        sudo_prompt = ("Vigil needs sudo to create the filesystem sandbox..."
                       " [sudo] password for %u: ")
        subprocess.call(["sudo", "-p", sudo_prompt, "true"])
        root_path = os.path.abspath(sys.argv[1])
        title = "vigil: " + os.path.basename(root_path)
        with terminal.console_title(title):
            manage_cache(root_path)
            with chdir(root_path):  # FIX: Don't change directory if possible.
                main(root_path)
|