From ccdd89fa2851b08efce3942e393e02894647dd15 Mon Sep 17 00:00:00 2001 From: Andrew Hamilton Date: Thu, 10 Mar 2016 00:36:59 +0000 Subject: [PATCH] Coding style. Moved Runner class into worker.py and renamed it to Worker. --- vigil | 107 +++++++------------------------------------------ worker.py | 81 +++++++++++++++++++++++++++++++++++++ worker_test.py | 4 +- 3 files changed, 98 insertions(+), 94 deletions(-) diff --git a/vigil b/vigil index eb46083..ae91b35 100755 --- a/vigil +++ b/vigil @@ -70,7 +70,6 @@ import time import traceback import docopt -import psutil import pyinotify import urwid import urwid.raw_display @@ -704,11 +703,11 @@ class Screen: self._log.log_command("Paused workers." if self._is_paused else "Running workers...") if self._is_paused: - for runner in self.runners: - runner.pause() + for worker in self.workers: + worker.pause() else: - for runner in self.runners: - runner.continue_() + for worker in self.workers: + worker.continue_() def quit_(self): os.kill(os.getpid(), signal.SIGINT) @@ -835,82 +834,6 @@ def _regulate_temperature(log): log.log_message("The computer has cooled down. Continuing...") -def _make_process_nicest(pid): - process = psutil.Process(pid) - process.nice(19) - process.ionice(psutil.IOPRIO_CLASS_IDLE) - - -class Runner: - - def __init__(self, sandbox, is_already_paused, is_being_tested): - self.sandbox = sandbox - self.is_already_paused = is_already_paused - self.is_being_tested = is_being_tested - self.result = None - self.process = None - self.child_pid = None - - @asyncio.coroutine - def create_process(self): - if self.sandbox is None: - command = [worker.__file__] - else: - cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH) - cache_mount = self.sandbox.mount_point + cache_path - subprocess.check_call(["sudo", "mount", "--bind", cache_path, - cache_mount]) - command = self.sandbox.command([worker.__file__]) - create = asyncio.create_subprocess_exec( - *command, stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - self.process = yield from create - pid_line = yield from self.process.stdout.readline() - self.child_pid = int(pid_line.strip()) - - @asyncio.coroutine - def run_tool(self, path, tool): - self.process.stdin.write(("%s\n%s\n" % - (tool.__qualname__, path)).encode("utf-8")) - data = yield from self.process.stdout.readline() - return tools.Status(int(data)) - - @asyncio.coroutine - def job_runner(self, summary, log, jobs_added_event, - appearance_changed_event): - yield from self.create_process() - _make_process_nicest(self.child_pid) - while True: - yield from jobs_added_event.wait() - while True: - # _regulate_temperature(log) # My fan is broken - try: - self.result = summary.get_closest_placeholder() - except StopIteration: - self.result = None - if summary.result_total == summary.completed_total: - log.log_message("All results are up to date.") - if self.is_being_tested: - os.kill(os.getpid(), signal.SIGINT) - break - yield from self.result.run(log, appearance_changed_event, - self) - summary.completed_total += 1 - jobs_added_event.clear() - - def pause(self): - if self.result is not None and \ - self.result.status == tools.Status.running: - os.kill(self.child_pid, signal.SIGSTOP) - self.result.set_status(tools.Status.paused) - - def continue_(self): - if self.result is not None and \ - self.result.status == tools.Status.paused: - self.result.set_status(tools.Status.running) - os.kill(self.child_pid, signal.SIGCONT) - - def _add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change, exclude_filter): watch_manager = pyinotify.WatchManager() @@ -993,7 +916,7 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True, appearance_changed_event.set() watch_manager_fd = _add_watch_manager_to_mainloop( root_path, loop, on_filesystem_change, _is_path_excluded) - screen.runners = runners = [] + screen.workers = workers = [] if is_sandboxed: sandbox_temp_dir = tempfile.mkdtemp() sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir) @@ -1008,11 +931,11 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True, log.log_message("Running without the filesystem sandbox...") log.log_message("Starting workers (%s) ..." % worker_count) for index in range(worker_count): - runner = Runner(sandbox, screen._is_paused, is_being_tested) - runners.append(runner) - future = runner.job_runner( + worker_ = worker.Worker(sandbox, screen._is_paused, is_being_tested) + workers.append(worker_) + future = worker_.job_runner( summary, log, jobs_added_event, appearance_changed_event) - runner.future = asyncio.ensure_future(future, loop=loop) + worker_.future = asyncio.ensure_future(future, loop=loop) def on_window_resize(): appearance_changed_event.set() @@ -1024,11 +947,11 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True, def exit_loop(): log.log_command("Exiting...") time.sleep(0.05) - for runner in runners: - runner.pause() - runner.future.cancel() - if runner.result is not None: - runner.result.reset() + for worker in workers: + worker.pause() + worker.future.cancel() + if worker.result is not None: + worker.result.reset() loop.stop() loop.add_signal_handler(signal.SIGWINCH, on_window_resize) loop.add_signal_handler(signal.SIGINT, exit_loop) @@ -1051,7 +974,7 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True, # Cannot pickle generators, locks, sockets or events. (summary.closest_placeholder_generator, summary._lock, summary._jobs_added_event, screen._appearance_changed_event, - screen._main_loop, screen.runners, + screen._main_loop, screen.workers, log._appearance_changed_event) = [None] * 7 open_compressed = functools.partial(gzip.open, compresslevel=1) tools.dump_pickle_safe(screen, pickle_path, open=open_compressed) diff --git a/worker.py b/worker.py index cd59d57..659b920 100755 --- a/worker.py +++ b/worker.py @@ -3,11 +3,92 @@ # Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved. # Licensed under the Artistic License 2.0. +import asyncio import os +import signal +import subprocess + +import psutil import tools +def _make_process_nicest(pid): + process = psutil.Process(pid) + process.nice(19) + process.ionice(psutil.IOPRIO_CLASS_IDLE) + + +class Worker: + + def __init__(self, sandbox, is_already_paused, is_being_tested): + self.sandbox = sandbox + self.is_already_paused = is_already_paused + self.is_being_tested = is_being_tested + self.result = None + self.process = None + self.child_pid = None + + @asyncio.coroutine + def create_process(self): + if self.sandbox is None: + command = [__file__] + else: + cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH) + cache_mount = self.sandbox.mount_point + cache_path + subprocess.check_call(["sudo", "mount", "--bind", cache_path, + cache_mount]) + command = self.sandbox.command([__file__]) + create = asyncio.create_subprocess_exec( + *command, stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + self.process = yield from create + pid_line = yield from self.process.stdout.readline() + self.child_pid = int(pid_line.strip()) + + @asyncio.coroutine + def run_tool(self, path, tool): + self.process.stdin.write(("%s\n%s\n" % + (tool.__qualname__, path)).encode("utf-8")) + data = yield from self.process.stdout.readline() + return tools.Status(int(data)) + + @asyncio.coroutine + def job_runner(self, summary, log, jobs_added_event, + appearance_changed_event): + yield from self.create_process() + _make_process_nicest(self.child_pid) + while True: + yield from jobs_added_event.wait() + while True: + # _regulate_temperature(log) # My fan is broken + try: + self.result = summary.get_closest_placeholder() + except StopIteration: + self.result = None + if summary.result_total == summary.completed_total: + log.log_message("All results are up to date.") + if self.is_being_tested: + os.kill(os.getpid(), signal.SIGINT) + break + yield from self.result.run(log, appearance_changed_event, + self) + summary.completed_total += 1 + jobs_added_event.clear() + + def pause(self): + if self.result is not None and \ + self.result.status == tools.Status.running: + os.kill(self.child_pid, signal.SIGSTOP) + self.result.set_status(tools.Status.paused) + + def continue_(self): + if self.result is not None and \ + self.result.status == tools.Status.paused: + self.result.set_status(tools.Status.running) + os.kill(self.child_pid, signal.SIGCONT) + + def main(): print(os.getpid(), flush=True) while True: diff --git a/worker_test.py b/worker_test.py index 5970309..00578cd 100755 --- a/worker_test.py +++ b/worker_test.py @@ -12,7 +12,7 @@ import unittest import sandbox_fs import tools -import vigil +import worker class WorkerTestCase(unittest.TestCase): @@ -30,7 +30,7 @@ class WorkerTestCase(unittest.TestCase): def _test_worker(self, sandbox): loop = asyncio.get_event_loop() - worker_ = vigil.Runner(sandbox, False, False) + worker_ = worker.Worker(sandbox, False, False) loop.run_until_complete(worker_.create_process()) future = worker_.run_tool("foo", tools.metadata) status = loop.run_until_complete(future)