Run all jobs inside long lived worker processes

- There is less forking: forking happens once per worker process instead of once per job.
- Nice and ionice are applied to the worker processes and hence all jobs run
  by them.
- This allows jobs to benefit from caches filled by previous jobs. For example,
  many tools applied to a Python file need to calculate whether it's Python 2 or Python 3.
This commit is contained in:
Andrew Hamilton 2015-12-31 15:07:30 +00:00
parent 39de82fb45
commit 667dd8ffa5
3 changed files with 47 additions and 40 deletions

View file

@ -6,7 +6,7 @@ set -e
echo "Install the dependencies of the vigil script..."
sudo apt-get --yes install python3-minimal python3-pygments python3-pyinotify \
python3-urwid
python3-urwid python3-psutil
echo
echo "Install all the tools vigil may need..."
./install-tools

74
vigil
View file

@ -55,6 +55,7 @@ import threading
import time
import traceback
import psutil
import pyinotify
import fill3
@ -107,26 +108,12 @@ def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
try:
with open(tmp_path, "wb") as file_:
pickle.dump(object_, file_, protocol=protocol)
except OSError:
except (OSError, KeyboardInterrupt):
os.remove(tmp_path)
else:
os.rename(tmp_path, path)
def _send_result(connection, func, args, **kwargs):
    # Runs in the child process: compute the result, ship it back to the
    # parent over the pipe, then release the child's end of the pipe.
    connection.send(func(*args, **kwargs))
    connection.close()


def multiprocessing_process(func, *args, **kwargs):
    """Start a daemon process running func(*args, **kwargs).

    Returns the multiprocessing.Process with one extra attribute,
    result_conn: a Connection from which the caller can recv() the
    function's return value.
    """
    receive_end, send_end = multiprocessing.Pipe()
    process = multiprocessing.Process(
        target=_send_result, args=(send_end, func, args), kwargs=kwargs,
        daemon=True)
    process.start()
    process.result_conn = receive_end
    return process
def status_to_str(status, is_status_simple):
if isinstance(status, int): # is a status enumeration
dict_ = (tools._STATUS_TO_TERMSTR_SIMPLE if is_status_simple
@ -177,18 +164,15 @@ class Result:
appearance_changed_event.set()
self.entry.appearance_cache = None
def run(self, log, appearance_changed_event):
def run(self, log, appearance_changed_event, runner):
self.is_placeholder = False
tool_name = tools._tool_name_colored(self.tool, self.path)
path_colored = tools._path_colored(self.path)
log.log_message(["Running ", tool_name, " on ", path_colored, "."])
self.set_status(tools.Status.running, appearance_changed_event)
start_time = time.time()
self.process = multiprocessing_process(
tools.run_tool_no_error, self.path, self.tool)
new_status, result = self.process.result_conn.recv()
self.status, self.result = new_status, result
self.process = None
new_status = runner.worker.run_tool(self.path, self.tool)
Result.result.fget.evict(self)
end_time = time.time()
self.set_status(new_status, appearance_changed_event)
self.is_completed = True
@ -200,11 +184,6 @@ class Result:
def reset(self):
# Return this result to its initial "never run" placeholder state,
# stopping any job that is currently computing it.
self.is_placeholder = True
self.status = tools.Status.empty
try:
self.process.terminate()
except AttributeError:
# self.process is None when no job is running for this result.
pass
self.process = None
def appearance_min(self):
return [status_to_str(self.status,
@ -860,11 +839,49 @@ def regulate_temperature(log):
log.log_message("The computer has cooled down. Continuing...")
def make_process_nicest(pid):
    """Give the process with `pid` the lowest CPU and I/O scheduling priority."""
    target = psutil.Process(pid)
    target.nice(19)
    target.ionice(psutil.IOPRIO_CLASS_IDLE)
class _Result(Result):
# Worker-process-side variant of Result with finalization disabled, so
# that result objects built inside the worker do not run the parent
# class's destructor.  NOTE(review): assumes Result defines a __del__
# with side effects — confirm against the Result class definition.
def __del__(self):
pass
def work_loop(child_connection):
    """Worker-process entry point: run tool jobs received over the pipe.

    Loops forever, receiving (tool, path) pairs from the parent, running
    the tool, and sending the resulting status back.  Exits quietly when
    the parent closes its end of the pipe instead of dying with an
    EOFError traceback.
    """
    while True:
        try:
            tool, path = child_connection.recv()
        except EOFError:
            break  # Parent closed the pipe; shut this worker down.
        # The _Result instance holds the tool's output in this process;
        # only the status is small enough to be worth sending back.
        result = _Result(path, tool)
        status, result.result = tools.run_tool_no_error(path, tool)
        child_connection.send(status)
class Worker:

    """Owns one long-lived worker process and the pipe used to talk to it.

    The worker runs `work_loop` and executes one tool job at a time;
    `run_tool` blocks until the job's status comes back.
    """

    def __init__(self):
        self.parent_connection, child_connection = multiprocessing.Pipe()
        self.process = multiprocessing.Process(
            target=work_loop, args=(child_connection,), daemon=True)
        self.process.start()
        # Renice only after start(): before start() self.process.pid is
        # None, and psutil.Process(None) targets the *current* process,
        # which would lower the priority of the main process instead of
        # the worker.
        make_process_nicest(self.process.pid)

    def run_tool(self, path, tool):
        """Run `tool` on `path` in the worker; block until its status arrives."""
        self.parent_connection.send([tool, path])
        return self.parent_connection.recv()

    def stop(self):
        """Kill the worker immediately (SIGKILL); it does no cleanup."""
        os.kill(self.process.pid, signal.SIGKILL)
class Runner:
def __init__(self):
    """Prepare an idle runner backed by a freshly started worker process."""
    self.worker = Worker()
    self.result = None
    self.is_running = True
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
@ -878,7 +895,7 @@ class Runner:
log.log_message("All results are up to date.")
break
try:
self.result.run(log, appearance_changed_event)
self.result.run(log, appearance_changed_event, self)
summary.completed_total += 1
except EOFError: # Occurs if the process is terminated
pass
@ -931,7 +948,6 @@ def main(root_path, urwid_screen):
update_display_thread.start()
loop.add_reader(sys.stdin, screen.on_keypressed, urwid_screen)
runners = [Runner() for index in range(multiprocessing.cpu_count() * 2)]
screen.runners = runners
for runner in runners:
args = (summary, log, jobs_added_event, appearance_changed_event)
threading.Thread(target=runner.job_runner, args=args,
@ -949,7 +965,9 @@ def main(root_path, urwid_screen):
appearance_changed_event.set()
update_display_thread.join()
for runner in runners:
runner.worker.stop()
runner.is_running = False
for runner in runners:
runner.result.reset()
# Cannot pickle generators, locks, sockets or events.
summary.closest_placeholder_generator = None

View file

@ -38,17 +38,6 @@ class LruCacheWithEvictionTestCase(unittest.TestCase):
self._assert_cache(a, 1, 2, 2)
class MultiprocessingWrapperTestCase(unittest.TestCase):

    def test_multiprocessing_wrapper(self):
        # The wrapped function's result should come back over result_conn.
        def add_three(first, second, third=1):
            return first + second + third
        process = vigil.multiprocessing_process(add_three, 1, 2)
        received = process.result_conn.recv()
        process.join()
        self.assertEqual(received, 4)
_DIMENSIONS = (40, 40)