Coding style.

- Can determine testing globally with '"unittest" in sys.modules'.
- Removed all the is_being_tested plumbing.
This commit is contained in:
Andrew Hamilton 2022-01-14 09:22:27 +10:00
parent d899423f6b
commit 62ebbd562b
4 changed files with 10 additions and 11 deletions

View file

@ -666,10 +666,10 @@ class Screen:
state["workers"] = None
return state
def make_workers(self, worker_count, is_being_tested, compression):
def make_workers(self, worker_count, compression):
workers = []
for index in range(worker_count):
worker_ = worker.Worker(is_being_tested, compression)
worker_ = worker.Worker(compression)
workers.append(worker_)
future = worker_.job_runner(self, self._summary, self._log,
self._summary._jobs_added_event)
@ -1020,7 +1020,7 @@ def on_filesystem_event(event, summary, root_path):
async def main(title, root_path, worker_count=None, editor_command=None, theme=None,
compression=None, is_being_tested=False):
compression=None):
loop = asyncio.get_running_loop()
if worker_count is None:
worker_count = max(multiprocessing.cpu_count() - 1, 1)
@ -1042,12 +1042,12 @@ async def main(title, root_path, worker_count=None, editor_command=None, theme=N
notifier = setup_inotify(root_path, loop, callback, is_path_excluded)
try:
log.log_message(f"Starting workers ({worker_count}) …")
screen.make_workers(worker_count, is_being_tested, compression)
screen.make_workers(worker_count, compression)
loop.create_task(summary.sync_with_filesystem(log))
for worker_ in screen.workers:
loop.create_task(worker_.future)
try:
if sys.stdout.isatty() or is_being_tested:
if sys.stdout.isatty() or "unittest" in sys.modules:
await fill3.tui(title, screen)
log.log_message("Program stopped.")
else:

View file

@ -5,6 +5,7 @@ import asyncio
import contextlib
import os
import signal
import sys
import fill3
@ -17,8 +18,7 @@ class Worker:
AUTOSAVE_MESSAGE = "Auto-saving…"
unsaved_jobs_total = 0
def __init__(self, is_being_tested, compression):
self.is_being_tested = is_being_tested
def __init__(self, compression):
self.compression = compression
self.result = None
self.process = None
@ -67,7 +67,7 @@ class Worker:
log.log_message("All results are up to date.")
log.log_message(Worker.AUTOSAVE_MESSAGE)
screen.save()
if self.is_being_tested:
if "unittest" in sys.modules:
fill3.SHUTDOWN_EVENT.set()
jobs_added_event.clear()

View file

@ -204,8 +204,7 @@ class MainTestCase(unittest.TestCase):
with __main__.chdir(root_path):
loop = asyncio.get_event_loop()
with contextlib.redirect_stdout(io.StringIO()):
loop.run_until_complete(__main__.main("test",
root_path, worker_count=2, is_being_tested=True))
loop.run_until_complete(__main__.main("test", root_path, worker_count=2))
for file_name in ["summary.pickle", "source_checksum",
"foo-metadata", "foo-contents"]:
self.assertTrue(os.path.exists(".eris/" + file_name))

View file

@ -27,7 +27,7 @@ class WorkerTestCase(unittest.TestCase):
def test_run_job(self):
loop = asyncio.get_event_loop()
compression = "none"
worker_ = worker.Worker(False, compression)
worker_ = worker.Worker(compression)
loop.run_until_complete(worker_.create_process())
worker_.process.stdin.write(f"{compression}\n".encode("utf-8"))
future = worker_.run_tool("foo", tools.metadata)