This reverts commit 43e61b538b.
That change caused problems. For example, more jobs running
simultaneously than there should be.
989 lines
36 KiB
Python
Executable file
989 lines
36 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2015 Andrew Hamilton. All rights reserved.
|
|
# Licensed under the Artistic License 2.0.
|
|
|
|
"""\
|
|
Produces a set of reports for every file in a directory tree.
|
|
The reports are produced by many existing command-line tools.
|
|
|
|
The state of each report is also summarised by a status indicator.
|
|
The possible states are listed below.
|
|
|
|
A report is viewed by selecting its status indicator with the cursor.
|
|
|
|
Reports are recalculated whenever files are changed, added, or deleted, and so
|
|
are kept up to date. (optional)
|
|
|
|
The reports are cached in a directory named ".vigil" under the target
|
|
directory.
|
|
|
|
Usage: vigil <root_path>
|
|
|
|
e.g. # vigil my_project
|
|
|
|
Keys:
|
|
*h - Show the help screen. (toggle)
|
|
*d, *c, *j, *k - Move the cursor up, down, left and right.
|
|
*D, *C, *J, *K - Scroll the result pane up, down, left and right.
|
|
*t - Turn the result pane to portrait or landscape orientation. (toggle)
|
|
*l - Show the activity log. (toggle)
|
|
*n - Move to the next issue.
|
|
*N - Move to the next issue of the current tool.
|
|
*o - Order files by type, or by directory location. (toggle)
|
|
*w - Watch the filesystem for changes. (toggle)
|
|
*s - Change the appearance of result statuses. (toggle)
|
|
*q - Quit.
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
import collections
|
|
import functools
|
|
import gc
|
|
import gzip
|
|
import importlib
|
|
import multiprocessing
|
|
import os
|
|
import pickle
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
import pyinotify
|
|
|
|
import fill3
|
|
import terminal
|
|
import termstr
|
|
import tools
|
|
|
|
|
|
_LOG_PATH = os.path.join(os.getcwd(), "vigil.log")
|
|
|
|
|
|
def _log_error(message=None):
|
|
message = traceback.format_exc() if message is None else message + "\n"
|
|
with open(_LOG_PATH, "a") as log_file:
|
|
log_file.write(message)
|
|
|
|
|
|
_CACHE_PATH = ".vigil"
|
|
|
|
|
|
def lru_cache_with_eviction(maxsize=128, typed=False):
|
|
versions = {}
|
|
make_key = functools._make_key
|
|
|
|
def evict(*args, **kwds):
|
|
key = make_key(args, kwds, typed)
|
|
if key in versions:
|
|
versions[key] += 1
|
|
|
|
def decorating_function(user_function):
|
|
|
|
def remove_version(*args, **kwds):
|
|
return user_function(*args[1:], **kwds)
|
|
new_func = functools.lru_cache(maxsize=maxsize, typed=typed)(
|
|
remove_version)
|
|
|
|
def add_version(*args, **kwds):
|
|
key = make_key(args, kwds, typed)
|
|
return new_func(*((versions.setdefault(key, 0),) + args), **kwds)
|
|
add_version.versions = versions
|
|
add_version.cache_info = new_func.cache_info
|
|
add_version.evict = evict
|
|
return functools.update_wrapper(add_version, user_function)
|
|
return decorating_function
|
|
|
|
|
|
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
|
|
open=open):
|
|
tmp_path = path + ".tmp"
|
|
try:
|
|
with open(tmp_path, "wb") as file_:
|
|
pickle.dump(object_, file_, protocol=protocol)
|
|
except OSError:
|
|
os.remove(tmp_path)
|
|
else:
|
|
os.rename(tmp_path, path)
|
|
|
|
|
|
def multiprocessing_process(func, *args, **kwargs):
|
|
def wrapper(child_conn, func, args, **kwargs):
|
|
result = func(*args, **kwargs)
|
|
child_conn.send(result)
|
|
child_conn.close()
|
|
parent_conn, child_conn = multiprocessing.Pipe()
|
|
process = multiprocessing.Process(
|
|
target=wrapper, args=(child_conn, func, args), kwargs=kwargs,
|
|
daemon=True)
|
|
process.start()
|
|
process.result_conn = parent_conn
|
|
return process
|
|
|
|
|
|
def status_to_str(status, is_status_simple):
|
|
if isinstance(status, int): # is a status enumeration
|
|
dict_ = (tools._STATUS_TO_TERMSTR_SIMPLE if is_status_simple
|
|
else tools._STATUS_TO_TERMSTR)
|
|
return dict_[status]
|
|
else:
|
|
return status
|
|
|
|
|
|
class Result:
|
|
|
|
def __init__(self, path, tool, is_stored_compressed=True):
|
|
self.path = path
|
|
self.tool = tool
|
|
self._open_func = gzip.open if is_stored_compressed else open
|
|
self.pickle_path = os.path.join(_CACHE_PATH,
|
|
path + "-" + tool.__name__)
|
|
self.scroll_position = (0, 0)
|
|
self.is_completed = False
|
|
self.reset()
|
|
|
|
def __del__(self):
|
|
try:
|
|
os.remove(self.pickle_path)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
@property
|
|
@lru_cache_with_eviction(maxsize=50)
|
|
def result(self):
|
|
unknown_label = fill3.Text("?")
|
|
if self.is_placeholder:
|
|
return unknown_label
|
|
try:
|
|
with self._open_func(self.pickle_path, "rb") as pickle_file:
|
|
return pickle.load(pickle_file)
|
|
except FileNotFoundError:
|
|
return unknown_label
|
|
|
|
@result.setter
|
|
def result(self, value):
|
|
os.makedirs(os.path.dirname(self.pickle_path), exist_ok=True)
|
|
dump_pickle_safe(value, self.pickle_path, open=self._open_func)
|
|
Result.result.fget.evict(self)
|
|
|
|
def set_status(self, status, appearance_changed_event):
|
|
self.status = status
|
|
appearance_changed_event.set()
|
|
self.entry.appearance_cache = None
|
|
|
|
def run(self, log, appearance_changed_event):
|
|
self.is_placeholder = False
|
|
tool_name = tools._tool_name_colored(self.tool, self.path)
|
|
path_colored = tools._path_colored(self.path)
|
|
log.log_message(["Running ", tool_name, " on ", path_colored, "."])
|
|
self.set_status(tools.Status.running, appearance_changed_event)
|
|
start_time = time.time()
|
|
self.process = multiprocessing_process(
|
|
tools.run_tool_no_error, self.path, self.tool)
|
|
new_status, result = self.process.result_conn.recv()
|
|
self.status, self.result = new_status, result
|
|
self.process = None
|
|
end_time = time.time()
|
|
self.set_status(new_status, appearance_changed_event)
|
|
self.is_completed = True
|
|
log.log_message(
|
|
["Finished running ", tool_name, " on ", path_colored, ". ",
|
|
status_to_str(new_status, self.entry.summary.is_status_simple),
|
|
" %s secs" % round(end_time - start_time, 2)])
|
|
|
|
def reset(self):
|
|
self.is_placeholder = True
|
|
self.status = tools.Status.empty
|
|
try:
|
|
self.process.terminate()
|
|
except AttributeError:
|
|
pass
|
|
self.process = None
|
|
|
|
def appearance_min(self):
|
|
return [status_to_str(self.status,
|
|
self.entry.summary.is_status_simple)]
|
|
|
|
|
|
def reverse_style(style):
|
|
return termstr.CharStyle(style.bg_color, style.fg_color, style.is_bold,
|
|
style.is_underlined)
|
|
|
|
|
|
class Entry(collections.UserList):
|
|
|
|
def __init__(self, path, results, summary, highlighted=None,
|
|
set_results=True):
|
|
collections.UserList.__init__(self, results)
|
|
self.path = path
|
|
self.summary = summary
|
|
self.highlighted = highlighted
|
|
self.widgets = self.data
|
|
if set_results:
|
|
# FIX: this is missed for entries appended later
|
|
for result in results:
|
|
result.entry = self
|
|
self.widget = fill3.Row(results)
|
|
self.appearance_cache = None
|
|
|
|
def appearance_min(self):
|
|
# 'appearance' local variable exists because appearance_cache can
|
|
# become None at any time.
|
|
appearance = self.appearance_cache
|
|
if appearance is None:
|
|
if self.highlighted is not None:
|
|
if self.summary.is_status_simple:
|
|
cursor = fill3.Text("●")
|
|
else:
|
|
cursor = fill3.Style(self.widget[self.highlighted],
|
|
reverse_style)
|
|
self.widget[self.highlighted] = cursor
|
|
new_appearance = self.widget.appearance_min()
|
|
path = tools._path_colored(self.path)
|
|
padding = " " * (self.summary._max_path_length - len(path) + 1)
|
|
new_appearance[0] = path + padding + new_appearance[0]
|
|
self.appearance_cache = appearance = new_appearance
|
|
return appearance
|
|
|
|
|
|
def is_filename_excluded(filename):
|
|
return filename.startswith(".")
|
|
|
|
|
|
def codebase_files(path, skip_hidden_directories=True):
|
|
for (dirpath, dirnames, filenames) in os.walk(path):
|
|
if skip_hidden_directories:
|
|
filtered_dirnames = [dirname for dirname in dirnames
|
|
if not is_filename_excluded(dirname)]
|
|
dirnames[:] = filtered_dirnames
|
|
for filename in filenames:
|
|
if not is_filename_excluded(filename):
|
|
yield os.path.join(dirpath, filename)
|
|
|
|
|
|
def fix_paths(root_path, paths):
|
|
return [os.path.join(".", os.path.relpath(path, root_path))
|
|
for path in paths]
|
|
|
|
|
|
def change_background(str_, new_background):
|
|
|
|
def change_background_style(style):
|
|
new_bg = (new_background if style.bg_color == termstr.Color.black
|
|
else style.bg_color)
|
|
return termstr.CharStyle(style.fg_color, new_bg, style.is_bold,
|
|
style.is_underlined)
|
|
return termstr.TermStr(str_).transform_style(change_background_style)
|
|
|
|
|
|
class Summary:
|
|
|
|
def __init__(self, root_path, jobs_added_event):
|
|
self._root_path = root_path
|
|
self._jobs_added_event = jobs_added_event
|
|
self._view_widget = fill3.View.from_widget(self)
|
|
self.__cursor_position = (0, 0)
|
|
self.closest_placeholder_generator = None
|
|
self._lock = threading.Lock()
|
|
self._cache = {}
|
|
self.is_status_simple = False
|
|
self.is_directory_sort = True
|
|
self._max_width = None
|
|
self._max_path_length = None
|
|
self.sync_with_filesystem()
|
|
|
|
@property
|
|
def _cursor_position(self):
|
|
return self.__cursor_position
|
|
|
|
@_cursor_position.setter
|
|
def _cursor_position(self, new_position):
|
|
if new_position != self.__cursor_position:
|
|
self.__cursor_position = new_position
|
|
self.closest_placeholder_generator = None
|
|
|
|
def sync_with_filesystem(self, sync_tools=True, sync_paths=True):
|
|
if sync_tools:
|
|
importlib.reload(tools)
|
|
x, y = self._cursor_position
|
|
try:
|
|
old_path = self.get_selection().path
|
|
except AttributeError:
|
|
old_path = None
|
|
new_column = fill3.Column([])
|
|
new_cache = {}
|
|
if sync_paths:
|
|
paths = fix_paths(self._root_path,
|
|
codebase_files(self._root_path))
|
|
self._paths = paths
|
|
self.sort(self.is_directory_sort)
|
|
else:
|
|
paths = self._paths
|
|
jobs_added = False
|
|
new_cursor_position = (0, 0)
|
|
row_index = 0
|
|
result_total, completed_total = 0, 0
|
|
for path in paths:
|
|
full_path = os.path.join(self._root_path, path)
|
|
try:
|
|
key = (path, os.stat(full_path).st_ctime)
|
|
except FileNotFoundError:
|
|
continue
|
|
if path == old_path:
|
|
new_cursor_position = (x, row_index)
|
|
row = []
|
|
for tool in tools.tools_for_path(path):
|
|
cache_key = (key, tool.__name__, tool.__code__.co_code)
|
|
if cache_key in self._cache:
|
|
result = self._cache[cache_key]
|
|
result.tool = tool
|
|
else:
|
|
result = Result(path, tool)
|
|
jobs_added = True
|
|
if result.is_completed:
|
|
completed_total += 1
|
|
new_cache[cache_key] = result
|
|
row.append(result)
|
|
new_column.append(Entry(path, row, self))
|
|
row_index += 1
|
|
result_total += len(row)
|
|
max_width = max(len(row) for row in new_column)
|
|
max_path_length = max(len(path) for path in paths) - len("./")
|
|
self._column, self._cache, self._cursor_position, self.result_total, \
|
|
self.completed_total, self._max_width, self._max_path_length, \
|
|
self.closest_placeholder_generator = (
|
|
new_column, new_cache, new_cursor_position, result_total,
|
|
completed_total, max_width, max_path_length, None)
|
|
if jobs_added:
|
|
self._jobs_added_event.set()
|
|
# Delete the stale results from the disk now, to avoid accidently
|
|
# deleting a future result with the same filename. See Result.__del__.
|
|
gc.collect()
|
|
|
|
def placeholder_spiral(self):
|
|
x, y = self.cursor_position()
|
|
result = self._column[y][x]
|
|
if result.is_placeholder:
|
|
yield result
|
|
for lap in range(max(len(self._column), self._max_width)):
|
|
y -= 1
|
|
for dx, dy in [(1, 1), (-1, 1), (-1, -1), (1, -1)]:
|
|
for move in range(lap + 1):
|
|
x += dx
|
|
y += dy
|
|
try:
|
|
result = self._column[y][x]
|
|
except IndexError:
|
|
continue
|
|
if result.is_placeholder:
|
|
yield result
|
|
|
|
def get_closest_placeholder(self):
|
|
with self._lock:
|
|
try:
|
|
return self.closest_placeholder_generator.send(None)
|
|
except AttributeError:
|
|
self.closest_placeholder_generator = self.placeholder_spiral()
|
|
return self.closest_placeholder_generator.send(None)
|
|
|
|
def appearance_dimensions(self):
|
|
status_width = 1 if self.is_status_simple else 2
|
|
width = self._max_path_length + 1 + status_width * self._max_width
|
|
return width, len(self._column)
|
|
|
|
def appearance_interval(self, interval):
|
|
start_y, end_y = interval
|
|
x, y = self.cursor_position()
|
|
rows = fill3.Column(self._column.widgets)
|
|
rows[y] = Entry(rows[y].path, rows[y].widgets, self, highlighted=x,
|
|
set_results=False)
|
|
return rows.appearance_interval(interval)
|
|
|
|
def appearance(self, dimensions):
|
|
width, height = dimensions
|
|
x, y = self.cursor_position()
|
|
status_width = 1 if self.is_status_simple else 2
|
|
screen_x, screen_y = self._max_path_length + 1 + x * status_width, y
|
|
width, height = width - 1, height - 1 # Minus one for the scrollbars
|
|
scroll_y = (screen_y // height) * height
|
|
self._view_widget.position = ((screen_x // width) * width, scroll_y)
|
|
appearance = self._view_widget.appearance(dimensions)
|
|
appearance[screen_y - scroll_y] = change_background(
|
|
appearance[screen_y - scroll_y], termstr.Color.grey_50)
|
|
return appearance
|
|
|
|
def cursor_position(self):
|
|
x, y = self._cursor_position
|
|
return min(x, len(self._column[y])-1), y
|
|
|
|
def get_selection(self):
|
|
x, y = self.cursor_position()
|
|
return self._column[y][x]
|
|
|
|
def _move_cursor(self, dx, dy):
|
|
if dy == 0:
|
|
x, y = self.cursor_position()
|
|
self._cursor_position = ((x + dx) % len(self._column[y]), y)
|
|
elif dx == 0:
|
|
x, y = self._cursor_position
|
|
self._cursor_position = (x, (y + dy) % len(self._column))
|
|
else:
|
|
raise ValueError
|
|
|
|
def cursor_right(self):
|
|
self._move_cursor(1, 0)
|
|
|
|
def cursor_left(self):
|
|
self._move_cursor(-1, 0)
|
|
|
|
def cursor_up(self):
|
|
self._move_cursor(0, -1)
|
|
|
|
def cursor_down(self):
|
|
self._move_cursor(0, 1)
|
|
|
|
def _issue_generator(self):
|
|
x, y = self.cursor_position()
|
|
for index in range(len(self._column) + 1):
|
|
row_index = (index + y) % len(self._column)
|
|
row = self._column[row_index]
|
|
for index_x, result in enumerate(row):
|
|
if (result.status == tools.Status.failure and
|
|
not (row_index == y and index_x <= x and
|
|
index != len(self._column))):
|
|
yield result, (index_x, row_index)
|
|
|
|
def move_to_next_issue(self):
|
|
try:
|
|
issue, self._cursor_position = self._issue_generator().send(None)
|
|
except StopIteration:
|
|
pass
|
|
|
|
def move_to_next_issue_of_tool(self):
|
|
current_tool = self.get_selection().tool
|
|
for issue, position in self._issue_generator():
|
|
if issue.tool == current_tool:
|
|
self._cursor_position = position
|
|
return
|
|
|
|
def toggle_status_style(self):
|
|
self.is_status_simple = not self.is_status_simple
|
|
self.sync_with_filesystem(sync_tools=False, sync_paths=False)
|
|
|
|
def sort(self, is_directory_sort):
|
|
def directory_sort(path):
|
|
return (os.path.dirname(path), tools.splitext(path)[1],
|
|
os.path.basename(path))
|
|
|
|
def type_sort(path):
|
|
return (tools.splitext(path)[1], os.path.dirname(path),
|
|
os.path.basename(path))
|
|
key_func = directory_sort if is_directory_sort else type_sort
|
|
self._paths.sort(key=key_func)
|
|
self.is_directory_sort = is_directory_sort
|
|
self.sync_with_filesystem(sync_tools=False, sync_paths=False)
|
|
|
|
|
|
class Log:
|
|
|
|
GREY_BOLD_STYLE = termstr.CharStyle(termstr.Color.grey_100, is_bold=True)
|
|
GREEN_STYLE = termstr.CharStyle(termstr.Color.green)
|
|
|
|
def __init__(self, appearance_changed_event):
|
|
self._appearance_changed_event = appearance_changed_event
|
|
self.widget = fill3.Column([])
|
|
self.portal = fill3.Portal(self.widget)
|
|
self._appearance_cache = None
|
|
|
|
def log_message(self, message, timestamp=None, char_style=None):
|
|
if isinstance(message, list):
|
|
message = [part[1] if isinstance(part, tuple) else part
|
|
for part in message]
|
|
message = fill3.join("", message)
|
|
if char_style is not None:
|
|
message = termstr.TermStr(message, char_style)
|
|
timestamp = (time.strftime("%H:%M:%S", time.localtime())
|
|
if timestamp is None else timestamp)
|
|
label = fill3.Text(termstr.TermStr(timestamp, Log.GREY_BOLD_STYLE) +
|
|
" " + message)
|
|
self.widget.append(label)
|
|
self.widget.widgets = self.widget[-200:]
|
|
self._appearance_cache = None
|
|
self._appearance_changed_event.set()
|
|
|
|
def log_command(self, message, timestamp=None):
|
|
self.log_message(message, char_style=Log.GREEN_STYLE)
|
|
|
|
def appearance_min(self):
|
|
appearance = self._appearance_cache
|
|
if appearance is None:
|
|
self._appearance_cache = appearance = self.widget.appearance_min()
|
|
return appearance
|
|
|
|
def appearance(self, dimensions):
|
|
width, height = dimensions
|
|
full_appearance = self.appearance_min()
|
|
self.portal.position = (0, max(0, len(full_appearance) - height))
|
|
return self.portal.appearance(dimensions)
|
|
|
|
|
|
def _highlight_chars(str_, style, marker="*"):
|
|
parts = str_.split(marker)
|
|
highlighted_parts = [termstr.TermStr(part[0], style) + part[1:]
|
|
for part in parts[1:] if part != ""]
|
|
return fill3.join("", [parts[0]] + highlighted_parts)
|
|
|
|
|
|
class Help:
|
|
|
|
def __init__(self, summary, screen):
|
|
self.summary = summary
|
|
self.screen = screen
|
|
self.body = fill3.Placeholder()
|
|
self.view = fill3.View.from_widget(self.body)
|
|
self.widget = fill3.Border(self.view, title="Help")
|
|
self.usage = _highlight_chars(__doc__, Log.GREEN_STYLE)
|
|
portal = self.view.portal
|
|
self.key_map = {"h": self.exit_help, "d": portal.scroll_up,
|
|
"c": portal.scroll_down, "j": portal.scroll_left,
|
|
"k": portal.scroll_right, "q": self.exit_help}
|
|
|
|
def exit_help(self):
|
|
self.screen._is_help_visible = False
|
|
|
|
def on_keypressed(self):
|
|
try:
|
|
action = self.key_map[sys.stdin.read(1)]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
action()
|
|
|
|
def appearance(self, dimensions):
|
|
text = fill3.join(
|
|
"\n", [self.usage, "Statuses:"] +
|
|
[" " + status_to_str(status, self.summary.is_status_simple) +
|
|
" " + meaning for status, meaning in tools.STATUS_MEANINGS])
|
|
self.body.widget = fill3.Text(text)
|
|
return self.widget.appearance(dimensions)
|
|
|
|
|
|
class Listing:
|
|
|
|
def __init__(self, view):
|
|
self.view = view
|
|
self.last_dimensions = None
|
|
|
|
def appearance(self, dimensions):
|
|
self.last_dimensions = dimensions
|
|
return self.view.appearance(dimensions)
|
|
|
|
|
|
def add_watch_manager_to_mainloop(watch_manager, mainloop):
|
|
notifier = pyinotify.Notifier(watch_manager)
|
|
|
|
def on_inotify():
|
|
notifier.read_events()
|
|
notifier.process_events()
|
|
mainloop.add_reader(watch_manager.get_fd(), on_inotify)
|
|
|
|
|
|
def is_path_excluded(path):
|
|
return any(part.startswith(".") for part in path.split(os.path.sep))
|
|
|
|
|
|
class Screen:
|
|
|
|
def __init__(self, summary, log, appearance_changed_event, main_loop):
|
|
self._summary = summary
|
|
self._log = log
|
|
self._appearance_changed_event = appearance_changed_event
|
|
self._main_loop = main_loop
|
|
self._is_listing_portrait = True
|
|
self._is_log_visible = True
|
|
self._is_help_visible = False
|
|
self._is_watching_filesystem = False
|
|
self.toggle_watch_filesystem()
|
|
self._make_widgets()
|
|
self._make_keymap()
|
|
|
|
def make_watch_manager(self):
|
|
|
|
def on_filesystem_change(event):
|
|
self._log.log_message("Filesystem changed.")
|
|
self._summary.sync_with_filesystem(sync_tools=False)
|
|
self._appearance_changed_event.set()
|
|
|
|
def on_tools_change(event):
|
|
self._log.log_message("Tools changed.")
|
|
self._summary.sync_with_filesystem(sync_paths=False)
|
|
self._appearance_changed_event.set()
|
|
watch_manager = pyinotify.WatchManager()
|
|
event_mask = (pyinotify.IN_CREATE | pyinotify.IN_DELETE |
|
|
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_ATTRIB)
|
|
watch_manager.add_watch(
|
|
self._summary._root_path, event_mask, rec=True, auto_add=True,
|
|
proc_fun=on_filesystem_change, exclude_filter=lambda path:
|
|
is_path_excluded(path))
|
|
watch_manager.add_watch(tools.__file__, event_mask,
|
|
proc_fun=on_tools_change)
|
|
self._watch_manager = watch_manager
|
|
add_watch_manager_to_mainloop(self._watch_manager, self._main_loop)
|
|
|
|
def _partition(self, widgets, height):
|
|
smaller_height = max(height // 4, 10)
|
|
return [height - smaller_height, smaller_height]
|
|
|
|
def _partition_2(self, widgets, height):
|
|
smaller_height = max(height // 4, 10)
|
|
return [smaller_height, height - smaller_height]
|
|
|
|
def _make_widgets(self):
|
|
self._help_widget = Help(self._summary, self)
|
|
root_path = os.path.basename(self._summary._root_path)
|
|
summary = fill3.Border(self._summary, title="Summary of " + root_path)
|
|
selected_widget = self._summary.get_selection()
|
|
self._view = fill3.View.from_widget(selected_widget.result)
|
|
self._listing = fill3.Border(Listing(self._view))
|
|
log = fill3.Border(self._log, title="Log")
|
|
port_log = fill3.Row([fill3.Column([summary, log], self._partition),
|
|
self._listing])
|
|
land_log = fill3.Column([fill3.Row([summary, log]), self._listing],
|
|
self._partition_2)
|
|
port_no_log = fill3.Row([summary, self._listing])
|
|
land_no_log = fill3.Column([summary, self._listing], self._partition_2)
|
|
self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]
|
|
|
|
def _make_keymap(self):
|
|
key_map = {}
|
|
for keys, action in self._KEY_DATA:
|
|
for key in keys:
|
|
key_map[key] = action
|
|
self._key_map = key_map
|
|
|
|
def toggle_help(self):
|
|
self._is_help_visible = not self._is_help_visible
|
|
|
|
def toggle_log(self):
|
|
self._is_log_visible = not self._is_log_visible
|
|
|
|
def toggle_window_orientation(self):
|
|
self._is_listing_portrait = not self._is_listing_portrait
|
|
|
|
def cursor_up(self):
|
|
self._summary.cursor_up()
|
|
|
|
def cursor_down(self):
|
|
self._summary.cursor_down()
|
|
|
|
def cursor_right(self):
|
|
self._summary.cursor_right()
|
|
|
|
def cursor_left(self):
|
|
self._summary.cursor_left()
|
|
|
|
def _move_listing(self, dx, dy):
|
|
listing_width, listing_height = self._listing.widget.last_dimensions
|
|
selected_widget = self._summary.get_selection()
|
|
x, y = selected_widget.scroll_position
|
|
selected_widget.scroll_position = \
|
|
(max(x + dx * (listing_width // 2), 0),
|
|
max(y + dy * (listing_height // 2), 0))
|
|
|
|
def listing_up(self):
|
|
self._move_listing(0, -1)
|
|
|
|
def listing_down(self):
|
|
self._move_listing(0, 1)
|
|
|
|
def listing_right(self):
|
|
self._move_listing(1, 0)
|
|
|
|
def listing_left(self):
|
|
self._move_listing(-1, 0)
|
|
|
|
def move_to_next_issue(self):
|
|
self._summary.move_to_next_issue()
|
|
|
|
def move_to_next_issue_of_tool(self):
|
|
self._summary.move_to_next_issue_of_tool()
|
|
|
|
def edit_file(self):
|
|
path = self._summary.get_selection().path
|
|
path_colored = tools._path_colored(path)
|
|
self._log.log_message("Editing " + path_colored + " in emacs.")
|
|
subprocess.Popen(["emacsclient", path],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
def toggle_status_style(self):
|
|
self._summary.toggle_status_style()
|
|
|
|
def toggle_sort(self):
|
|
new_sort = not self._summary.is_directory_sort
|
|
sort_order = ("directory then type" if new_sort
|
|
else "type then directory")
|
|
self._log.log_command("Ordering files by %s." % sort_order)
|
|
self._summary.sort(new_sort)
|
|
|
|
def toggle_watch_filesystem(self):
|
|
self._is_watching_filesystem = not self._is_watching_filesystem
|
|
self._log.log_command("Watching the filesystem for changes."
|
|
if self._is_watching_filesystem else
|
|
"Stopped watching the filesystem.")
|
|
if self._is_watching_filesystem:
|
|
self._summary.sync_with_filesystem()
|
|
self.make_watch_manager()
|
|
else:
|
|
self._main_loop.remove_reader(self._watch_manager.get_fd())
|
|
self._watch_manager = None
|
|
|
|
def quit_(self):
|
|
raise KeyboardInterrupt
|
|
|
|
def on_mouse_event(self, event):
|
|
if event[0] not in ["mouse press", "mouse drag"]:
|
|
return
|
|
if event[1] == 4: # Mouse wheel up
|
|
self.listing_up()
|
|
self._appearance_changed_event.set()
|
|
return
|
|
if event[1] == 5: # Mouse wheel down
|
|
self.listing_down()
|
|
self._appearance_changed_event.set()
|
|
return
|
|
x, y = event[2:4]
|
|
border_width = 1
|
|
view_width, view_height = \
|
|
self._summary._view_widget.portal.last_dimensions
|
|
if x < border_width or y < border_width or x > view_width or \
|
|
y > view_height:
|
|
return
|
|
status_width = 1 if self._summary.is_status_simple else 2
|
|
view_x, view_y = self._summary._view_widget.portal.position
|
|
spacer = 1
|
|
column_index = (x - self._summary._max_path_length - spacer -
|
|
border_width + view_x) // status_width
|
|
row_index = y - border_width + view_y
|
|
if row_index >= len(self._summary._column):
|
|
return
|
|
row = self._summary._column[row_index]
|
|
if column_index < 0 or column_index >= len(row):
|
|
return
|
|
new_position = column_index, row_index
|
|
if new_position != self._summary._cursor_position:
|
|
self._summary._cursor_position = new_position
|
|
self._appearance_changed_event.set()
|
|
|
|
def on_keypressed(self, urwid_screen):
|
|
if self._is_help_visible:
|
|
self._help_widget.on_keypressed()
|
|
self._appearance_changed_event.set()
|
|
return
|
|
events = urwid_screen.get_input()
|
|
for event in events:
|
|
if type(event) == tuple:
|
|
self.on_mouse_event(event)
|
|
continue
|
|
try:
|
|
action = self._key_map[event]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
action(self)
|
|
self._appearance_changed_event.set()
|
|
|
|
_STATUS_BAR = _highlight_chars(" *help *quit *d,*c,*j,*k:navigate *turn"
|
|
" *log *edit *next *watch *order *statuses",
|
|
Log.GREEN_STYLE)
|
|
|
|
@functools.lru_cache(maxsize=2)
|
|
def _get_status_bar_appearance(self, width, is_directory_sort,
|
|
is_watching_filesystem, progress_bar_size):
|
|
ordering_text = "directory" if is_directory_sort else "type "
|
|
watching_text = "watching" if is_watching_filesystem else "--------"
|
|
indicators = " %s order:%s " % (watching_text, ordering_text)
|
|
spacing = " " * (width - len(self._STATUS_BAR) - len(indicators))
|
|
bar = (self._STATUS_BAR[:width - len(indicators)] + spacing +
|
|
indicators)[:width]
|
|
return [bar[:progress_bar_size].underline() + bar[progress_bar_size:]]
|
|
|
|
def appearance(self, dimensions):
|
|
width, height = dimensions
|
|
if self._is_help_visible:
|
|
return self._help_widget.appearance(dimensions)
|
|
widget = self._summary.get_selection()
|
|
view = self._listing.widget.view
|
|
view.position = widget.scroll_position
|
|
view.widget = widget.result
|
|
tool_name = tools._tool_name_colored(widget.tool, widget.path)
|
|
self._listing.title = (
|
|
tools._path_colored(widget.path) + " ─── " + tool_name + " " +
|
|
status_to_str(widget.status, self._summary.is_status_simple))
|
|
incomplete = self._summary.result_total - self._summary.completed_total
|
|
progress_bar_size = max(0, width * incomplete //
|
|
self._summary.result_total)
|
|
status_bar_appearance = self._get_status_bar_appearance(
|
|
width, self._summary.is_directory_sort,
|
|
self._is_watching_filesystem, progress_bar_size)
|
|
return (self._layouts[self._is_log_visible][self._is_listing_portrait]
|
|
.appearance((width, height-len(status_bar_appearance))) +
|
|
status_bar_appearance)
|
|
|
|
_KEY_DATA = [
|
|
({"t"}, toggle_window_orientation), ({"l"}, toggle_log),
|
|
({"h"}, toggle_help), ({"d", "up"}, cursor_up),
|
|
({"c", "down"}, cursor_down), ({"j", "left"}, cursor_left),
|
|
({"k", "right"}, cursor_right), ({"D", "page up"}, listing_up),
|
|
({"C", "page down"}, listing_down), ({"J", "home"}, listing_left),
|
|
({"K", "end"}, listing_right), ({"o"}, toggle_sort),
|
|
({"n"}, move_to_next_issue), ({"N"}, move_to_next_issue_of_tool),
|
|
({"e"}, edit_file), ({"s"}, toggle_status_style),
|
|
({"w"}, toggle_watch_filesystem), ({"q"}, quit_)]
|
|
|
|
|
|
def get_cpu_temperature():
|
|
with open("/sys/class/thermal/thermal_zone0/temp", "r") as temp_file:
|
|
return int(temp_file.read()[:-4])
|
|
|
|
|
|
def regulate_temperature(log):
|
|
if get_cpu_temperature() >= 72:
|
|
log.log_message("The computer is too hot. Waiting to cool down...")
|
|
while get_cpu_temperature() > 66:
|
|
time.sleep(1)
|
|
log.log_message("The computer has cooled down. Continuing...")
|
|
|
|
|
|
class Runner:
|
|
|
|
def __init__(self):
|
|
self.result = None
|
|
self.is_running = True
|
|
|
|
def job_runner(self, summary, log, jobs_added_event,
|
|
appearance_changed_event):
|
|
while True:
|
|
jobs_added_event.wait()
|
|
while self.is_running:
|
|
# regulate_temperature(log) # My fan is broken
|
|
try:
|
|
self.result = summary.get_closest_placeholder()
|
|
except StopIteration:
|
|
log.log_message("All results are up to date.")
|
|
break
|
|
try:
|
|
self.result.run(log, appearance_changed_event)
|
|
summary.completed_total += 1
|
|
except EOFError: # Occurs if the process is terminated
|
|
pass
|
|
jobs_added_event.clear()
|
|
|
|
|
|
_UPDATE_THREAD_STOPPED = False
|
|
|
|
|
|
def update_screen(main_widget, appearance_changed_event):
|
|
while True:
|
|
appearance_changed_event.wait()
|
|
appearance_changed_event.clear()
|
|
if _UPDATE_THREAD_STOPPED:
|
|
break
|
|
fill3.patch_screen(main_widget)
|
|
|
|
|
|
def main(root_path, urwid_screen):
|
|
global _UPDATE_THREAD_STOPPED
|
|
os.chdir(root_path) # FIX: Don't change directory if possible.
|
|
loop = asyncio.get_event_loop()
|
|
jobs_added_event = threading.Event()
|
|
appearance_changed_event = threading.Event()
|
|
try:
|
|
pickle_path = os.path.join(_CACHE_PATH, ".summary.pickle")
|
|
with gzip.open(pickle_path, "rb") as file_:
|
|
screen = pickle.load(file_)
|
|
except FileNotFoundError:
|
|
summary = Summary(root_path, jobs_added_event)
|
|
log = Log(appearance_changed_event)
|
|
screen = Screen(summary, log, appearance_changed_event, loop)
|
|
else:
|
|
screen._appearance_changed_event = appearance_changed_event
|
|
screen._main_loop = loop
|
|
if screen._is_watching_filesystem:
|
|
screen.make_watch_manager()
|
|
summary = screen._summary
|
|
summary._lock = threading.Lock()
|
|
summary._jobs_added_event = jobs_added_event
|
|
log = screen._log
|
|
log._appearance_changed_event = appearance_changed_event
|
|
if screen._is_watching_filesystem:
|
|
summary.sync_with_filesystem()
|
|
log.log_message("Program started.")
|
|
jobs_added_event.set()
|
|
update_display_thread = threading.Thread(
|
|
target=update_screen, args=(screen, appearance_changed_event),
|
|
daemon=True)
|
|
update_display_thread.start()
|
|
loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
|
|
runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)]
|
|
screen.runners = runners
|
|
for runner in runners:
|
|
args = (summary, log, jobs_added_event, appearance_changed_event)
|
|
threading.Thread(target=runner.job_runner, args=args,
|
|
daemon=True).start()
|
|
|
|
def on_window_resize(n, frame):
|
|
appearance_changed_event.set()
|
|
signal.signal(signal.SIGWINCH, on_window_resize)
|
|
appearance_changed_event.set()
|
|
try:
|
|
loop.run_forever()
|
|
except KeyboardInterrupt:
|
|
log.log_message("Program stopped.")
|
|
_UPDATE_THREAD_STOPPED = True
|
|
appearance_changed_event.set()
|
|
update_display_thread.join()
|
|
for runner in runners:
|
|
runner.is_running = False
|
|
runner.result.reset()
|
|
# Cannot pickle generators, locks, sockets or events.
|
|
summary.closest_placeholder_generator = None
|
|
summary._lock = None
|
|
summary._jobs_added_event = None
|
|
screen._appearance_changed_event = None
|
|
screen._main_loop = None
|
|
screen._watch_manager = None
|
|
log._appearance_changed_event = None
|
|
open_compressed = functools.partial(gzip.open, compresslevel=1)
|
|
dump_pickle_safe(screen, pickle_path, open=open_compressed)
|
|
|
|
|
|
def manage_cache(root_path):
|
|
cache_path = os.path.join(root_path, _CACHE_PATH)
|
|
timestamp_path = os.path.join(cache_path, ".creation-time")
|
|
if os.path.exists(cache_path) and \
|
|
os.stat(__file__).st_mtime > os.stat(timestamp_path).st_mtime:
|
|
print("Vigil has been updated, so clearing the cache and"
|
|
" recalculating all results...")
|
|
shutil.rmtree(cache_path)
|
|
if not os.path.exists(cache_path):
|
|
os.mkdir(cache_path)
|
|
open(timestamp_path, "w").close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) == 2:
|
|
root_path = os.path.abspath(sys.argv[1])
|
|
with terminal.console_title("vigil: " + os.path.basename(root_path)):
|
|
manage_cache(root_path)
|
|
with terminal.hidden_cursor():
|
|
with terminal.urwid_screen() as urwid_screen:
|
|
main(root_path, urwid_screen)
|
|
else:
|
|
usage = __doc__.replace("*", "")
|
|
print(usage)
|