Run all tools in a filesystem sandbox

This commit is contained in:
Andrew Hamilton 2016-01-21 23:22:42 +00:00
parent 4d0a03cc16
commit 721cc28d03
11 changed files with 426 additions and 151 deletions

217
vigil
View file

@ -41,8 +41,8 @@ Keys:
import asyncio
import collections
import contextlib
import functools
import gc
import gzip
import importlib
import multiprocessing
@ -52,14 +52,15 @@ import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import psutil
import pyinotify
import fill3
import sandbox_fs
import terminal
import termstr
import tools
@ -134,13 +135,8 @@ class Result:
path + "-" + tool.__name__)
self.scroll_position = (0, 0)
self.is_completed = False
self.reset()
def __del__(self):
try:
os.remove(self.pickle_path)
except FileNotFoundError:
pass
self.is_placeholder = True
self.status = tools.Status.empty
@property
@lru_cache_with_eviction(maxsize=50)
@ -160,22 +156,23 @@ class Result:
dump_pickle_safe(value, self.pickle_path, open=self._open_func)
Result.result.fget.evict(self)
def set_status(self, status, appearance_changed_event):
def set_status(self, status):
self.status = status
appearance_changed_event.set()
self.entry.appearance_cache = None
def run(self, log, appearance_changed_event, worker):
self.is_placeholder = False
tool_name = tools._tool_name_colored(self.tool, self.path)
path_colored = tools._path_colored(self.path)
log.log_message(["Running ", tool_name, " on ", path_colored, "."])
self.set_status(tools.Status.running, appearance_changed_event)
log.log_message(["Running ", tool_name, " on ", path_colored, "..."])
self.set_status(tools.Status.running)
appearance_changed_event.set()
start_time = time.time()
new_status = worker.run_tool(self.path, self.tool)
Result.result.fget.evict(self)
end_time = time.time()
self.set_status(new_status, appearance_changed_event)
self.set_status(new_status)
appearance_changed_event.set()
self.is_completed = True
log.log_message(
["Finished running ", tool_name, " on ", path_colored, ". ",
@ -184,13 +181,16 @@ class Result:
def reset(self):
self.is_placeholder = True
self.status = tools.Status.empty
self.set_status(tools.Status.empty)
def appearance_min(self):
return [status_to_str(self.status,
self.entry.summary.is_status_simple)]
import worker # Avoid a circular import. worker.py needs the Result class.
def reverse_style(style):
    """Return a new CharStyle with *style*'s foreground and background
    colors swapped; bold and underline are preserved."""
    swapped_colors = (style.bg_color, style.fg_color)
    return termstr.CharStyle(*swapped_colors,
                             style.is_bold, style.is_underlined)
@ -276,6 +276,7 @@ class Summary:
self.is_directory_sort = True
self._max_width = None
self._max_path_length = None
self._all_results = set()
self.sync_with_filesystem()
@property
@ -309,6 +310,7 @@ class Summary:
new_cursor_position = (0, 0)
row_index = 0
result_total, completed_total = 0, 0
all_results = set()
for path in paths:
full_path = os.path.join(self._root_path, path)
try:
@ -326,6 +328,7 @@ class Summary:
else:
result = Result(path, tool)
jobs_added = True
all_results.add(result)
if result.is_completed:
completed_total += 1
new_cache[cache_key] = result
@ -335,16 +338,17 @@ class Summary:
result_total += len(row)
max_width = max(len(row) for row in new_column)
max_path_length = max(len(path) for path in paths) - len("./")
deleted_results = self._all_results - all_results
self._column, self._cache, self._cursor_position, self.result_total, \
self.completed_total, self._max_width, self._max_path_length, \
self.closest_placeholder_generator = (
self.closest_placeholder_generator, self._all_results = (
new_column, new_cache, new_cursor_position, result_total,
completed_total, max_width, max_path_length, None)
completed_total, max_width, max_path_length, None, all_results)
if jobs_added:
self._jobs_added_event.set()
# Delete the stale results from the disk now, to avoid accidentally
# deleting a future result with the same filename. See Result.__del__.
gc.collect()
for result in deleted_results:
with contextlib.suppress(FileNotFoundError):
os.remove(result.pickle_path)
def placeholder_spiral(self):
x, y = self.cursor_position()
@ -440,10 +444,8 @@ class Summary:
yield result, (index_x, row_index)
def move_to_next_issue(self):
try:
with contextlib.suppress(StopIteration):
issue, self._cursor_position = self._issue_generator().send(None)
except StopIteration:
pass
def move_to_next_issue_of_tool(self):
current_tool = self.get_selection().tool
@ -707,7 +709,7 @@ class Screen:
def toggle_watch_filesystem(self):
self._is_watching_filesystem = not self._is_watching_filesystem
self._log.log_command("Watching the filesystem for changes."
self._log.log_command("Watching the filesystem for changes..."
if self._is_watching_filesystem else
"Stopped watching the filesystem.")
if self._is_watching_filesystem:
@ -720,7 +722,7 @@ class Screen:
def toggle_pause(self):
self._is_paused = not self._is_paused
self._log.log_command("Paused work." if self._is_paused else
"Continuing work.")
"Continuing work...")
if self._is_paused:
for runner in self.runners:
runner.pause()
@ -849,72 +851,29 @@ def regulate_temperature(log):
log.log_message("The computer has cooled down. Continuing...")
def make_process_nicest(pid):
    """Drop process *pid* to the lowest CPU and I/O scheduling priority."""
    target = psutil.Process(pid)
    target.nice(19)  # 19 is the weakest Unix niceness value.
    target.ionice(psutil.IOPRIO_CLASS_IDLE)
class _Result(Result):
    """Result variant for use inside the worker process.

    Result.__del__ removes the pickle file at self.pickle_path; making
    __del__ a no-op here keeps worker-side instances from deleting a
    file the parent process still owns.
    """
    def __del__(self):
        pass
def work_loop(parent_connection):
    """Serve tool-run requests from the parent process, forever.

    Receives (tool, path) pairs over *parent_connection*, runs the tool
    on the path, and replies with the resulting status.  The result
    payload is pickled as a side effect of assigning ``result.result``.
    """
    while True:
        requested_tool, target_path = parent_connection.recv()
        outcome = _Result(target_path, requested_tool)
        status, outcome.result = tools.run_tool_no_error(
            target_path, requested_tool)
        parent_connection.send(status)
class Worker:
    """Runs tools in a separate, low-priority child process.

    Requests are sent over a multiprocessing pipe to work_loop running
    in the child; pause/continue_/stop control the child with signals.
    """
    def __init__(self):
        self.child_connection, parent_connection = multiprocessing.Pipe()
        self.process = multiprocessing.Process(
            target=work_loop, args=(parent_connection,), daemon=True)
        self.process.start()
        # Renice only after start(): Process.pid is None before start(),
        # and psutil.Process(None) would renice the *parent* process
        # instead of the child.
        make_process_nicest(self.process.pid)

    def run_tool(self, path, tool):
        """Run *tool* on *path* in the child and return its status.

        Blocks until the child replies.
        """
        self.child_connection.send([tool, path])
        return self.child_connection.recv()

    def pause(self):
        """Suspend the child process."""
        os.kill(self.process.pid, signal.SIGSTOP)

    def continue_(self):
        """Resume a paused child process."""
        os.kill(self.process.pid, signal.SIGCONT)

    def stop(self):
        """Kill the child process immediately."""
        os.kill(self.process.pid, signal.SIGKILL)
class Runner:
def __init__(self):
def __init__(self, sandbox, is_being_tested):
self.result = None
self.is_running = True
self.worker = Worker()
self.worker = worker.Worker(sandbox)
self.is_being_tested = is_being_tested
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
while True:
jobs_added_event.wait()
while self.is_running:
while True:
# regulate_temperature(log) # My fan is broken
try:
self.result = summary.get_closest_placeholder()
except StopIteration:
log.log_message("All results are up to date.")
break
try:
with contextlib.suppress(ValueError): # Process was terminated
self.result.run(log, appearance_changed_event, self.worker)
summary.completed_total += 1
except EOFError: # Occurs if the process is terminated
pass
if self.is_being_tested:
os.kill(os.getpid(), signal.SIGINT)
jobs_added_event.clear()
def pause(self):
@ -936,9 +895,8 @@ def update_screen(main_widget, appearance_changed_event):
fill3.patch_screen(main_widget)
def main(root_path):
def main(root_path, is_being_tested=False):
global _UPDATE_THREAD_STOPPED
os.chdir(root_path) # FIX: Don't change directory if possible.
loop = asyncio.get_event_loop()
jobs_added_event = threading.Event()
appearance_changed_event = threading.Event()
@ -964,46 +922,72 @@ def main(root_path):
summary.sync_with_filesystem()
log.log_message("Program started.")
jobs_added_event.set()
runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)]
screen.runners = runners
for runner in runners:
args = (summary, log, jobs_added_event, appearance_changed_event)
threading.Thread(target=runner.job_runner, args=args,
daemon=True).start()
if screen._is_paused:
runners = []
sandbox_temp_dir = tempfile.mkdtemp()
sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir)
def start_runners():
log.log_message("Making filesystem sandbox...")
sandbox.mount()
log.log_message("Sandbox made.")
log.log_message("Starting workers...")
worker_total = multiprocessing.cpu_count() * 2
for index in range(worker_total):
runners.append(Runner(sandbox, is_being_tested))
screen.runners = runners
log.log_message("Workers started. (%s)" % worker_total)
for runner in runners:
args = (summary, log, jobs_added_event, appearance_changed_event)
threading.Thread(target=runner.job_runner, args=args,
daemon=True).start()
if screen._is_paused:
for runner in runners:
runner.pause()
try:
threading.Thread(target=start_runners, daemon=True).start()
def on_window_resize(n, frame):
appearance_changed_event.set()
appearance_changed_event.set()
update_display_thread = threading.Thread(
target=update_screen, args=(screen, appearance_changed_event),
daemon=True)
with terminal.hidden_cursor():
with terminal.urwid_screen() as urwid_screen:
loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
update_display_thread.start()
signal.signal(signal.SIGWINCH, on_window_resize)
try:
loop.run_forever()
except KeyboardInterrupt:
_UPDATE_THREAD_STOPPED = True
appearance_changed_event.set()
update_display_thread.join()
log.log_message("Program stopped.")
for runner in runners:
runner.pause()
if runner.result is not None:
runner.result.reset()
# Cannot pickle generators, locks, sockets or events.
(summary.closest_placeholder_generator, summary._lock,
summary._jobs_added_event, screen._appearance_changed_event,
screen._main_loop, screen._watch_manager, screen.runners,
log._appearance_changed_event) = [None] * 8
open_compressed = functools.partial(gzip.open, compresslevel=1)
dump_pickle_safe(screen, pickle_path, open=open_compressed)
finally:
sandbox.umount()
os.rmdir(sandbox_temp_dir)
def on_window_resize(n, frame):
appearance_changed_event.set()
appearance_changed_event.set()
update_display_thread = threading.Thread(
target=update_screen, args=(screen, appearance_changed_event),
daemon=True)
with terminal.hidden_cursor():
with terminal.urwid_screen() as urwid_screen:
signal.signal(signal.SIGWINCH, on_window_resize)
update_display_thread.start()
loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
try:
loop.run_forever()
except KeyboardInterrupt:
log.log_message("Program stopped.")
_UPDATE_THREAD_STOPPED = True
appearance_changed_event.set()
update_display_thread.join()
for runner in runners:
runner.worker.stop()
runner.is_running = False
for runner in runners:
runner.result.reset()
# Cannot pickle generators, locks, sockets or events.
(summary.closest_placeholder_generator, summary._lock,
summary._jobs_added_event, screen._appearance_changed_event,
screen._main_loop, screen._watch_manager, screen.runners,
log._appearance_changed_event) = [None] * 8
open_compressed = functools.partial(gzip.open, compresslevel=1)
dump_pickle_safe(screen, pickle_path, open=open_compressed)
@contextlib.contextmanager
def chdir(path):
    """Context manager: run the body with *path* as the working
    directory, restoring the previous directory afterwards (even on
    error)."""
    previous_directory = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous_directory)
def manage_cache(root_path):
@ -1021,10 +1005,13 @@ def manage_cache(root_path):
if __name__ == "__main__":
if len(sys.argv) == 2:
subprocess.call(["sudo", "-p", "Vigil needs sudo to create the filesy"
"stem sandbox... [sudo] password for %u: ", "true"])
root_path = os.path.abspath(sys.argv[1])
with terminal.console_title("vigil: " + os.path.basename(root_path)):
manage_cache(root_path)
main(root_path)
with chdir(root_path): # FIX: Don't change directory if possible.
main(root_path)
else:
usage = __doc__.replace("*", "")
print(usage)