Coding style.

Moved Runner class into worker.py and renamed it to Worker.
This commit is contained in:
Andrew Hamilton 2016-03-10 00:36:59 +00:00
parent 4fa5b524d4
commit ccdd89fa28
3 changed files with 98 additions and 94 deletions

107
vigil
View file

@ -70,7 +70,6 @@ import time
import traceback
import docopt
import psutil
import pyinotify
import urwid
import urwid.raw_display
@ -704,11 +703,11 @@ class Screen:
self._log.log_command("Paused workers." if self._is_paused else
"Running workers...")
if self._is_paused:
for runner in self.runners:
runner.pause()
for worker in self.workers:
worker.pause()
else:
for runner in self.runners:
runner.continue_()
for worker in self.workers:
worker.continue_()
def quit_(self):
os.kill(os.getpid(), signal.SIGINT)
@ -835,82 +834,6 @@ def _regulate_temperature(log):
log.log_message("The computer has cooled down. Continuing...")
def _make_process_nicest(pid):
process = psutil.Process(pid)
process.nice(19)
process.ionice(psutil.IOPRIO_CLASS_IDLE)
class Runner:
def __init__(self, sandbox, is_already_paused, is_being_tested):
self.sandbox = sandbox
self.is_already_paused = is_already_paused
self.is_being_tested = is_being_tested
self.result = None
self.process = None
self.child_pid = None
@asyncio.coroutine
def create_process(self):
if self.sandbox is None:
command = [worker.__file__]
else:
cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH)
cache_mount = self.sandbox.mount_point + cache_path
subprocess.check_call(["sudo", "mount", "--bind", cache_path,
cache_mount])
command = self.sandbox.command([worker.__file__])
create = asyncio.create_subprocess_exec(
*command, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
self.process = yield from create
pid_line = yield from self.process.stdout.readline()
self.child_pid = int(pid_line.strip())
@asyncio.coroutine
def run_tool(self, path, tool):
self.process.stdin.write(("%s\n%s\n" %
(tool.__qualname__, path)).encode("utf-8"))
data = yield from self.process.stdout.readline()
return tools.Status(int(data))
@asyncio.coroutine
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
yield from self.create_process()
_make_process_nicest(self.child_pid)
while True:
yield from jobs_added_event.wait()
while True:
# _regulate_temperature(log) # My fan is broken
try:
self.result = summary.get_closest_placeholder()
except StopIteration:
self.result = None
if summary.result_total == summary.completed_total:
log.log_message("All results are up to date.")
if self.is_being_tested:
os.kill(os.getpid(), signal.SIGINT)
break
yield from self.result.run(log, appearance_changed_event,
self)
summary.completed_total += 1
jobs_added_event.clear()
def pause(self):
if self.result is not None and \
self.result.status == tools.Status.running:
os.kill(self.child_pid, signal.SIGSTOP)
self.result.set_status(tools.Status.paused)
def continue_(self):
if self.result is not None and \
self.result.status == tools.Status.paused:
self.result.set_status(tools.Status.running)
os.kill(self.child_pid, signal.SIGCONT)
def _add_watch_manager_to_mainloop(root_path, mainloop, on_filesystem_change,
exclude_filter):
watch_manager = pyinotify.WatchManager()
@ -993,7 +916,7 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True,
appearance_changed_event.set()
watch_manager_fd = _add_watch_manager_to_mainloop(
root_path, loop, on_filesystem_change, _is_path_excluded)
screen.runners = runners = []
screen.workers = workers = []
if is_sandboxed:
sandbox_temp_dir = tempfile.mkdtemp()
sandbox = sandbox_fs.SandboxFs(sandbox_temp_dir)
@ -1008,11 +931,11 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True,
log.log_message("Running without the filesystem sandbox...")
log.log_message("Starting workers (%s) ..." % worker_count)
for index in range(worker_count):
runner = Runner(sandbox, screen._is_paused, is_being_tested)
runners.append(runner)
future = runner.job_runner(
worker_ = worker.Worker(sandbox, screen._is_paused, is_being_tested)
workers.append(worker_)
future = worker_.job_runner(
summary, log, jobs_added_event, appearance_changed_event)
runner.future = asyncio.ensure_future(future, loop=loop)
worker_.future = asyncio.ensure_future(future, loop=loop)
def on_window_resize():
appearance_changed_event.set()
@ -1024,11 +947,11 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True,
def exit_loop():
log.log_command("Exiting...")
time.sleep(0.05)
for runner in runners:
runner.pause()
runner.future.cancel()
if runner.result is not None:
runner.result.reset()
for worker in workers:
worker.pause()
worker.future.cancel()
if worker.result is not None:
worker.result.reset()
loop.stop()
loop.add_signal_handler(signal.SIGWINCH, on_window_resize)
loop.add_signal_handler(signal.SIGINT, exit_loop)
@ -1051,7 +974,7 @@ def main(root_path, loop, worker_count=None, is_sandboxed=True,
# Cannot pickle generators, locks, sockets or events.
(summary.closest_placeholder_generator, summary._lock,
summary._jobs_added_event, screen._appearance_changed_event,
screen._main_loop, screen.runners,
screen._main_loop, screen.workers,
log._appearance_changed_event) = [None] * 7
open_compressed = functools.partial(gzip.open, compresslevel=1)
tools.dump_pickle_safe(screen, pickle_path, open=open_compressed)

View file

@ -3,11 +3,92 @@
# Copyright (C) 2015-2016 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import asyncio
import os
import signal
import subprocess
import psutil
import tools
def _make_process_nicest(pid):
process = psutil.Process(pid)
process.nice(19)
process.ionice(psutil.IOPRIO_CLASS_IDLE)
class Worker:
def __init__(self, sandbox, is_already_paused, is_being_tested):
self.sandbox = sandbox
self.is_already_paused = is_already_paused
self.is_being_tested = is_being_tested
self.result = None
self.process = None
self.child_pid = None
@asyncio.coroutine
def create_process(self):
if self.sandbox is None:
command = [__file__]
else:
cache_path = os.path.join(os.getcwd(), tools.CACHE_PATH)
cache_mount = self.sandbox.mount_point + cache_path
subprocess.check_call(["sudo", "mount", "--bind", cache_path,
cache_mount])
command = self.sandbox.command([__file__])
create = asyncio.create_subprocess_exec(
*command, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
self.process = yield from create
pid_line = yield from self.process.stdout.readline()
self.child_pid = int(pid_line.strip())
@asyncio.coroutine
def run_tool(self, path, tool):
self.process.stdin.write(("%s\n%s\n" %
(tool.__qualname__, path)).encode("utf-8"))
data = yield from self.process.stdout.readline()
return tools.Status(int(data))
@asyncio.coroutine
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
yield from self.create_process()
_make_process_nicest(self.child_pid)
while True:
yield from jobs_added_event.wait()
while True:
# _regulate_temperature(log) # My fan is broken
try:
self.result = summary.get_closest_placeholder()
except StopIteration:
self.result = None
if summary.result_total == summary.completed_total:
log.log_message("All results are up to date.")
if self.is_being_tested:
os.kill(os.getpid(), signal.SIGINT)
break
yield from self.result.run(log, appearance_changed_event,
self)
summary.completed_total += 1
jobs_added_event.clear()
def pause(self):
if self.result is not None and \
self.result.status == tools.Status.running:
os.kill(self.child_pid, signal.SIGSTOP)
self.result.set_status(tools.Status.paused)
def continue_(self):
if self.result is not None and \
self.result.status == tools.Status.paused:
self.result.set_status(tools.Status.running)
os.kill(self.child_pid, signal.SIGCONT)
def main():
print(os.getpid(), flush=True)
while True:

View file

@ -12,7 +12,7 @@ import unittest
import sandbox_fs
import tools
import vigil
import worker
class WorkerTestCase(unittest.TestCase):
@ -30,7 +30,7 @@ class WorkerTestCase(unittest.TestCase):
def _test_worker(self, sandbox):
loop = asyncio.get_event_loop()
worker_ = vigil.Runner(sandbox, False, False)
worker_ = worker.Worker(sandbox, False, False)
loop.run_until_complete(worker_.create_process())
future = worker_.run_tool("foo", tools.metadata)
status = loop.run_until_complete(future)