Coding style.

- Increase maximum line length from 80 to 100.
This commit is contained in:
Andrew Hamilton 2021-11-29 12:51:34 +10:00
parent 75a028272d
commit 71b9da128b
15 changed files with 404 additions and 696 deletions

View file

@ -90,8 +90,7 @@ class Entry:
MAX_WIDTH = 0
def __init__(self, path, results, change_time, highlighted=None,
set_results=True):
def __init__(self, path, results, change_time, highlighted=None, set_results=True):
self.path = path
self.change_time = change_time
self.highlighted = highlighted
@ -114,8 +113,7 @@ class Entry:
return self.results[index]
def appearance_min(self):
if self.appearance_cache is None \
or self.last_width != Entry.MAX_WIDTH:
if self.appearance_cache is None or self.last_width != Entry.MAX_WIDTH:
self.last_width = Entry.MAX_WIDTH
if self.highlighted is not None:
self.results[self.highlighted].is_highlighted = True
@ -147,8 +145,7 @@ def is_path_excluded(path):
def codebase_files(path, skip_hidden_directories=True):
for (dirpath, dirnames, filenames) in os.walk(path):
if skip_hidden_directories:
filtered_dirnames = [dirname for dirname in dirnames
if not is_path_excluded(dirname)]
filtered_dirnames = [dirname for dirname in dirnames if not is_path_excluded(dirname)]
dirnames[:] = filtered_dirnames
for filename in filenames:
if not is_path_excluded(filename):
@ -179,9 +176,8 @@ def highlight_str(line, highlight_color, transparency):
else termstr.XTERM_COLORS[style.bg_color])
return termstr.CharStyle(
blend_color(fg_color, highlight_color, transparency),
blend_color(bg_color, highlight_color, transparency),
is_bold=style.is_bold, is_italic=style.is_italic,
is_underlined=style.is_underlined)
blend_color(bg_color, highlight_color, transparency), is_bold=style.is_bold,
is_italic=style.is_italic, is_underlined=style.is_underlined)
return termstr.TermStr(line).transform_style(blend_style)
@ -194,14 +190,12 @@ _UP, _DOWN, _LEFT, _RIGHT = (0, -1), (0, 1), (-1, 0), (1, 0)
def directory_sort(entry):
path = entry.path
return (os.path.dirname(path), tools.splitext(path)[1],
os.path.basename(path))
return (os.path.dirname(path), tools.splitext(path)[1], os.path.basename(path))
def type_sort(entry):
path = entry.path
return (tools.splitext(path)[1], os.path.dirname(path),
os.path.basename(path))
return (tools.splitext(path)[1], os.path.dirname(path), os.path.basename(path))
class Summary:
@ -235,12 +229,10 @@ class Summary:
if y == 0:
entries = []
else:
entries = itertools.chain(
[self._entries[y]], itertools.islice(self._entries, y),
itertools.islice(self._entries, y+1, None))
state["_old_entries"] = paged_list.PagedList(
entries, summary_path, 2000, 1, exist_ok=True,
open_func=open_compressed)
entries = itertools.chain([self._entries[y]], itertools.islice(self._entries, y),
itertools.islice(self._entries, y+1, None))
state["_old_entries"] = paged_list.PagedList(entries, summary_path, 2000, 1,
exist_ok=True, open_func=open_compressed)
state["_entries"] = None
state["__cursor_position"] = (x, 0)
return state
@ -261,8 +253,7 @@ class Summary:
def sort_entries(self):
key_func = directory_sort if self.is_directory_sort else type_sort
self._entries = sorted_collection.SortedCollection(
self._entries, key=key_func)
self._entries = sorted_collection.SortedCollection(self._entries, key=key_func)
self.closest_placeholder_generator = None
def add_entry(self, entry):
@ -273,8 +264,7 @@ class Summary:
if result.is_completed:
self.completed_total += 1
Entry.MAX_WIDTH = max(len(entry), Entry.MAX_WIDTH)
self._max_path_length = max(len(entry.path) - len("./"),
self._max_path_length)
self._max_path_length = max(len(entry.path) - len("./"), self._max_path_length)
entry_index = self._entries.insert(entry)
x, y = self._cursor_position
if entry_index <= y:
@ -313,11 +303,10 @@ class Summary:
del self._entries._keys[index]
del self._entries._items[index]
if len(row) == Entry.MAX_WIDTH:
Entry.MAX_WIDTH = max((len(entry) for entry in self._entries),
default=0)
Entry.MAX_WIDTH = max((len(entry) for entry in self._entries), default=0)
if (len(path) - 2) == self._max_path_length:
self._max_path_length = max(
((len(entry.path) - 2) for entry in self._entries), default=0)
self._max_path_length = max(((len(entry.path) - 2) for entry in self._entries),
default=0)
x, y = self._cursor_position
if y == len(self._entries):
self._cursor_position = x, y - 1
@ -370,8 +359,7 @@ class Summary:
log.log_message("Started sync with filesystem…")
start_time = time.time()
all_paths = set()
for path in fix_paths(self._root_path,
codebase_files(self._root_path)):
for path in fix_paths(self._root_path, codebase_files(self._root_path)):
await asyncio.sleep(0)
all_paths.add(path)
if path in cache:
@ -388,8 +376,7 @@ class Summary:
await asyncio.sleep(0)
self.on_file_deleted(path)
duration = time.time() - start_time
log.log_message(f"Finished sync with filesystem. "
f"{round(duration, 2)} secs")
log.log_message(f"Finished sync with filesystem. {round(duration, 2)} secs")
def _sweep_up(self, x, y):
yield from reversed(self._entries[y][:x])
@ -404,8 +391,7 @@ class Summary:
yield from self._entries[y]
def _sweep_combined(self, x, y):
for up_result, down_result in zip(self._sweep_up(x, y),
self._sweep_down(x, y)):
for up_result, down_result in zip(self._sweep_up(x, y), self._sweep_down(x, y)):
yield down_result
yield up_result
@ -439,8 +425,7 @@ class Summary:
return appearance
def _set_scroll_position(self, cursor_x, cursor_y, summary_height):
scroll_x, scroll_y = new_scroll_x, new_scroll_y = \
self._view_widget.position
scroll_x, scroll_y = new_scroll_x, new_scroll_y = self._view_widget.position
if cursor_y < scroll_y:
new_scroll_y = max(cursor_y - summary_height + 1, 0)
if (scroll_y + summary_height - 1) < cursor_y:
@ -450,8 +435,8 @@ class Summary:
def _highlight_cursor_row(self, appearance, cursor_y):
scroll_x, scroll_y = self._view_widget.position
highlighted_y = cursor_y - scroll_y
appearance[highlighted_y] = (highlight_str(
appearance[highlighted_y][:-1], termstr.Color.white, 0.8)
appearance[highlighted_y] = (highlight_str(appearance[highlighted_y][:-1],
termstr.Color.white, 0.8)
+ appearance[highlighted_y][-1])
return appearance
@ -462,8 +447,7 @@ class Summary:
cursor_x, cursor_y = self.cursor_position()
width, height = width - 1, height - 1 # Minus one for the scrollbars
self._set_scroll_position(cursor_x, cursor_y, height)
return self._highlight_cursor_row(
self._view_widget.appearance(dimensions), cursor_y)
return self._highlight_cursor_row(self._view_widget.appearance(dimensions), cursor_y)
def scroll(self, dx, dy):
scroll_x, scroll_y = self._view_widget.position
@ -530,8 +514,7 @@ class Summary:
row = self._entries[row_index]
for index_x, result in enumerate(row):
if (result.status == tools.Status.problem and
not (row_index == y and index_x <= x and
index != len(self._entries))):
not (row_index == y and index_x <= x and index != len(self._entries))):
yield result, (index_x, row_index)
def move_to_next_issue(self):
@ -594,13 +577,11 @@ class Log:
def log_message(self, message, timestamp=None, char_style=None):
if isinstance(message, list):
message = [part[1] if isinstance(part, tuple) else part
for part in message]
message = [part[1] if isinstance(part, tuple) else part for part in message]
message = fill3.join("", message)
if char_style is not None:
message = termstr.TermStr(message, char_style)
timestamp = (time.strftime("%H:%M:%S", time.localtime())
if timestamp is None else timestamp)
timestamp = time.strftime("%H:%M:%S", time.localtime()) if timestamp is None else timestamp
line = termstr.TermStr(timestamp, Log._GREY_BOLD_STYLE) + " " + message
if not sys.stdout.isatty():
print(line, flush=True)
@ -613,8 +594,7 @@ class Log:
self.log_message(message, char_style=Log._GREEN_STYLE)
def appearance(self, dimensions):
if self._appearance is None or \
fill3.appearance_dimensions(self._appearance) != dimensions:
if self._appearance is None or fill3.appearance_dimensions(self._appearance) != dimensions:
width, height = dimensions
del self.lines[:-height]
self._appearance = fill3.appearance_resize(self.lines, dimensions)
@ -629,9 +609,8 @@ def highlight_chars(str_, style, marker="*"):
def get_status_help():
return fill3.join("\n", ["Statuses:"] +
[" " + tools.STATUS_TO_TERMSTR[status] + " " + meaning
for status, meaning in tools.STATUS_MEANINGS])
return fill3.join("\n", ["Statuses:"] + [" " + tools.STATUS_TO_TERMSTR[status] + " " + meaning
for status, meaning in tools.STATUS_MEANINGS])
class Help:
@ -645,8 +624,7 @@ class Help:
portal = self.view.portal
self.key_map = {
"h": self._exit_help, terminal.UP_KEY: portal.scroll_up,
terminal.DOWN_KEY: portal.scroll_down,
terminal.LEFT_KEY: portal.scroll_left,
terminal.DOWN_KEY: portal.scroll_down, terminal.LEFT_KEY: portal.scroll_left,
terminal.RIGHT_KEY: portal.scroll_right, "q": self._exit_help,
terminal.ESC: self._exit_help}
@ -663,8 +641,7 @@ class Help:
appearance_changed_event.set()
def on_keyboard_input(self, term_code, appearance_changed_event):
action = (self.key_map.get(term_code) or
self.key_map.get(term_code.lower()))
action = self.key_map.get(term_code) or self.key_map.get(term_code.lower())
if action is not None:
action()
appearance_changed_event.set()
@ -740,15 +717,13 @@ class Screen:
result_widget = fill3.Text("Nothing selected")
self._view = fill3.View.from_widget(result_widget)
self._listing = fill3.Border(Listing(self._view))
log = fill3.Border(self._log, title="Log",
characters=Screen._DIMMED_BORDER)
log = fill3.Border(self._log, title="Log", characters=Screen._DIMMED_BORDER)
quarter_cut = functools.partial(self._partition, 25)
golden_cut = functools.partial(self._partition, 38.198)
three_quarter_cut = functools.partial(self._partition, 75)
port_log = fill3.Row([fill3.Column([summary, log], three_quarter_cut),
self._listing], golden_cut)
land_log = fill3.Column([fill3.Row([summary, log]), self._listing],
quarter_cut)
port_log = fill3.Row([fill3.Column([summary, log], three_quarter_cut), self._listing],
golden_cut)
land_log = fill3.Column([fill3.Row([summary, log]), self._listing], quarter_cut)
port_no_log = fill3.Row([summary, self._listing], golden_cut)
land_no_log = fill3.Column([summary, self._listing], quarter_cut)
self._layouts = [[land_no_log, port_no_log], [land_log, port_log]]
@ -769,8 +744,7 @@ class Screen:
x, y = selected_widget.scroll_position
widget_width, widget_height = fill3.appearance_dimensions(
selected_widget.result.appearance_min())
listing_width, listing_height = (self._listing.widget.
last_dimensions)
listing_width, listing_height = self._listing.widget.last_dimensions
listing_width -= 1 # scrollbars
listing_height -= 1
x = min(x + dx, max(widget_width - listing_width, 0))
@ -780,42 +754,33 @@ class Screen:
selected_widget.scroll_position = x, y
def cursor_up(self):
(self._summary.cursor_up() if self._is_summary_focused
else self._move_listing(_UP))
self._summary.cursor_up() if self._is_summary_focused else self._move_listing(_UP)
def cursor_down(self):
(self._summary.cursor_down() if self._is_summary_focused
else self._move_listing(_DOWN))
self._summary.cursor_down() if self._is_summary_focused else self._move_listing(_DOWN)
def cursor_right(self):
(self._summary.cursor_right() if self._is_summary_focused
else self._move_listing(_RIGHT))
self._summary.cursor_right() if self._is_summary_focused else self._move_listing(_RIGHT)
def cursor_left(self):
(self._summary.cursor_left() if self._is_summary_focused
else self._move_listing(_LEFT))
self._summary.cursor_left() if self._is_summary_focused else self._move_listing(_LEFT)
def cursor_page_up(self):
(self._summary.cursor_page_up() if self._is_summary_focused
else self.listing_page_up())
self._summary.cursor_page_up() if self._is_summary_focused else self.listing_page_up()
def cursor_page_down(self):
(self._summary.cursor_page_down() if self._is_summary_focused
else self.listing_page_down())
self._summary.cursor_page_down() if self._is_summary_focused else self.listing_page_down()
def cursor_end(self):
(self._summary.cursor_end() if self._is_summary_focused
else self._page_listing(_RIGHT))
self._summary.cursor_end() if self._is_summary_focused else self._page_listing(_RIGHT)
def cursor_home(self):
(self._summary.cursor_home() if self._is_summary_focused
else self._page_listing(_LEFT))
self._summary.cursor_home() if self._is_summary_focused else self._page_listing(_LEFT)
def _page_listing(self, vector):
dx, dy = vector
listing_width, listing_height = self._listing.widget.last_dimensions
self._move_listing((dx * (listing_width // 2),
dy * (listing_height // 2)))
self._move_listing((dx * (listing_width // 2), dy * (listing_height // 2)))
def listing_page_up(self):
self._page_listing(_UP)
@ -836,13 +801,11 @@ class Screen:
else:
path = self._summary.get_selection().path
path_colored = tools.path_colored(path)
line_num = (self._summary.get_selection().entry[0].
scroll_position[1] + 1)
line_num = self._summary.get_selection().entry[0].scroll_position[1] + 1
self._log.log_message([in_green("Editing "), path_colored,
in_green(f" at line {line_num}")])
subprocess.Popen(f"{self.editor_command} +{line_num} {path}",
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
subprocess.Popen(f"{self.editor_command} +{line_num} {path}", shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def toggle_status_style(self):
self._summary.toggle_status_style(self._log)
@ -862,16 +825,14 @@ class Screen:
selection = self._summary.get_selection()
tool_name = tools.tool_name_colored(selection.tool, selection.path)
path_colored = tools.path_colored(selection.path)
self._log.log_message([in_green("Refreshing "), tool_name,
in_green(" result of "), path_colored,
in_green("")])
self._log.log_message([in_green("Refreshing "), tool_name, in_green(" result of "),
path_colored, in_green("")])
self._summary.refresh_result(selection)
def refresh_tool(self):
selection = self._summary.get_selection()
tool_name = tools.tool_name_colored(selection.tool, selection.path)
self._log.log_message([in_green("Refreshing all results of "),
tool_name, in_green("")])
self._log.log_message([in_green("Refreshing all results of "), tool_name, in_green("")])
self._summary.refresh_tool(selection.tool)
_DIMMED_BORDER = [termstr.TermStr(part).fg_color(termstr.Color.grey_100)
@ -879,10 +840,8 @@ class Screen:
def _set_focus(self):
focused, unfocused = fill3.Border.THICK, Screen._DIMMED_BORDER
self._summary_border.set_style(focused if self._is_summary_focused
else unfocused)
self._listing.set_style(unfocused if self._is_summary_focused
else focused)
self._summary_border.set_style(focused if self._is_summary_focused else unfocused)
self._listing.set_style(unfocused if self._is_summary_focused else focused)
def toggle_focus(self):
self._is_summary_focused = not self._is_summary_focused
@ -894,10 +853,8 @@ class Screen:
def xdg_open(self):
path = self._summary.get_selection().path
path_colored = tools.path_colored(path)
self._log.log_message([in_green("Opening "), path_colored,
in_green("")])
subprocess.Popen(["xdg-open", path], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self._log.log_message([in_green("Opening "), path_colored, in_green("")])
subprocess.Popen(["xdg-open", path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def save(self):
worker.Worker.unsaved_jobs_total = 0
@ -907,8 +864,7 @@ class Screen:
def _select_entry_at_position(self, x, y, view_width, view_height):
border_width = 1
if x < border_width or y < border_width or x > view_width or \
y > view_height:
if x < border_width or y < border_width or x > view_width or y > view_height:
return
view_x, view_y = self._summary._view_widget.portal.position
column_index = x - border_width + view_x
@ -923,18 +879,14 @@ class Screen:
def _is_switching_focus(self, x, y, view_width, view_height):
return (not self._is_fullscreen and
(self._is_listing_portrait and
(x > view_width and
self._is_summary_focused or x <= view_width and
not self._is_summary_focused) or
not self._is_listing_portrait and
(y > view_height and
self._is_summary_focused or y <= view_height and
(x > view_width and self._is_summary_focused or x <= view_width and
not self._is_summary_focused) or not self._is_listing_portrait and
(y > view_height and self._is_summary_focused or y <= view_height and
not self._is_summary_focused)))
def on_mouse_input(self, term_code):
if self._is_help_visible:
self._help_widget.on_mouse_input(
term_code, self._appearance_changed_event)
self._help_widget.on_mouse_input(term_code, self._appearance_changed_event)
return
event = terminal.decode_mouse_input(term_code)
if event[0] not in [terminal.PRESS_MOUSE, terminal.DRAG_MOUSE]:
@ -953,8 +905,7 @@ class Screen:
elif event[1] == terminal.WHEEL_DOWN_MOUSE:
self.listing_page_down()
else:
view_width, view_height = \
self._summary._view_widget.portal.last_dimensions
view_width, view_height = self._summary._view_widget.portal.last_dimensions
if self._is_switching_focus(x, y, view_width, view_height):
self.toggle_focus()
else:
@ -965,11 +916,9 @@ class Screen:
def on_keyboard_input(self, term_code):
if self._is_help_visible:
self._help_widget.on_keyboard_input(
term_code, self._appearance_changed_event)
self._help_widget.on_keyboard_input(term_code, self._appearance_changed_event)
return
action = (Screen._KEY_MAP.get(term_code) or
Screen._KEY_MAP.get(term_code.lower()))
action = Screen._KEY_MAP.get(term_code) or Screen._KEY_MAP.get(term_code.lower())
if action is not None:
action(self)
self._appearance_changed_event.set()
@ -982,21 +931,16 @@ class Screen:
view.widget = widget.result
tool_name = tools.tool_name_colored(widget.tool, widget.path)
divider = " " + self._listing.top * 2 + " "
self._listing.title = (
tools.path_colored(widget.path) + divider + tool_name + " " +
tools.STATUS_TO_TERMSTR[widget.status] + divider +
"line " + str(y+1))
self._listing.title = (tools.path_colored(widget.path) + divider + tool_name + " " +
tools.STATUS_TO_TERMSTR[widget.status] + divider + "line " + str(y+1))
_STATUS_BAR = highlight_chars(
" *help *quit *t*a*b:focus *turn *log *edit *next *sort"
" *refresh *fullscreen *open", Log._GREEN_STYLE)
_STATUS_BAR = highlight_chars(" *help *quit *t*a*b:focus *turn *log *edit *next *sort"
" *refresh *fullscreen *open", Log._GREEN_STYLE)
@functools.lru_cache()
def _get_partial_bar_chars(self, bar_transparency):
bar_color = blend_color(termstr.Color.black, termstr.Color.white,
bar_transparency)
return [termstr.TermStr(char).fg_color(bar_color).
bg_color(termstr.Color.black)
bar_color = blend_color(termstr.Color.black, termstr.Color.white, bar_transparency)
return [termstr.TermStr(char).fg_color(bar_color).bg_color(termstr.Color.black)
for char in fill3.ScrollBar._PARTIAL_CHARS[1]]
@functools.lru_cache(maxsize=2)
@ -1007,19 +951,18 @@ class Screen:
whole = int(whole)
if whole < len(bar) and bar[whole].data == " ":
left_part = bar[:whole]
right_part = (self._get_partial_bar_chars(bar_transparency)
[int(fraction * 8)] + bar[whole+1:])
right_part = (self._get_partial_bar_chars(bar_transparency)[int(fraction * 8)]
+ bar[whole+1:])
else:
progress_bar_size = round(progress_bar_size)
left_part = bar[:progress_bar_size]
right_part = bar[progress_bar_size:]
return [highlight_str(left_part, termstr.Color.white, bar_transparency)
+ right_part]
return [highlight_str(left_part, termstr.Color.white, bar_transparency) + right_part]
def _get_status_bar(self, width):
incomplete = self._summary.result_total - self._summary.completed_total
progress_bar_size = width if self._summary.result_total == 0 else \
max(0, width * incomplete / self._summary.result_total)
progress_bar_size = (width if self._summary.result_total == 0 else
max(0, width * incomplete / self._summary.result_total))
return self._get_status_bar_appearance(width, progress_bar_size)
def appearance(self, dimensions):
@ -1028,44 +971,35 @@ class Screen:
if self._is_help_visible:
body = self._help_widget
elif self._is_fullscreen:
body = (self._summary_border if self._is_summary_focused
else self._listing)
body = self._summary_border if self._is_summary_focused else self._listing
else:
body = (self._layouts[self._is_log_visible]
[self._is_listing_portrait])
body = self._layouts[self._is_log_visible][self._is_listing_portrait]
width, height = max(dimensions[0], 10), max(dimensions[1], 20)
result = (body.appearance((width, height-1)) +
self._get_status_bar(width))
result = body.appearance((width, height-1)) + self._get_status_bar(width)
return (result if (width, height) == dimensions
else fill3.appearance_resize(result, dimensions))
_KEY_MAP = {
"t": toggle_window_orientation, "l": toggle_log, "h": toggle_help,
terminal.UP_KEY: cursor_up, terminal.DOWN_KEY: cursor_down,
terminal.LEFT_KEY: cursor_left, terminal.RIGHT_KEY: cursor_right,
terminal.PAGE_DOWN_KEY: cursor_page_down,
terminal.PAGE_UP_KEY: cursor_page_up, "s": toggle_order,
terminal.HOME_KEY: cursor_home, terminal.END_KEY: cursor_end,
"n": move_to_next_issue, "N": move_to_next_issue_of_tool,
"e": edit_file, "q": quit_, terminal.ESC: quit_, "r": refresh,
"R": refresh_tool, "\t": toggle_focus, "f": toggle_fullscreen,
"o": xdg_open}
_KEY_MAP = {"t": toggle_window_orientation, "l": toggle_log, "h": toggle_help,
terminal.UP_KEY: cursor_up, terminal.DOWN_KEY: cursor_down,
terminal.LEFT_KEY: cursor_left, terminal.RIGHT_KEY: cursor_right,
terminal.PAGE_DOWN_KEY: cursor_page_down, terminal.PAGE_UP_KEY: cursor_page_up,
"s": toggle_order, terminal.HOME_KEY: cursor_home, terminal.END_KEY: cursor_end,
"n": move_to_next_issue, "N": move_to_next_issue_of_tool, "e": edit_file,
"q": quit_, terminal.ESC: quit_, "r": refresh, "R": refresh_tool,
"\t": toggle_focus, "f": toggle_fullscreen, "o": xdg_open}
def setup_inotify(root_path, loop, on_filesystem_event, exclude_filter):
watch_manager = pyinotify.WatchManager()
event_mask = (pyinotify.IN_CREATE | pyinotify.IN_DELETE |
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_ATTRIB |
pyinotify.IN_MOVED_FROM | pyinotify.IN_MOVED_TO)
event_mask = (pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_CLOSE_WRITE |
pyinotify.IN_ATTRIB | pyinotify.IN_MOVED_FROM | pyinotify.IN_MOVED_TO)
watch_manager.add_watch(root_path, event_mask, rec=True, auto_add=True,
proc_fun=on_filesystem_event,
exclude_filter=exclude_filter, quiet=False)
return pyinotify.AsyncioNotifier(watch_manager, loop,
callback=lambda notifier: None)
proc_fun=on_filesystem_event, exclude_filter=exclude_filter,
quiet=False)
return pyinotify.AsyncioNotifier(watch_manager, loop, callback=lambda notifier: None)
def load_state(pickle_path, jobs_added_event, appearance_changed_event,
root_path, loop):
def load_state(pickle_path, jobs_added_event, appearance_changed_event, root_path, loop):
is_first_run = True
try:
with gzip.open(pickle_path, "rb") as file_:
@ -1119,16 +1053,14 @@ def main(root_path, loop, worker_count=None, editor_command=None, theme=None,
pickle_path = os.path.join(tools.CACHE_PATH, "summary.pickle")
jobs_added_event = asyncio.Event()
appearance_changed_event = asyncio.Event()
summary, screen, log, is_first_run = load_state(
pickle_path, jobs_added_event, appearance_changed_event, root_path,
loop)
summary, screen, log, is_first_run = load_state(pickle_path, jobs_added_event,
appearance_changed_event, root_path, loop)
screen.editor_command = editor_command
log.log_message("Program started.")
jobs_added_event.set()
def callback(event):
on_filesystem_event(event, summary, root_path,
appearance_changed_event)
on_filesystem_event(event, summary, root_path, appearance_changed_event)
notifier = setup_inotify(root_path, loop, callback, is_path_excluded)
try:
log.log_message(f"Starting workers ({worker_count}) …")
@ -1139,13 +1071,11 @@ def main(root_path, loop, worker_count=None, editor_command=None, theme=None,
time.sleep(0.05)
screen.stop_workers()
loop.stop()
loop.create_task(summary.sync_with_filesystem(
appearance_changed_event, log))
loop.create_task(summary.sync_with_filesystem(appearance_changed_event, log))
for worker_ in screen.workers:
loop.create_task(worker_.future)
if sys.stdout.isatty():
with fill3.context(loop, appearance_changed_event, screen,
exit_loop=exit_loop):
with fill3.context(loop, appearance_changed_event, screen, exit_loop=exit_loop):
loop.run_forever()
log.log_message("Program stopped.")
else:
@ -1192,13 +1122,10 @@ def print_tool_info():
for extensions, tools_ in tools.TOOLS_FOR_EXTENSIONS:
for extension in extensions:
for tool in tools_:
extensions_for_tool.setdefault(
tool, {extension}).add(extension)
extensions_for_tool.setdefault(tool, {extension}).add(extension)
for tool in sorted(tools.tools_all(), key=lambda t: t.__name__):
print(termstr.TermStr(tool.__name__).bold()
if tools.is_tool_available(tool)
else termstr.TermStr(tool.__name__).fg_color(termstr.Color.red)
+ " (not available) ")
print(termstr.TermStr(tool.__name__).bold() if tools.is_tool_available(tool)
else termstr.TermStr(tool.__name__).fg_color(termstr.Color.red) + " (not available)")
print("url:", tool.url)
extensions = list(extensions_for_tool.get(tool, {"*"}))
print("extensions:", ", ".join(extensions))
@ -1245,10 +1172,9 @@ def check_arguments():
if arguments["--compression"] not in compressions:
print("--compression must be one of:", " ".join(compressions))
sys.exit(1)
editor_command = arguments["--editor"] or os.environ.get("EDITOR", None)\
or os.environ.get("VISUAL", None)
return root_path, worker_count, editor_command, arguments["--theme"], \
arguments["--compression"]
editor_command = (arguments["--editor"] or os.environ.get("EDITOR", None)
or os.environ.get("VISUAL", None))
return root_path, worker_count, editor_command, arguments["--theme"], arguments["--compression"]
def inotify_watches_exceeded():
@ -1259,15 +1185,13 @@ def inotify_watches_exceeded():
def entry_point():
root_path, worker_count, editor_command, theme, compression = \
check_arguments()
root_path, worker_count, editor_command, theme, compression = check_arguments()
manage_cache(root_path)
with terminal.terminal_title("eris: " + os.path.basename(root_path)):
with chdir(root_path): # FIX: Don't change directory if possible.
loop = asyncio.get_event_loop()
try:
main(root_path, loop, worker_count, editor_command, theme,
compression)
main(root_path, loop, worker_count, editor_command, theme, compression)
except pyinotify.WatchManagerError:
inotify_watches_exceeded()

View file

@ -8,15 +8,13 @@ import shutil
def batch(iter_, page_size):
for _, batch in itertools.groupby(
enumerate(iter_), lambda tuple_: tuple_[0] // page_size):
for _, batch in itertools.groupby(enumerate(iter_), lambda tuple_: tuple_[0] // page_size):
yield [value for index, value in batch]
class PagedList:
def __init__(self, list_, pages_dir, page_size, cache_size, exist_ok=False,
open_func=open):
def __init__(self, list_, pages_dir, page_size, cache_size, exist_ok=False, open_func=open):
self.pages_dir = pages_dir # An empty or non-existant directory.
self.page_size = page_size
self.cache_size = cache_size
@ -57,12 +55,10 @@ class PagedList:
stop_page_index -= 1
stop_page_offset = self.page_size
if start_page_index == stop_page_index:
return (self._get_page(start_page_index)
[start_page_offset:stop_page_offset])
return self._get_page(start_page_index)[start_page_offset:stop_page_offset]
else:
return (self._get_page(start_page_index)[start_page_offset:] +
[line for page_index in
range(start_page_index+1, stop_page_index)
[line for page_index in range(start_page_index+1, stop_page_index)
for line in self._get_page(page_index)] +
self._get_page(stop_page_index)[:stop_page_offset])
else:
@ -70,8 +66,7 @@ class PagedList:
return self._get_page(page_index)[page_offset]
def _setup_page_cache(self):
self._get_page = functools.lru_cache(self.cache_size)(
self._get_page_org)
self._get_page = functools.lru_cache(self.cache_size)(self._get_page_org)
def __getstate__(self): # Don't pickle the lru_cache.
state = self.__dict__.copy()

View file

@ -55,23 +55,16 @@ class Status(enum.IntEnum):
timed_out = 7
_STATUS_COLORS = {Status.ok: termstr.Color.green,
Status.problem: termstr.Color.dark_green,
Status.not_applicable: termstr.Color.grey_80,
Status.running: termstr.Color.lime,
Status.error: termstr.Color.red,
Status.timed_out: termstr.Color.purple}
STATUS_MEANINGS = [
(Status.ok, "Ok"), (Status.problem, "Problem"),
(Status.not_applicable, "Not applicable"), (Status.running, "Running"),
(Status.timed_out, "Timed out"), (Status.pending, "Pending"),
(Status.error, "Error")
]
STATUS_TO_TERMSTR = {
status: termstr.TermStr(" ", termstr.CharStyle(bg_color=color))
for status, color in _STATUS_COLORS.items()}
STATUS_TO_TERMSTR[Status.pending] = termstr.TermStr(".").fg_color(
termstr.Color.grey_100)
_STATUS_COLORS = {Status.ok: termstr.Color.green, Status.problem: termstr.Color.dark_green,
Status.not_applicable: termstr.Color.grey_80, Status.running: termstr.Color.lime,
Status.error: termstr.Color.red, Status.timed_out: termstr.Color.purple}
STATUS_MEANINGS = [(Status.ok, "Ok"), (Status.problem, "Problem"),
(Status.not_applicable, "Not applicable"), (Status.running, "Running"),
(Status.timed_out, "Timed out"), (Status.pending, "Pending"),
(Status.error, "Error")]
STATUS_TO_TERMSTR = {status: termstr.TermStr(" ", termstr.CharStyle(bg_color=color))
for status, color in _STATUS_COLORS.items()}
STATUS_TO_TERMSTR[Status.pending] = termstr.TermStr(".").fg_color(termstr.Color.grey_100)
def get_ls_color_codes():
@ -85,8 +78,7 @@ TIMEOUT = 60
def _printable(text):
return "".join(char if ord(char) > 31 or char in ["\n", "\t"] else "#"
for char in text)
return "".join(char if ord(char) > 31 or char in ["\n", "\t"] else "#" for char in text)
def _fix_input(input_):
@ -94,23 +86,19 @@ def _fix_input(input_):
def _do_command(command, **kwargs):
completed_process = subprocess.run(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
**kwargs)
return (_fix_input(completed_process.stdout),
_fix_input(completed_process.stderr), completed_process.returncode)
completed_process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, **kwargs)
return (_fix_input(completed_process.stdout), _fix_input(completed_process.stderr),
completed_process.returncode)
def _run_command(command, error_status=None, has_color=False, timeout=None,
**kwargs):
def _run_command(command, error_status=None, has_color=False, timeout=None, **kwargs):
error_status = Status.problem if error_status is None else error_status
if has_color:
process = subprocess.run(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
timeout=timeout, **kwargs)
stdout, stderr, returncode = (
termstr.TermStr.from_term(process.stdout),
termstr.TermStr.from_term(process.stderr), process.returncode)
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, timeout=timeout, **kwargs)
stdout, stderr, returncode = (termstr.TermStr.from_term(process.stdout),
termstr.TermStr.from_term(process.stderr), process.returncode)
else:
stdout, stderr, returncode = _do_command(command, timeout=timeout)
result_status = Status.ok if returncode == 0 else error_status
@ -131,8 +119,7 @@ def _syntax_highlight(text, lexer, style):
hex_rgb = hex_rgb[1:]
return tuple(eval("0x"+hex_rgb[index:index+2]) for index in [0, 2, 4])
def _char_style_for_token_type(token_type, default_bg_color,
default_style):
def _char_style_for_token_type(token_type, default_bg_color, default_style):
try:
token_style = style.style_for_token(token_type)
except KeyError:
@ -141,17 +128,14 @@ def _syntax_highlight(text, lexer, style):
else _parse_rgb(token_style["color"]))
bg_color = (default_bg_color if token_style["bgcolor"] is None
else _parse_rgb(token_style["bgcolor"]))
return termstr.CharStyle(fg_color, bg_color, token_style["bold"],
token_style["italic"],
return termstr.CharStyle(fg_color, bg_color, token_style["bold"], token_style["italic"],
token_style["underline"])
default_bg_color = _parse_rgb(style.background_color)
default_style = termstr.CharStyle(bg_color=default_bg_color)
text = fill3.join(
"", [termstr.TermStr(text, _char_style_for_token_type(
token_type, default_bg_color, default_style))
for token_type, text in pygments.lex(text, lexer)])
text_widget = fill3.Text(text, pad_char=termstr.TermStr(" ").bg_color(
default_bg_color))
text = fill3.join("", [termstr.TermStr(text, _char_style_for_token_type(
token_type, default_bg_color, default_style))
for token_type, text in pygments.lex(text, lexer)])
text_widget = fill3.Text(text, pad_char=termstr.TermStr(" ").bg_color(default_bg_color))
return fill3.join("\n", text_widget.text)
@ -171,8 +155,7 @@ def _permissions_in_octal(permissions):
for part_index in range(3):
index = part_index * 3 + 1
part = permissions[index:index+3]
digit = sum(2 ** (2 - index) for index, element in enumerate(part)
if element != "-")
digit = sum(2 ** (2 - index) for index, element in enumerate(part) if element != "-")
result.append(str(digit))
return "".join(result)
@ -183,12 +166,11 @@ def _pretty_bytes(bytes):
units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
unit_index = int(math.floor(math.log(bytes, 1024)))
power = math.pow(1024, unit_index)
conversion = round(bytes/power, 2)
conversion = round(bytes / power, 2)
return f"{conversion} {units[unit_index]}"
@deps(deps={"file", "coreutils"}, url="https://github.com/ahamilton/eris",
executables={"file"})
@deps(deps={"file", "coreutils"}, url="https://github.com/ahamilton/eris", executables={"file"})
def metadata(path):
def detail(value, unit):
@ -198,41 +180,31 @@ def metadata(path):
stat_result = os.stat(path)
permissions = stat.filemode(stat_result.st_mode)
hardlinks = str(stat_result.st_nlink)
group = [pwd.getpwuid(stat_result.st_gid).pw_name,
detail(stat_result.st_gid, "gid")]
owner = [pwd.getpwuid(stat_result.st_uid).pw_name,
detail(stat_result.st_uid, "uid")]
modified, created, access = [
[time.asctime(time.gmtime(seconds)), detail(int(seconds), "secs")]
for seconds in (stat_result.st_mtime, stat_result.st_ctime,
stat_result.st_atime)]
size = [_pretty_bytes(stat_result.st_size),
detail(stat_result.st_size, "bytes")]
stdout, *rest = _do_command(
["file", "--dereference", "--brief", "--uncompress", "--mime", path])
group = [pwd.getpwuid(stat_result.st_gid).pw_name, detail(stat_result.st_gid, "gid")]
owner = [pwd.getpwuid(stat_result.st_uid).pw_name, detail(stat_result.st_uid, "uid")]
modified, created, access = [[time.asctime(time.gmtime(seconds)), detail(int(seconds), "secs")]
for seconds in (stat_result.st_mtime, stat_result.st_ctime,
stat_result.st_atime)]
size = [_pretty_bytes(stat_result.st_size), detail(stat_result.st_size, "bytes")]
stdout, *rest = _do_command(["file", "--dereference", "--brief", "--uncompress", "--mime",
path])
mime_type = stdout
stdout, *rest = _do_command(
["file", "--dereference", "--brief", "--uncompress", path])
stdout, *rest = _do_command(["file", "--dereference", "--brief", "--uncompress", path])
file_type = stdout
permissions_value = [permissions,
detail(_permissions_in_octal(permissions), None)]
permissions_value = [permissions, detail(_permissions_in_octal(permissions), None)]
text = []
for line in [
("size", size), ("permissions", permissions_value), None,
("modified time", modified), ("creation time", created),
("access time", access), None,
("owner", owner), ("group", group), None,
("hardlinks", hardlinks), ("symlink", is_symlink), None,
("mime type", mime_type.strip()),
("file type", file_type.strip())]:
for line in [("size", size), ("permissions", permissions_value), None,
("modified time", modified), ("creation time", created), ("access time", access),
None, ("owner", owner), ("group", group), None, ("hardlinks", hardlinks),
("symlink", is_symlink), None, ("mime type", mime_type.strip()),
("file type", file_type.strip())]:
if line is None:
text.append("\n")
else:
name, value = line
name = termstr.TermStr(name + ":").fg_color(
termstr.Color.blue).ljust(16)
name = termstr.TermStr(name + ":").fg_color(termstr.Color.blue).ljust(16)
text.append(name + fill3.join("", value) + "\n")
return (Status.ok, fill3.join("", text))
return Status.ok, fill3.join("", text)
@deps(deps={"python3-pygments"}, url="http://pygments.org/")
@ -264,8 +236,7 @@ def _is_python_test_file(path):
@deps(url="https://docs.python.org/3/library/unittest.html")
def python_unittests(path):
if _is_python_test_file(path):
command = ([path] if _has_shebang_line(path)
else [PYTHON_EXECUTABLE, path])
command = [path] if _has_shebang_line(path) else [PYTHON_EXECUTABLE, path]
stdout, stderr, returncode = _do_command(command, timeout=TIMEOUT)
status = Status.ok if returncode == 0 else Status.problem
return status, (stdout + "\n" + stderr)
@ -281,12 +252,10 @@ def pytest(path):
with tempfile.TemporaryDirectory() as temp_dir:
env = os.environ.copy()
env["COVERAGE_FILE"] = os.path.join(temp_dir, "coverage")
process = subprocess.run(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True,
timeout=TIMEOUT, env=env)
stdout, stderr, returncode = (
termstr.TermStr.from_term(process.stdout),
termstr.TermStr.from_term(process.stderr), process.returncode)
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, timeout=TIMEOUT, env=env)
stdout, stderr, returncode = (termstr.TermStr.from_term(process.stdout),
termstr.TermStr.from_term(process.stderr), process.returncode)
if returncode == 5:
status = Status.not_applicable
else:
@ -297,17 +266,14 @@ def pytest(path):
@deps(deps={"python3-mypy"}, url="http://mypy-lang.org/")
def mypy(path):
stdout, stderr, returncode = _do_command(
[PYTHON_EXECUTABLE, "-m", "mypy", "--ignore-missing-imports", path],
timeout=TIMEOUT)
[PYTHON_EXECUTABLE, "-m", "mypy", "--ignore-missing-imports", path], timeout=TIMEOUT)
status = Status.ok if returncode == 0 else Status.problem
return status, stdout
def _colorize_coverage_report(lines):
line_color = {"> ": termstr.Color.green, "! ": termstr.Color.grey_150,
" ": None}
return fill3.join("", [termstr.TermStr(line).fg_color(line_color[line[:2]])
for line in lines])
line_color = {"> ": termstr.Color.green, "! ": termstr.Color.grey_150, " ": None}
return fill3.join("", [termstr.TermStr(line).fg_color(line_color[line[:2]]) for line in lines])
@deps(deps={"python3-coverage"}, url="https://coverage.readthedocs.io/")
@ -317,13 +283,11 @@ def python_coverage(path):
return Status.not_applicable, f'No "{coverage_path}" file.'
if os.stat(path).st_mtime > os.stat(coverage_path).st_mtime:
return (Status.not_applicable,
f'File has been modified since "{coverage_path}"'
' file was generated.')
f'File has been modified since "{coverage_path}" file was generated.')
path = os.path.normpath(path)
with tempfile.TemporaryDirectory() as temp_dir:
stdout, stderr, returncode = _do_command(
[PYTHON_EXECUTABLE, "-m", "coverage",
"annotate", "--directory", temp_dir, path])
stdout, stderr, returncode = _do_command([PYTHON_EXECUTABLE, "-m", "coverage", "annotate",
"--directory", temp_dir, path])
if returncode != 0:
return Status.problem, stdout
cover_filename = os.listdir(temp_dir)[0]
@ -380,8 +344,7 @@ def python_mccabe(path):
stdout, *rest = _do_command([PYTHON_EXECUTABLE, "-m", "mccabe", path])
max_score = 0
with contextlib.suppress(ValueError): # When there are no lines
max_score = max(_get_mccabe_line_score(line)
for line in stdout.splitlines())
max_score = max(_get_mccabe_line_score(line) for line in stdout.splitlines())
status = Status.problem if max_score > 10 else Status.ok
return status, _colorize_mccabe(stdout)
@ -394,8 +357,7 @@ def python_mccabe(path):
# Status.not_applicable)
@deps(deps={"perltidy"}, url="http://perltidy.sourceforge.net/",
executables={"perltidy"})
@deps(deps={"perltidy"}, url="http://perltidy.sourceforge.net/", executables={"perltidy"})
def perltidy(path):
stdout, *rest = _do_command(["perltidy", "-st", path])
return Status.ok, _syntax_highlight_using_path(stdout, path)
@ -414,8 +376,7 @@ def html_syntax(path):
def pandoc(path):
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = os.path.join(temp_dir, "temp.html")
_do_command(["pandoc", "-t", "html", "-o", temp_path, path],
timeout=TIMEOUT)
_do_command(["pandoc", "-t", "html", "-o", temp_path, path], timeout=TIMEOUT)
return elinks(temp_path)
@ -425,23 +386,19 @@ MAX_IMAGE_SIZE = 200
def _resize_image(image, new_width):
import PIL.Image # Here to avoid 'Segmentation Fault' in install-tools
scale = new_width / image.width
return image.resize((int(image.width * scale), int(image.height * scale)),
PIL.Image.ANTIALIAS)
return image.resize((int(image.width * scale), int(image.height * scale)), PIL.Image.ANTIALIAS)
def _image_to_text(image):
text = "" * image.width
data = list(image.getdata())
width = image.width
rows = [data[row_index*width:(row_index+1)*width]
for row_index in range(image.height)]
rows = [data[row_index*width:(row_index+1)*width] for row_index in range(image.height)]
if image.height % 2 == 1:
rows.append([None] * image.width)
return fill3.join("\n", [
termstr.TermStr(text, tuple(termstr.CharStyle(
fg_color=top_pixel, bg_color=bottom_pixel)
for top_pixel, bottom_pixel in zip(rows[index],
rows[index+1])))
termstr.TermStr(text, tuple(termstr.CharStyle(fg_color=top_pixel, bg_color=bottom_pixel)
for top_pixel, bottom_pixel in zip(rows[index], rows[index+1])))
for index in range(0, image.height, 2)])
@ -471,24 +428,18 @@ def godoc(path):
with tempfile.TemporaryDirectory() as temp_dir:
symlink_path = os.path.join(temp_dir, "file.go")
os.symlink(os.path.abspath(path), symlink_path)
stdout, stderr, returncode = _do_command(["go", "doc", "."],
cwd=temp_dir)
stdout, stderr, returncode = _do_command(["go", "doc", "."], cwd=temp_dir)
os.remove(symlink_path)
status = (Status.not_applicable if stdout.strip() == "" or returncode != 0
else Status.ok)
status = Status.not_applicable if stdout.strip() == "" or returncode != 0 else Status.ok
return status, stdout + stderr
@deps(deps={"git"}, url="https://git-scm.com/docs/git-log",
executables={"git"})
def git_log(path):
status, output = _run_command(
["git", "log", "--find-renames", "--follow", "--stat", "--color",
path], error_status=Status.not_applicable, has_color=True)
if output.data == "":
return Status.not_applicable, ""
else:
return status, output
status, output = _run_command(["git", "log", "--find-renames", "--follow", "--stat", "--color",
path], error_status=Status.not_applicable, has_color=True)
return (Status.not_applicable, "") if output.data == "" else (status, output)
def make_tool_function(dependencies, command, url=None, error_status=None,
@ -501,8 +452,7 @@ def make_tool_function(dependencies, command, url=None, error_status=None,
@deps(deps=set(dependencies), url=url, executables=executables)
def func(path):
return _run_command(command_parts + [path], error_status, has_color,
timeout)
return _run_command(command_parts + [path], error_status, has_color, timeout)
func.command = command
return func
@ -539,8 +489,7 @@ def lru_cache_with_eviction(maxsize=128, typed=False):
def remove_version(*args, **kwds):
return user_function(*args[1:], **kwds)
new_func = functools.lru_cache(maxsize=maxsize, typed=typed)(
remove_version)
new_func = functools.lru_cache(maxsize=maxsize, typed=typed)(remove_version)
def add_version(*args, **kwds):
key = make_key(args, kwds, typed)
@ -552,8 +501,7 @@ def lru_cache_with_eviction(maxsize=128, typed=False):
return decorating_function
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
open=open):
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL, open=open):
tmp_path = path + ".tmp"
try:
with open(tmp_path, "wb") as file_:
@ -566,15 +514,13 @@ def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
@functools.lru_cache()
def compression_open_func(compression):
return (open if compression == "none" else
importlib.import_module(compression).open)
return open if compression == "none" else importlib.import_module(compression).open
class Result:
COMPLETED_STATUSES = {
Status.ok, Status.problem, Status.error, Status.not_applicable,
Status.timed_out}
COMPLETED_STATUSES = {Status.ok, Status.problem, Status.error,
Status.not_applicable, Status.timed_out}
def __init__(self, path, tool):
self.path = path
@ -594,8 +540,7 @@ class Result:
if self.status == Status.pending or self.compression is None:
return unknown_label
try:
with compression_open_func(self.compression)(
self.pickle_path(), "rb") as pickle_file:
with compression_open_func(self.compression)(self.pickle_path(), "rb") as pickle_file:
return pickle.load(pickle_file)
except FileNotFoundError:
return unknown_label
@ -603,8 +548,7 @@ class Result:
@result.setter
def result(self, value):
os.makedirs(os.path.dirname(self.pickle_path()), exist_ok=True)
dump_pickle_safe(value, self.pickle_path(),
open=compression_open_func(self.compression))
dump_pickle_safe(value, self.pickle_path(), open=compression_open_func(self.compression))
Result.result.fget.evict(self)
def set_status(self, status):
@ -627,10 +571,8 @@ class Result:
end_time = time.time()
self.set_status(new_status)
appearance_changed_event.set()
log.log_message(
["Finished running ", tool_name, " on ", path, ". ",
STATUS_TO_TERMSTR[new_status],
f" {round(end_time - start_time, 2)} secs"])
log.log_message(["Finished running ", tool_name, " on ", path, ". ",
STATUS_TO_TERMSTR[new_status], f" {round(end_time - start_time, 2)} secs"])
def reset(self):
self.set_status(Status.pending)
@ -641,8 +583,7 @@ class Result:
fg_color=termstr.Color.white, bg_color=status_color, is_bold=True))
def appearance_min(self):
return ([self._get_cursor() if self.is_highlighted else
STATUS_TO_TERMSTR[self.status]])
return [self._get_cursor() if self.is_highlighted else STATUS_TO_TERMSTR[self.status]]
def get_pages_dir(self):
return self.pickle_path() + ".pages"
@ -657,8 +598,7 @@ class Result:
def as_html(self):
html, styles = termstr.TermStr(
STATUS_TO_TERMSTR[self.status]).as_html()
return (f'<a title="{self.tool.__name__}" '
f'href="{self.path}/{self.tool.__name__}" '
return (f'<a title="{self.tool.__name__}" href="{self.path}/{self.tool.__name__}" '
f'target="listing">{html}</a>', styles)
@ -716,8 +656,7 @@ def splitext(path):
@functools.lru_cache()
def is_tool_available(tool):
if (hasattr(tool, "command") and tool.command.startswith(
f"{PYTHON_EXECUTABLE} -m ")):
if (hasattr(tool, "command") and tool.command.startswith(f"{PYTHON_EXECUTABLE} -m ")):
return importlib.util.find_spec(tool.command.split()[2]) is not None
try:
return all(shutil.which(executable) for executable in tool.executables)
@ -726,8 +665,7 @@ def is_tool_available(tool):
def tools_for_path(path):
git_tools = ([git_diff, git_blame, git_log]
if os.path.exists(".git") else [])
git_tools = [git_diff, git_blame, git_log] if os.path.exists(".git") else []
root, ext = splitext(path)
extra_tools = [] if ext == "" else _tools_for_extension().get(ext[1:], [])
tools = generic_tools() + git_tools + extra_tools
@ -764,10 +702,8 @@ def path_colored(path):
else:
dirname = dirname + os.path.sep
dir_style = _charstyle_of_path(os.path.sep)
parts = [termstr.TermStr(part, dir_style)
for part in dirname.split(os.path.sep)]
path_sep = termstr.TermStr(os.path.sep).fg_color(
termstr.Color.grey_100)
parts = [termstr.TermStr(part, dir_style) for part in dirname.split(os.path.sep)]
path_sep = termstr.TermStr(os.path.sep).fg_color(termstr.Color.grey_100)
dir_name = fill3.join(path_sep, parts)
return dir_name + termstr.TermStr(basename, char_style)

View file

@ -47,9 +47,8 @@ def make_listing_page(url_path):
result = index[(path, tool_name)]
tool = getattr(tools, tool_name)
tool_name_colored = tools.tool_name_colored(tool, path)
header = fill3.appearance_as_html(
[tools.path_colored(path) + " - " + tool_name_colored,
termstr.TermStr(" ").underline() * 100])
header = fill3.appearance_as_html([tools.path_colored(path) + " - " + tool_name_colored,
termstr.TermStr(" ").underline() * 100])
body = fill3.appearance_as_html(result.appearance_min())
return make_page(header + body, f"{path} - {tool_name}")
@ -90,13 +89,11 @@ def make_main_page(project_name):
def make_summary_page(project_name, summary):
summary_html, summary_styles = summary.as_html()
body_html = ("\n".join(style.as_html() for style in summary_styles)
+ "\n" + summary_html)
body_html = "\n".join(style.as_html() for style in summary_styles) + "\n" + summary_html
return make_page(body_html, "Summary of " + project_name)
def run(server_class=http.server.HTTPServer, handler_class=Webserver,
port=8080):
def run(server_class=http.server.HTTPServer, handler_class=Webserver, port=8080):
server_address = ("", port)
httpd = server_class(server_address, handler_class)
print("Starting httpd…")
@ -104,8 +101,7 @@ def run(server_class=http.server.HTTPServer, handler_class=Webserver,
def get_summary(project_path):
pickle_path = os.path.join(project_path, tools.CACHE_PATH,
"summary.pickle")
pickle_path = os.path.join(project_path, tools.CACHE_PATH, "summary.pickle")
with gzip.open(pickle_path, "rb") as file_:
screen = pickle.load(file_)
summary = screen._summary

View file

@ -26,9 +26,8 @@ class Worker:
async def create_process(self):
create = asyncio.create_subprocess_exec(
"eris-worker", stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
preexec_fn=os.setsid)
"eris-worker", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, preexec_fn=os.setsid)
self.process = await create
pid_line = await self.process.stdout.readline()
self.child_pgid = int(pid_line.strip())
@ -46,8 +45,7 @@ class Worker:
break
return tools.Status(int(data))
async def job_runner(self, screen, summary, log, jobs_added_event,
appearance_changed_event):
async def job_runner(self, screen, summary, log, jobs_added_event, appearance_changed_event):
await self.create_process()
while True:
await jobs_added_event.wait()

View file

@ -9,8 +9,7 @@ except ImportError:
setup(name="eris",
version="2021.10.18",
description=("Eris maintains an up-to-date set of reports for every"
" file in a codebase."),
description=("Eris maintains an up-to-date set of reports for every file in a codebase."),
url="https://github.com/ahamilton/eris",
author="Andrew Hamilton",
author_email="and_hamilton@yahoo.com",
@ -18,8 +17,7 @@ setup(name="eris",
packages=["eris"],
py_modules=["lscolors", "sorted_collection"],
package_data={"eris": ["LS_COLORS.sh", "tools.toml"]},
entry_points={"console_scripts":
["eris=eris.__main__:entry_point",
"eris-worker=eris.worker:main",
"eris-webserver=eris.webserver:main",
"pydoc_color=eris.pydoc_color:main"]})
entry_points={"console_scripts": ["eris=eris.__main__:entry_point",
"eris-worker=eris.worker:main",
"eris-webserver=eris.webserver:main",
"pydoc_color=eris.pydoc_color:main"]})

View file

@ -67,16 +67,12 @@ class ToolsTestCase(unittest.TestCase):
def test_metadata(self):
mock_stat_result = unittest.mock.Mock(
st_mode=0o755, st_mtime=1454282045, st_ctime=1454282045,
st_atime=1454282047, st_size=12, st_uid=1111, st_gid=1111,
st_nlink=2)
st_mode=0o755, st_mtime=1454282045, st_ctime=1454282045, st_atime=1454282047,
st_size=12, st_uid=1111, st_gid=1111, st_nlink=2)
mock_pw_entry = unittest.mock.Mock(pw_name="foo")
with unittest.mock.patch.object(os, "stat",
return_value=mock_stat_result):
with unittest.mock.patch.object(tools.pwd, "getpwuid",
return_value=mock_pw_entry):
self._test_tool(tools.metadata,
[("hi3.py", tools.Status.ok)])
with unittest.mock.patch.object(os, "stat", return_value=mock_stat_result):
with unittest.mock.patch.object(tools.pwd, "getpwuid", return_value=mock_pw_entry):
self._test_tool(tools.metadata, [("hi3.py", tools.Status.ok)])
def test_contents(self):
self._test_tool(tools.contents, [("hi3.py", tools.Status.ok)])
@ -107,19 +103,16 @@ class ToolsTestCase(unittest.TestCase):
self._test_tool(tools.python_mccabe, self.HI_OK)
def test_perl_syntax(self):
self._test_tool(tools.perl_syntax,
[("perl.pl", tools.Status.ok)])
self._test_tool(tools.perl_syntax, [("perl.pl", tools.Status.ok)])
def test_c_syntax_gcc(self):
self._test_tool(tools.c_syntax_gcc, [("hello.c", tools.Status.ok)])
def test_objdump_headers(self):
self._test_tool(tools.objdump_headers,
[("rotatingtree.o", tools.Status.ok)])
self._test_tool(tools.objdump_headers, [("rotatingtree.o", tools.Status.ok)])
def test_objdump_disassemble(self):
self._test_tool(tools.objdump_disassemble,
[("rotatingtree.o", tools.Status.problem)])
self._test_tool(tools.objdump_disassemble, [("rotatingtree.o", tools.Status.problem)])
def test_readelf(self):
self._test_tool(tools.readelf, [("rotatingtree.o", tools.Status.ok)])
@ -148,8 +141,7 @@ class ToolsTestCase(unittest.TestCase):
def test_pil(self):
for extension in ["png", "jpg", "gif", "bmp", "ppm", "tiff", "tga"]:
self._test_tool(tools.pil, [("circle." + extension,
tools.Status.ok)])
self._test_tool(tools.pil, [("circle." + extension, tools.Status.ok)])
class LruCacheWithEvictionTestCase(unittest.TestCase):