diff --git a/install-dependencies b/install-dependencies index bdcc6e7..920c548 100755 --- a/install-dependencies +++ b/install-dependencies @@ -6,7 +6,7 @@ set -e echo "Install the dependencies of the vigil script..." sudo apt-get --yes install python3-minimal python3-pygments python3-pyinotify \ - python3-urwid + python3-urwid python3-psutil echo echo "Install all the tools vigil may need..." ./install-tools diff --git a/vigil b/vigil index a8ecd97..a4904ae 100755 --- a/vigil +++ b/vigil @@ -55,6 +55,7 @@ import threading import time import traceback +import psutil import pyinotify import fill3 @@ -107,26 +108,12 @@ def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL, try: with open(tmp_path, "wb") as file_: pickle.dump(object_, file_, protocol=protocol) - except OSError: + except (OSError, KeyboardInterrupt): os.remove(tmp_path) else: os.rename(tmp_path, path) -def multiprocessing_process(func, *args, **kwargs): - def wrapper(child_conn, func, args, **kwargs): - result = func(*args, **kwargs) - child_conn.send(result) - child_conn.close() - parent_conn, child_conn = multiprocessing.Pipe() - process = multiprocessing.Process( - target=wrapper, args=(child_conn, func, args), kwargs=kwargs, - daemon=True) - process.start() - process.result_conn = parent_conn - return process - - def status_to_str(status, is_status_simple): if isinstance(status, int): # is a status enumeration dict_ = (tools._STATUS_TO_TERMSTR_SIMPLE if is_status_simple @@ -177,18 +164,15 @@ class Result: appearance_changed_event.set() self.entry.appearance_cache = None - def run(self, log, appearance_changed_event): + def run(self, log, appearance_changed_event, runner): self.is_placeholder = False tool_name = tools._tool_name_colored(self.tool, self.path) path_colored = tools._path_colored(self.path) log.log_message(["Running ", tool_name, " on ", path_colored, "."]) self.set_status(tools.Status.running, appearance_changed_event) start_time = time.time() - self.process = multiprocessing_process( - tools.run_tool_no_error, self.path, self.tool) - new_status, result = self.process.result_conn.recv() - self.status, self.result = new_status, result - self.process = None + new_status = runner.worker.run_tool(self.path, self.tool) + Result.result.fget.evict(self) end_time = time.time() self.set_status(new_status, appearance_changed_event) self.is_completed = True @@ -200,11 +184,6 @@ class Result: def reset(self): self.is_placeholder = True self.status = tools.Status.empty - try: - self.process.terminate() - except AttributeError: - pass - self.process = None def appearance_min(self): return [status_to_str(self.status, @@ -860,11 +839,49 @@ def regulate_temperature(log): log.log_message("The computer has cooled down. Continuing...") +def make_process_nicest(pid): + process = psutil.Process(pid) + process.nice(19) + process.ionice(psutil.IOPRIO_CLASS_IDLE) + + +class _Result(Result): + + def __del__(self): + pass + + +def work_loop(child_connection): + while True: + tool, path = child_connection.recv() + result = _Result(path, tool) + status, result.result = tools.run_tool_no_error(path, tool) + child_connection.send(status) + + +class Worker: + + def __init__(self): + self.parent_connection, child_connection = multiprocessing.Pipe() + self.process = multiprocessing.Process( + target=work_loop, args=(child_connection,), daemon=True) + make_process_nicest(self.process.pid) + self.process.start() + + def run_tool(self, path, tool): + self.parent_connection.send([tool, path]) + return self.parent_connection.recv() + + def stop(self): + os.kill(self.process.pid, signal.SIGKILL) + + class Runner: def __init__(self): self.result = None self.is_running = True + self.worker = Worker() def job_runner(self, summary, log, jobs_added_event, appearance_changed_event): @@ -878,7 +895,7 @@ class Runner: log.log_message("All results are up to date.") break try: - self.result.run(log, appearance_changed_event) + self.result.run(log, appearance_changed_event, self) summary.completed_total += 1 except EOFError: # Occurs if the process is terminated pass @@ -931,7 +948,6 @@ def main(root_path, urwid_screen): update_display_thread.start() loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen) runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)] - screen.runners = runners for runner in runners: args = (summary, log, jobs_added_event, appearance_changed_event) threading.Thread(target=runner.job_runner, args=args, @@ -949,7 +965,9 @@ def main(root_path, urwid_screen): appearance_changed_event.set() update_display_thread.join() for runner in runners: + runner.worker.stop() runner.is_running = False + for runner in runners: runner.result.reset() # Cannot pickle generators, locks, sockets or events. summary.closest_placeholder_generator = None diff --git a/vigil_test.py b/vigil_test.py index ef32b84..6205ff5 100755 --- a/vigil_test.py +++ b/vigil_test.py @@ -38,17 +38,6 @@ class LruCacheWithEvictionTestCase(unittest.TestCase): self._assert_cache(a, 1, 2, 2) -class MultiprocessingWrapperTestCase(unittest.TestCase): - - def test_multiprocessing_wrapper(self): - def a(b, c, d=1): - return b + c + d - process = vigil.multiprocessing_process(a, 1, 2) - result = process.result_conn.recv() - process.join() - self.assertEqual(result, 4) - - _DIMENSIONS = (40, 40)