Made vigil into a package with a setup.py file.

This commit is contained in:
Andrew Hamilton 2017-06-27 14:03:32 +01:00
parent 49f8d87659
commit 5728e5cff3
135 changed files with 76 additions and 50 deletions

View file

@ -34,11 +34,11 @@ import docopt
import pygments.styles
import pyinotify
import fill3
import terminal
import termstr
import tools
import worker
from vigil import fill3
from vigil import terminal
from vigil import termstr
from vigil import tools
from vigil import worker
USAGE = """
@ -1046,7 +1046,7 @@ def check_arguments():
return root_path, worker_count, editor_command, arguments["--theme"]
if __name__ == "__main__":
def entry_point():
root_path, worker_count, editor_command, theme = check_arguments()
with terminal.console_title("vigil: " + os.path.basename(root_path)):
manage_cache(root_path)
@ -1054,3 +1054,7 @@ if __name__ == "__main__":
loop = asyncio.get_event_loop()
main(root_path, loop, worker_count, editor_command, theme)
os._exit(0)
if __name__ == "__main__":
entry_point()

472
vigil/fill3.py Normal file
View file

@ -0,0 +1,472 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import asyncio
import collections
import contextlib
import itertools
import os
import signal
import sys
import urwid
import urwid.raw_display
import vigil.terminal as terminal
import vigil.termstr as termstr
def appearance_is_valid(appearance):
"""An appearance is a list of strings of equal length.
An empty list is valid. Empty strings are not allowed."""
return (all(isinstance(line, (str, termstr.TermStr)) and len(line) > 0
for line in appearance) and
len(set(len(line) for line in appearance)) < 2)
def appearance_resize(appearance, dimensions, pad_char=" "):
width, height = dimensions
result = [line[:width].ljust(width, pad_char)
for line in appearance[:height]]
if len(result) < height:
result.extend([pad_char * width] * (height - len(result)))
return result
def appearance_dimensions(appearance):
try:
return len(appearance[0]), len(appearance)
except IndexError:
return 0, 0
def join(seperator, parts):
"""Returns a string if all the parts and the seperator are plain strings.
In other words it returns a TermStr if anything is a TermStr."""
if parts == []:
return ""
try:
return seperator.join(parts)
except TypeError:
return termstr.TermStr(seperator).join(parts)
def join_horizontal(appearances):
heights = set(len(appearance) for appearance in appearances)
assert len(heights) == 1, heights
return [join("", parts) for parts in zip(*appearances)]
def even_widths(column_widgets, width):
column_count = len(column_widgets)
widths = []
for index, column_widget in enumerate(column_widgets):
start_pos = int(round(float(width) / column_count * index))
end_pos = int(round(float(width) / column_count * (index+1)))
widths.append(end_pos - start_pos)
return widths
class Row(collections.UserList):
def __init__(self, widgets, widths_func=even_widths):
collections.UserList.__init__(self, widgets)
self.widgets = self.data
self.widths_func = widths_func
def appearance(self, dimensions):
width, height = dimensions
widths = self.widths_func(self.widgets, width)
assert sum(widths) == width, (sum(widths), width)
return join_horizontal([column_widget.appearance((item_width, height))
for column_widget, item_width
in zip(self.widgets, widths)])
def appearance_min(self):
appearances = [column_widget.appearance_min()
for column_widget in self.widgets]
dimensions = [appearance_dimensions(appearance)
for appearance in appearances]
max_height = max(height for width, height in dimensions)
return join_horizontal([
appearance_resize(appearance, (width, max_height))
for appearance, (width, height) in zip(appearances, dimensions)])
def even_partition(row_widgets, height):
row_count = len(row_widgets)
heights = []
for index, row_widget in enumerate(row_widgets):
start_pos = int(round(float(height) / row_count * index))
end_pos = int(round(float(height) / row_count * (index+1)))
heights.append(end_pos - start_pos)
return heights
def join_vertical(appearances):
result = []
for appearance in appearances:
result.extend(appearance)
return result
class Column(collections.UserList):
def __init__(self, widgets, partition_func=even_partition,
background_char=" "):
collections.UserList.__init__(self, widgets)
self.widgets = self.data
self.partition_func = partition_func
self.background_char = background_char
def appearance(self, dimensions):
width, height = dimensions
if len(self.widgets) == 0: # FIX: Really allow zero widgets?
return [self.background_char * width] * height
heights = self.partition_func(self.widgets, height)
assert sum(heights) == height, (sum(heights), height)
return join_vertical([row_widget.appearance((width, item_height))
for row_widget, item_height
in zip(self.widgets, heights)])
def _appearance_list(self, widgets):
if widgets == []:
return []
appearances = [row_widget.appearance_min() for row_widget in widgets]
dimensions = [appearance_dimensions(appearance)
for appearance in appearances]
max_width = max(width for width, height in dimensions)
padded_appearances = [
appearance_resize(appearance, (max_width, height))
for appearance, (width, height) in zip(appearances, dimensions)]
result = []
for appearance in padded_appearances:
result.extend(appearance)
return result
def appearance_interval(self, interval):
start_y, end_y = interval
return self._appearance_list(self.widgets[start_y:end_y])
def appearance_min(self):
return self._appearance_list(self.widgets)
class Filler:
def __init__(self, widget):
self.widget = widget
def appearance(self, dimensions):
return appearance_resize(self.widget.appearance_min(), dimensions)
class ScrollBar:
_GREY_BACKGROUND_STYLE = termstr.CharStyle(bg_color=termstr.Color.grey_100)
_GREY_BLOCK = termstr.TermStr(" ", _GREY_BACKGROUND_STYLE)
def __init__(self, is_horizontal, interval=(0, 0), bar_char=_GREY_BLOCK,
background_char=" "):
self._is_horizontal = is_horizontal
self.interval = interval
self.bar_char = bar_char
self.background_char = background_char
def appearance(self, dimensions):
width, height = dimensions
assert width == 1 or height == 1, (width, height)
length = width if self._is_horizontal else height
assert all(0 <= fraction <= 1 for fraction in self.interval), \
self.interval
start_index, end_index = [int(fraction * length)
for fraction in self.interval]
if start_index == end_index and end_index < length:
end_index += 1
bar = (self.background_char * start_index +
self.bar_char * (end_index - start_index) +
self.background_char * (length - end_index))
return [bar] if self._is_horizontal else [char for char in bar]
class Portal:
def __init__(self, widget, position=(0, 0), background_char=" "):
self.widget = widget
self.position = position
self.background_char = background_char
self.last_dimensions = 0, 0
def _scroll_half_pages(self, dx, dy):
x, y = self.position
width, height = self.last_dimensions
self.position = (max(x + dx * (width // 2), 0),
max(y + dy * (height // 2), 0))
def scroll_up(self):
self._scroll_half_pages(0, -1)
def scroll_down(self):
self._scroll_half_pages(0, 1)
def scroll_left(self):
self._scroll_half_pages(-1, 0)
def scroll_right(self):
self._scroll_half_pages(1, 0)
def appearance(self, dimensions):
width, height = dimensions
x, y = self.position
try:
appearance = self.widget.appearance_interval((y, y+height))
except AttributeError:
appearance = self.widget.appearance_min()[y:y+height]
self.last_dimensions = dimensions
return appearance_resize([row[x:x+width] for row in appearance],
dimensions, self.background_char)
class View:
def __init__(self, portal, horizontal_scrollbar, vertical_scrollbar,
hide_scrollbars=True):
self.portal = portal
self.horizontal_scrollbar = horizontal_scrollbar
self.vertical_scrollbar = vertical_scrollbar
self.hide_scrollbars = hide_scrollbars
@classmethod
def from_widget(cls, widget):
return cls(Portal(widget), ScrollBar(is_horizontal=True),
ScrollBar(is_horizontal=False))
@property
def position(self):
return self.portal.position
@position.setter
def position(self, position):
self.portal.position = position
@property
def widget(self):
return self.portal.widget
@widget.setter
def widget(self, widget):
self.portal.widget = widget
def appearance(self, dimensions):
width, height = dimensions
try:
full_width, full_height = (self.portal.widget.
appearance_dimensions())
except AttributeError:
full_appearance = self.portal.widget.appearance_min()
full_width, full_height = appearance_dimensions(full_appearance)
if full_width == 0 or full_height == 0:
return self.portal.appearance(dimensions)
x, y = self.portal.position
hide_scrollbar_vertical = (self.hide_scrollbars and
full_height <= height and y == 0)
hide_scrollbar_horizontal = (self.hide_scrollbars and
full_width <= width and x == 0)
if not hide_scrollbar_horizontal:
full_width = max(full_width, x + width)
self.horizontal_scrollbar.interval = (x / full_width,
(x + width) / full_width)
height -= 1
if not hide_scrollbar_vertical:
full_height = max(full_height, y + height)
self.vertical_scrollbar.interval = (y / full_height,
(y + height) / full_height)
width -= 1
portal_appearance = self.portal.appearance((width, height))
if hide_scrollbar_vertical:
result = portal_appearance
else:
scrollbar_v_appearance = self.vertical_scrollbar.appearance(
(1, height))
result = join_horizontal([portal_appearance,
scrollbar_v_appearance])
if not hide_scrollbar_horizontal:
scrollbar_h_appearance = self.horizontal_scrollbar.appearance(
(width, 1))
result.append(scrollbar_h_appearance[0] +
("" if hide_scrollbar_vertical else " "))
return result
class Text:
def __init__(self, text, pad_char=" "):
lines = text.splitlines()
if len(lines) == 0:
self.text = []
elif len(lines) == 1:
self.text = [text]
else:
max_width = max(len(line) for line in lines)
height = len(lines)
self.text = appearance_resize(lines, (max_width, height), pad_char)
def appearance_min(self):
return self.text
def appearance(self, dimensions):
return appearance_resize(self.appearance_min(), dimensions)
class Table:
def __init__(self, table, pad_char=" "):
self._widgets = table
self._pad_char = pad_char
def appearance_min(self):
if self._widgets == []:
return []
appearances = [[cell.appearance_min() for cell in row]
for row in self._widgets]
row_heights = [0] * len(self._widgets)
column_widths = [0] * len(self._widgets[0])
for y, row in enumerate(appearances):
for x, appearance in enumerate(row):
width, height = appearance_dimensions(appearance)
row_heights[y] = max(row_heights[y], height)
column_widths[x] = max(column_widths[x], width)
return join_vertical([join_horizontal(
[appearance_resize(appearance, (column_widths[x], row_heights[y]),
pad_char=self._pad_char)
for x, appearance in enumerate(row)])
for y, row in enumerate(appearances)])
class Border:
THIN = ["", "", "", "", "", "", "", ""]
THICK = ["", "", "", "", "", "", "", ""]
ROUNDED = ["", "", "", "", "", "", "", ""]
DOUBLE = ["", "", "", "", "", "", "", ""]
def __init__(self, widget, title=None, characters=THIN):
self.widget = widget
self.title = title
self.set_style(characters)
def set_style(self, characters):
(self.top, self.bottom, self.left, self.right, self.top_left,
self.bottom_left, self.bottom_right, self.top_right) = characters
def _add_border(self, body_content):
content_width, content_height = appearance_dimensions(body_content)
if self.title is None:
title_bar = self.top * content_width
else:
padded_title = (" " + self.title + " ")[:content_width]
try:
title_bar = padded_title.center(content_width, self.top)
except TypeError:
padded_title = termstr.TermStr(padded_title)
title_bar = padded_title.center(content_width, self.top)
result = [self.top_left + title_bar + self.top_right]
result.extend(self.left + line + self.right for line in body_content)
result.append(self.bottom_left + self.bottom * content_width +
self.bottom_right)
return result
def appearance_min(self):
return self._add_border(self.widget.appearance_min())
def appearance(self, dimensions):
width, height = dimensions
return self._add_border(self.widget.appearance((width-2, height-2)))
class Placeholder:
def __init__(self, widget=None):
self.widget = widget
def appearance_min(self):
return self.widget.appearance_min()
def appearance(self, dimensions):
return self.widget.appearance(dimensions)
class Fixed:
def __init__(self, appearance):
self.appearance_min_ = appearance
def appearance_min(self):
return self.appearance_min_
##########################
def draw_screen(widget):
appearance = widget.appearance(os.get_terminal_size())
print(terminal.move(0, 0), *appearance, sep="", end="", flush=True)
_last_appearance = []
def patch_screen(widget):
global _last_appearance
appearance = widget.appearance(os.get_terminal_size())
zip_func = (itertools.zip_longest
if len(appearance) > len(_last_appearance) else zip)
changed_lines = (str(terminal.move(0, row_index)) + line
for row_index, (line, old_line)
in enumerate(zip_func(appearance, _last_appearance))
if line != old_line)
print(*changed_lines, sep="", end="", flush=True)
_last_appearance = appearance
@contextlib.contextmanager
def _urwid_screen():
screen = urwid.raw_display.Screen()
screen.set_mouse_tracking(True)
screen.start()
try:
yield screen
finally:
screen.stop()
@asyncio.coroutine
def _update_screen(screen_widget, appearance_changed_event):
while True:
yield from appearance_changed_event.wait()
appearance_changed_event.clear()
patch_screen(screen_widget)
def on_input(urwid_screen, screen_widget):
for event in urwid_screen.get_input():
screen_widget.on_input_event(event)
def main(loop, appearance_changed_event, screen_widget, exit_loop=None):
appearance_changed_event.set()
if exit_loop is None:
exit_loop = loop.stop
loop.add_signal_handler(signal.SIGWINCH, appearance_changed_event.set)
loop.add_signal_handler(signal.SIGINT, exit_loop)
loop.add_signal_handler(signal.SIGTERM, exit_loop)
asyncio.async(_update_screen(screen_widget, appearance_changed_event))
with terminal.hidden_cursor(), terminal.fullscreen(), \
_urwid_screen() as urwid_screen:
loop.add_reader(sys.stdin, on_input, urwid_screen, screen_widget)
loop.run_forever()

78
vigil/golden.py Normal file
View file

@ -0,0 +1,78 @@
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import optparse
import os.path
import shutil
import subprocess
import sys
import tempfile
import unittest
def _accept_actual(failed):
for actual_str, golden_path in failed:
with open(golden_path, "w") as golden_file:
golden_file.write(actual_str)
print("Wrote golden file: %s" % golden_path)
def _run_meld_gui(failed):
temp_dir = tempfile.mkdtemp()
try:
golden_dir = os.path.join(temp_dir, "golden")
actual_dir = os.path.join(temp_dir, "actual")
os.mkdir(golden_dir)
os.mkdir(actual_dir)
for actual_str, golden_file in failed:
name = os.path.basename(golden_file)
actual_path = os.path.join(actual_dir, name)
with open(actual_path, "w") as actual:
actual.write(actual_str)
os.symlink(os.path.abspath(golden_file),
os.path.join(golden_dir, name))
subprocess.call(["meld", actual_dir, golden_dir])
finally:
shutil.rmtree(temp_dir)
_FAILED = set()
def assertGolden(actual, golden_path):
try:
with open(golden_path, "r") as golden_file:
expected = golden_file.read()
except FileNotFoundError:
expected = None
if actual != expected:
_FAILED.add((actual, golden_path))
if expected is None:
raise unittest.TestCase.failureException(
'The golden file does not exist: %r\nUse "--diff" or'
' "--accept" to create the golden file.' % golden_path)
else:
raise unittest.TestCase.failureException(
'Output does not match golden file: %r\nUse "--diff" or'
' "--accept" to update the golden file.' % golden_path)
def main():
parser = optparse.OptionParser()
parser.add_option("-a", "--accept", action="store_true",
dest="should_accept_actual")
parser.add_option("-d", "--diff", action="store_true", dest="should_diff")
options, args = parser.parse_args()
# unitest.main doesn't expect these arguments, so remove them.
for argument in ["-a", "--accept", "-d", "--diff"]:
if argument in sys.argv:
sys.argv.remove(argument)
try:
unittest.main()
finally:
if len(_FAILED) > 0:
if options.should_accept_actual:
_accept_actual(_FAILED)
if options.should_diff:
_run_meld_gui(_FAILED)

72
vigil/gut.py Executable file
View file

@ -0,0 +1,72 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import re
import sys
USAGE = """Usage: gut.py <python file>
# gut.py test.py"""
INDENT_SIZE = 4
TAB_SIZE = 4
def indentation_of_line(line):
indentation = 0
for character in line:
if character == " ":
indentation += 1
elif character == "\t":
indentation += TAB_SIZE
elif character == "\n":
return None
else: # Is a non-whitespace character.
return indentation
def is_start_line_of_signature(line):
return re.match("^\s*(async)?\s*def\s", line) is not None
def is_end_line_of_signature(line):
return (re.match(".*\):\s*\n$", line) is not None or
re.match(".*\):\s*#.*\n$", line) is not None)
def gut_module(module_contents):
SIGNATURE, BODY, TOP_LEVEL = 1, 2, 3
state = TOP_LEVEL
body_depth = 0
result = []
for line in module_contents.splitlines(keepends=True):
indent = indentation_of_line(line)
if state == BODY and indent is not None and \
indent < body_depth:
state = TOP_LEVEL
result.append("\n")
if state == TOP_LEVEL and is_start_line_of_signature(line):
state = SIGNATURE
body_depth = indent + INDENT_SIZE
if state == SIGNATURE and is_end_line_of_signature(line):
result.append(line)
state = BODY
elif state != BODY:
result.append(line)
return "".join(result)
def main(module_path):
with open(module_path) as module_file:
print(gut_module(module_file.read()))
if __name__ == "__main__":
if len(sys.argv) != 2:
print(USAGE)
sys.exit(-1)
main(sys.argv[1])

130
vigil/lscolors.py Normal file
View file

@ -0,0 +1,130 @@
# Copyright (C) 2011, 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import os
import os.path
import stat
import syslog
FILE_KEY = "fi"
DIRECTORY_KEY = "di"
OTHER_WRITABLE_KEY = "ow"
EXECUTABLE_KEY = "ex"
SETUID_KEY = "su"
SETGUID_KEY = "sg"
SYMLINK_KEY = "ln"
ORPHAN_KEY = "or"
PIPE_KEY = "pi"
CHARACTER_DEVICE_KEY = "cd"
BLOCK_DEVICE_KEY = "bd"
STICKY_KEY = "st"
STICKY_OTHER_WRITABLE_KEY = "tw"
SOCKET_KEY = "so"
MISSING_KEY = "mi"
MULTI_HARDLINK_KEY = "mh"
def _parse_ls_colors(ls_codes):
color_codes = {}
for entry in ls_codes.split(":"):
if "=" not in entry:
continue
entry_key, entry_value = entry.split("=")
if entry_key.startswith("*."):
entry_key = entry_key[1:]
color_codes[entry_key] = entry_value
assert color_codes != {}, color_codes
return color_codes
_DEFAULT_COLOR_CODES = \
{BLOCK_DEVICE_KEY: '01;33', SYMLINK_KEY: '01;36',
STICKY_OTHER_WRITABLE_KEY: '30;42', DIRECTORY_KEY: '01;34',
SETUID_KEY: '37;41', CHARACTER_DEVICE_KEY: '01;33', SOCKET_KEY: '01;35',
EXECUTABLE_KEY: '01;32', STICKY_KEY: '37;44',
OTHER_WRITABLE_KEY: '34;42', PIPE_KEY: '33', SETGUID_KEY: '30;43',
ORPHAN_KEY: '40;31;01'}
def get_color_codes(environment):
if "LS_COLORS" in environment:
try:
return _parse_ls_colors(environment["LS_COLORS"])
except:
syslog.syslog("Syntax error in LS_COLORS environment variable. "
"Using default colors.")
return _DEFAULT_COLOR_CODES
def color_key_for_path(path, color_codes, is_link_target=True):
# see print_color_indicator in the file 'ls.c' in the coreutils codebase
if not os.path.lexists(path):
return MISSING_KEY
elif os.path.islink(path):
if is_link_target:
try:
link_path = os.path.join(os.path.dirname(path),
os.readlink(path))
file_stat = os.stat(link_path)
except OSError:
return ORPHAN_KEY
else:
return SYMLINK_KEY
else:
file_stat = os.stat(path)
mode = file_stat.st_mode
if stat.S_ISREG(mode):
if mode & stat.S_ISUID and SETUID_KEY in color_codes:
return SETUID_KEY
elif mode & stat.S_ISGID and SETGUID_KEY in color_codes:
return SETGUID_KEY
elif ((mode & stat.S_IXUSR or mode & stat.S_IXGRP or
mode & stat.S_IXOTH) and EXECUTABLE_KEY in color_codes):
return EXECUTABLE_KEY
elif file_stat.st_nlink > 1 and MULTI_HARDLINK_KEY in color_codes:
return MULTI_HARDLINK_KEY
else:
return FILE_KEY
elif stat.S_ISDIR(mode):
if (mode & stat.S_ISVTX and mode & stat.S_IWOTH and
STICKY_OTHER_WRITABLE_KEY in color_codes):
return STICKY_OTHER_WRITABLE_KEY
elif (mode & stat.S_IWOTH) != 0 and OTHER_WRITABLE_KEY in color_codes:
return OTHER_WRITABLE_KEY
elif (mode & stat.S_ISVTX) != 0 and STICKY_KEY in color_codes:
return STICKY_KEY
else:
return DIRECTORY_KEY
for test_function, color_key in [(stat.S_ISFIFO, PIPE_KEY),
(stat.S_ISSOCK, SOCKET_KEY),
(stat.S_ISBLK, BLOCK_DEVICE_KEY),
(stat.S_ISCHR, CHARACTER_DEVICE_KEY)]:
if test_function(mode):
return color_key
return ORPHAN_KEY
def color_code_for_path(path, color_codes):
def get_extension(basename, color_codes):
parts = basename.split(".")
if len(parts) == 2:
extension = "." + parts[1]
if extension in color_codes:
return extension
elif len(parts) > 2:
for extension in color_codes:
if extension.startswith(".") and \
basename.endswith(extension):
return extension
target_link = color_codes.get(SYMLINK_KEY, None)
color_key = color_key_for_path(path, color_codes,
target_link == "target")
if color_key == FILE_KEY:
filename = os.path.basename(path)
if "." in filename:
extension = get_extension(filename, color_codes)
if extension is not None:
color_key = extension
return color_codes.get(color_key, None)

64
vigil/terminal.py Normal file
View file

@ -0,0 +1,64 @@
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import contextlib
import sys
ESC = "\x1b"
normal = ESC + "(B\x1b[m" # sgr0 "[0m" ?
bold = ESC + "[1m" # bold
italic = ESC + "[3m" # sitm
standout = ESC + "[7m" # smso
underline = ESC + "[4m" # smul
enter_fullscreen = ESC + "[?1049h" # smcup
exit_fullscreen = ESC + "[?1049l" # rmcup
hide_cursor = ESC + "[?25l" # civis
normal_cursor = ESC + "[?25l\x1b[?25h" # cnorm
clear = ESC + "[H\x1b[2J" # clear
save = ESC + "7" # sc
restore = ESC + "8" # rc
def color(color_number, is_foreground):
return "\x1b[%s;5;%im" % ("38" if is_foreground else "48", color_number)
def rgb_color(rgb, is_foreground):
return "\x1b[%s;2;" % ("38" if is_foreground else "48") + "%i;%i;%im" % rgb
def move(x, y): # cup
return "\x1b[%i;%iH" % (y + 1, x + 1)
@contextlib.contextmanager
def fullscreen():
sys.stdout.write(enter_fullscreen)
try:
yield
finally:
sys.stdout.write(exit_fullscreen)
@contextlib.contextmanager
def hidden_cursor():
sys.stdout.write(hide_cursor)
try:
yield
finally:
sys.stdout.write(normal_cursor)
@contextlib.contextmanager
def console_title(title):
sys.stdout.write(save)
sys.stdout.write("\033]0;%s\007" % title)
try:
yield
finally:
sys.stdout.write(restore)

264
vigil/termstr.py Normal file
View file

@ -0,0 +1,264 @@
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import collections
import itertools
import os
import weakref
import pygments.formatters.terminal256
import vigil.terminal as terminal
def _cache_first_result(user_function):
def decorator(self, *args, **kwds):
try:
return self._cache
except AttributeError:
self._cache = user_function(self, *args, **kwds)
return self._cache
return decorator
class Color:
black = (0, 0, 0)
white = (255, 255, 255)
red = (255, 0, 0)
green = (0, 255, 0)
blue = (0, 0, 255)
yellow = (255, 255, 0)
grey_50 = (50, 50, 50)
grey_100 = (100, 100, 100)
light_blue = (90, 90, 255)
purple = (200, 0, 200)
class CharStyle:
_POOL = weakref.WeakValueDictionary()
_TERMINAL256_FORMATTER = \
pygments.formatters.terminal256.Terminal256Formatter()
def __new__(cls, fg_color=None, bg_color=None, is_bold=False,
is_italic=False, is_underlined=False):
if fg_color is None:
fg_color = Color.white
if bg_color is None:
bg_color = Color.black
key = (fg_color, bg_color, is_bold, is_italic, is_underlined)
try:
return CharStyle._POOL[key]
except KeyError:
obj = object.__new__(cls)
obj.fg_color, obj.bg_color, obj.is_bold, obj.is_italic, \
obj.is_underlined = key
return CharStyle._POOL.setdefault(key, obj)
def __getnewargs__(self):
return (self.fg_color, self.bg_color, self.is_bold, self.is_italic,
self.is_underlined)
def __getstate__(self):
state = self.__dict__.copy()
if "_cache" in state:
del state["_cache"]
return state
def __setstate__(self, state):
self.__dict__ = state
def __repr__(self):
attributes = []
if self.is_bold:
attributes.append("b")
if self.is_italic:
attributes.append("i")
if self.is_underlined:
attributes.append("u")
return ("<CharStyle: fg:%s bg:%s attr:%s>" %
(self.fg_color, self.bg_color, ",".join(attributes)))
def termcode_of_color(self, color, is_foreground):
if isinstance(color, int):
return terminal.color(color, is_foreground)
else: # true color
if os.environ["TERM"] == "xterm":
closest_color = self._TERMINAL256_FORMATTER._closest_color(
*color)
return terminal.color(closest_color, is_foreground)
else:
return terminal.rgb_color(color, is_foreground)
@_cache_first_result
def code_for_term(self):
fg_termcode = self.termcode_of_color(self.fg_color, True)
bg_termcode = self.termcode_of_color(self.bg_color, False)
bold_code = terminal.bold if self.is_bold else ""
italic_code = terminal.italic if self.is_italic else ""
underline_code = terminal.underline if self.is_underlined else ""
return "".join([terminal.normal, fg_termcode, bg_termcode, bold_code,
italic_code, underline_code])
def _join_lists(lists):
return list(itertools.chain.from_iterable(lists))
class TermStr(collections.UserString):
def __init__(self, data, style=CharStyle()):
if isinstance(data, self.__class__):
self.data = data.data
self.style = data.style
else:
self.data = data
self.style = (style if isinstance(style, tuple)
else (style,) * len(data))
def __eq__(self, other):
return (self is other or
(isinstance(other, self.__class__) and
self.data == other.data and self.style == other.style))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.data, self.style))
@_cache_first_result
def _partition_style(self):
if self.data == "":
return []
last_style, last_index = None, 0
result = []
for index, style in enumerate(self.style):
if style != last_style:
if last_style is not None:
result.append(
(last_style, self.data[last_index:index], last_index))
last_style, last_index = style, index
result.append(
(last_style, self.data[last_index:len(self.style)], last_index))
return result
def __str__(self):
return "".join(_join_lists(
[style.code_for_term(), str_]
for style, str_, position in self._partition_style()) +
[terminal.normal])
def __repr__(self):
return "<TermStr: %r>" % self.data
def __add__(self, other):
if isinstance(other, str):
other = TermStr(other)
return self.__class__(self.data + other.data, self.style + other.style)
def __radd__(self, other):
if isinstance(other, str):
other = TermStr(other)
return self.__class__(other.data + self.data, other.style + self.style)
def __mul__(self, n):
return self.__class__(self.data*n, self.style*n)
__rmul__ = __mul__
def __getitem__(self, index):
return self.__class__(self.data[index], self.style[index])
def join(self, parts):
parts = [TermStr(part) if isinstance(part, str) else part
for part in parts]
joined_style = _join_lists(self.style + part.style for part in parts)
return self.__class__(self.data.join(part.data for part in parts),
tuple(joined_style[len(self.style):]))
def _split_style(self, parts, sep_length):
result = []
cursor = 0
for part in parts:
style_part = self.style[cursor:cursor+len(part)]
result.append(self.__class__(part, style_part))
cursor += (len(part) + sep_length)
return result
def split(self, sep=None, maxsplit=-1):
return self._split_style(self.data.split(sep, maxsplit), len(sep))
def splitlines(self, keepends=0):
lines_with_ends = self.data.splitlines(keepends=True)
lines_without_ends = self.data.splitlines()
result_parts = lines_with_ends if keepends else lines_without_ends
result = []
cursor = 0
for line, line_with_end in zip(result_parts, lines_with_ends):
style_part = self.style[cursor:cursor+len(line)]
result.append(self.__class__(line, style_part))
cursor += len(line_with_end)
return result
def capitalize(self):
return self.__class__(self.data.capitalize(), self.style)
def lower(self):
return self.__class__(self.data.lower(), self.style)
def swapcase(self):
return self.__class__(self.data.swapcase(), self.style)
def title(self):
return self.__class__(self.data.title(), self.style)
def upper(self):
return self.__class__(self.data.upper(), self.style)
def ljust(self, width, fillchar=" "):
return self + self.__class__(fillchar * (width - len(self.data)))
def rjust(self, width, fillchar=" "):
return self.__class__(fillchar * (width - len(self.data))) + self
def center(self, width, fillchar=" "):
left_width = (width - len(self.data)) // 2
if left_width < 1:
return self
return (self.__class__(fillchar * left_width) + self +
self.__class__(fillchar *
(width - left_width - len(self.data))))
# Below are extra methods useful for termstrs.
def transform_style(self, transform_func):
new_style = tuple(_join_lists([transform_func(style)] * len(str_)
for style, str_, position
in self._partition_style()))
return self.__class__(self.data, new_style)
def bold(self):
def make_bold(style):
return CharStyle(style.fg_color, style.bg_color, is_bold=True,
is_underlined=style.is_underlined)
return self.transform_style(make_bold)
def underline(self):
def make_underlined(style):
return CharStyle(style.fg_color, style.bg_color,
is_bold=style.is_bold, is_underlined=True)
return self.transform_style(make_underlined)
def fg_color(self, fg_color):
def set_fgcolor(style):
return CharStyle(fg_color, style.bg_color, is_bold=style.is_bold,
is_underlined=style.is_underlined)
return self.transform_style(set_fgcolor)
def bg_color(self, bg_color):
def set_bgcolor(style):
return CharStyle(style.fg_color, bg_color, is_bold=style.is_bold,
is_underlined=style.is_underlined)
return self.transform_style(set_bgcolor)

991
vigil/tools.py Normal file
View file

@ -0,0 +1,991 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import ast
import asyncio
import contextlib
import dis
import enum
import functools
import gzip
import hashlib
import io
import math
import os
import os.path
import pickle
import pwd
import stat
import subprocess
import tempfile
import time
import traceback
import PIL.Image
import pygments
import pygments.lexers
import pygments.styles
import vigil.fill3 as fill3
import vigil.gut as gut
import vigil.lscolors as lscolors
import vigil.termstr as termstr
CACHE_PATH = ".vigil"
if "PYGMENT_STYLE" not in os.environ:
os.environ["PYGMENT_STYLE"] = "native"
class Status(enum.IntEnum):
ok = 1
problem = 2
normal = 3
error = 4
not_applicable = 5
running = 6
pending = 7
paused = 8
timed_out = 9
_STATUS_COLORS = {Status.ok: termstr.Color.green,
Status.problem: termstr.Color.red,
Status.normal: termstr.Color.white,
Status.not_applicable: termstr.Color.grey_100,
Status.running: termstr.Color.light_blue,
Status.paused: termstr.Color.yellow,
Status.timed_out: termstr.Color.purple}
STATUS_MEANINGS = [
(Status.normal, "Normal"), (Status.ok, "Ok"),
(Status.problem, "Problem"), (Status.not_applicable, "Not applicable"),
(Status.running, "Running"), (Status.paused, "Paused"),
(Status.timed_out, "Timed out"), (Status.pending, "Pending"),
(Status.error, "Error")
]
_STATUS_TO_TERMSTR = {
status: termstr.TermStr(" ", termstr.CharStyle(bg_color=color))
for status, color in _STATUS_COLORS.items()}
_STATUS_TO_TERMSTR[Status.error] = termstr.TermStr(
"E", termstr.CharStyle(bg_color=termstr.Color.red))
_STATUS_TO_TERMSTR[Status.pending] = "."
STATUS_CURSOR_COLORS = {Status.ok: termstr.Color.black,
Status.problem: termstr.Color.white,
Status.normal: termstr.Color.black,
Status.not_applicable: termstr.Color.white,
Status.running: termstr.Color.white,
Status.paused: termstr.Color.black,
Status.timed_out: termstr.Color.white}
def get_ls_color_codes():
if "LS_COLORS" not in os.environ:
script = os.path.join(os.path.dirname(__file__), "LS_COLORS.sh")
with open(script) as file_:
codes = file_.readline().strip()[len("LS_COLORS='"):-len("';")]
os.environ["LS_COLORS"] = codes
return lscolors.get_color_codes(os.environ)
_LS_COLOR_CODES = get_ls_color_codes()
TIMEOUT = 60
def _printable(text):
return "".join(char if ord(char) > 31 or char in ["\n", "\t"] else "#"
for char in text)
def _fix_input(input_):
input_str = input_.decode("utf-8") if isinstance(input_, bytes) else input_
return _printable(input_str).expandtabs(tabsize=4)
def _do_command(command, timeout=None, **kwargs):
stdout, stderr = "", ""
with contextlib.suppress(subprocess.CalledProcessError):
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, **kwargs)
try:
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
raise
return _fix_input(stdout), _fix_input(stderr), process.returncode
def _run_command(command, status_text=Status.ok):
status, output = status_text, ""
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
output = stdout + stderr
except subprocess.CalledProcessError:
status = Status.problem
if process.returncode != 0:
status = Status.problem
return status, fill3.Text(_fix_input(output))
def deps(**kwargs):
def decorating_func(func):
for key, value in kwargs.items():
setattr(func, key, value)
return func
return decorating_func
def _syntax_highlight(text, lexer, style):
def _parse_rgb(hex_rgb):
if hex_rgb.startswith("#"):
hex_rgb = hex_rgb[1:]
return tuple(eval("0x"+hex_rgb[index:index+2]) for index in [0, 2, 4])
def _char_style_for_token_type(token_type, default_bg_color):
token_style = style.style_for_token(token_type)
fg_color = (termstr.Color.black if token_style["color"] is None
else _parse_rgb(token_style["color"]))
bg_color = (default_bg_color if token_style["bgcolor"] is None
else _parse_rgb(token_style["bgcolor"]))
return termstr.CharStyle(fg_color, bg_color, token_style["bold"],
token_style["italic"],
token_style["underline"])
default_bg_color = _parse_rgb(style.background_color)
text = fill3.join(
"", [termstr.TermStr(text, _char_style_for_token_type(
token_type, default_bg_color))
for token_type, text in pygments.lex(text, lexer)])
return fill3.Text(text, pad_char=termstr.TermStr(" ").bg_color(
default_bg_color))
def _syntax_highlight_using_path(text, path):
lexer = pygments.lexers.get_lexer_for_filename(path, text)
style = pygments.styles.get_style_by_name(os.environ["PYGMENT_STYLE"])
return _syntax_highlight(text, lexer, style)
def pygments_(path):
with open(path) as file_:
try:
text = file_.read()
except UnicodeDecodeError:
return Status.not_applicable, fill3.Text("Not unicode")
else:
try:
source_widget = _syntax_highlight_using_path(_fix_input(text),
path)
except pygments.util.ClassNotFound:
return Status.normal, fill3.Text(text)
return Status.normal, source_widget
def linguist(path):
# Dep: ruby?, ruby-dev, libicu-dev, cmake, "gem install github-linguist"
return _run_command(["linguist", path], Status.normal)
def _permissions_in_octal(permissions):
result = []
for part_index in range(3):
index = part_index * 3 + 1
part = permissions[index:index+3]
digit = sum(2 ** (2 - index) for index, element in enumerate(part)
if element != "-")
result.append(str(digit))
return "".join(result)
def _pretty_bytes(bytes):
if bytes == 0:
return "0 B"
units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
unit_index = int(math.floor(math.log(bytes, 1024)))
power = math.pow(1024, unit_index)
conversion = round(bytes/power, 2)
return "%s %s" % (conversion, units[unit_index])
def _md5(path):
with open(path, "rb") as file:
return hashlib.md5(file.read()).hexdigest()
@deps(deps={"file", "coreutils"}, executables={"file", "sha1sum"})
def metadata(path):
def detail(value, unit):
result = (" (%s)" % value if unit is None else " (%s %s)" %
(value, unit))
return termstr.TermStr(result).fg_color(termstr.Color.grey_100)
is_symlink = "yes" if os.path.islink(path) else "no"
stat_result = os.stat(path)
permissions = stat.filemode(stat_result.st_mode)
hardlinks = str(stat_result.st_nlink)
group = [pwd.getpwuid(stat_result.st_gid).pw_name,
detail(stat_result.st_gid, "gid")]
owner = [pwd.getpwuid(stat_result.st_uid).pw_name,
detail(stat_result.st_uid, "uid")]
modified, created, access = [
[time.asctime(time.gmtime(seconds)), detail(int(seconds), "secs")]
for seconds in (stat_result.st_mtime, stat_result.st_ctime,
stat_result.st_atime)]
size = [_pretty_bytes(stat_result.st_size),
detail(stat_result.st_size, "bytes")]
stdout, *rest = _do_command(
["file", "--dereference", "--brief", "--uncompress", "--mime", path])
mime_type = stdout
stdout, *rest = _do_command(
["file", "--dereference", "--brief", "--uncompress", path])
file_type = stdout
md5sum = _md5(path)
stdout, *rest = _do_command(["sha1sum", path])
sha1sum = stdout.split()[0]
permissions_value = [permissions,
detail(_permissions_in_octal(permissions), None)]
text = []
for line in [
("size", size), ("permissions", permissions_value), None,
("modified time", modified), ("creation time", created),
("access time", access), None,
("owner", owner), ("group", group), None,
("hardlinks", hardlinks), ("symlink", is_symlink), None,
("md5", md5sum), ("sha1", sha1sum), None,
("mime type", mime_type.strip()),
("file type", file_type.strip())]:
if line is None:
text.append("\n")
else:
name, value = line
name = termstr.TermStr(name + ":").fg_color(
termstr.Color.light_blue).ljust(16)
text.append(name + fill3.join("", value) + "\n")
return (Status.normal, fill3.Text(fill3.join("", text)))
@deps(deps={"python3-pygments"}, arch_deps={"python-pygments"},
opensuse_deps={"python3-Pygments"}, gentoo_deps={"pygments"})
def contents(path):
root, ext = splitext(path)
if ext == "":
with open(path) as file_:
return Status.normal, fill3.Text(_fix_input(file_.read()))
else:
return pygments_(path)
def _is_python_syntax_correct(path, python_version):
if python_version == "python":
stdin, stdout, returncode = _do_command(
["python", "-c",
"__import__('compiler').parse(open('%s').read())" % path])
return returncode == 0
else: # python3
with open(path) as f:
source = f.read()
try:
ast.parse(source)
except:
return False
return True
def _python_version(path): # Need a better hueristic
for version in ["python3", "python"]:
if _is_python_syntax_correct(path, version):
return version
return "python3"
@deps(deps={"python"}, gentoo_deps={"python"},
url="https://en.wikipedia.org/wiki/Python_syntax_and_semantics")
def python_syntax(path):
status = (Status.ok if _is_python_syntax_correct(path, "python") or
_is_python_syntax_correct(path, "python3") else Status.problem)
return status, fill3.Text("")
def _has_shebang_line(path):
with open(path, "rb") as file_:
return file_.read(2) == "#!"
def _is_python_test_file(path):
path = str(os.path.basename(path))
return path.endswith("_test.py") or path.startswith("test_")
@deps(deps={"python", "python3"}, gentoo_deps={"python"},
url="https://docs.python.org/3/library/unittest.html")
def python_unittests(path):
if _is_python_test_file(path):
command = ([path] if _has_shebang_line(path)
else [_python_version(path), path])
stdout, stderr, returncode = _do_command(command, timeout=TIMEOUT)
status = Status.ok if returncode == 0 else Status.problem
return status, fill3.Text(stdout + "\n" + stderr)
else:
return Status.not_applicable, fill3.Text("No tests.")
@deps(deps={"python", "python3"},
url="https://docs.python.org/3/library/pydoc.html",
missing_in={"gentoo"})
def pydoc(path):
stdout, stderr, returncode = _do_command(
[_python_version(path), "-m", "pydoc", path], timeout=TIMEOUT)
status = Status.normal if returncode == 0 else Status.not_applicable
if not stdout.startswith("Help on module"):
status = Status.not_applicable
return status, fill3.Text(_fix_input(stdout))
@deps(deps={"mypy"}, url="mypy", fedora_deps={"python3-mypy"},
debian_deps={"pip3/mypy"}, arch_deps={"pip3/mypy"},
opensuse_deps={"pip3/mypy"}, executables={"mypy"}, missing_in={"gentoo"})
def mypy(path):
stdout, stderr, returncode = _do_command(["mypy", path], timeout=TIMEOUT)
status = Status.ok if returncode == 0 else Status.normal
return status, fill3.Text(stdout)
def _colorize_coverage_report(text):
line_color = {"> ": termstr.Color.green, "! ": termstr.Color.red,
" ": None}
return fill3.join("", [termstr.TermStr(line).fg_color(line_color[line[:2]])
for line in text.splitlines(keepends=True)])
@deps(deps={"python-coverage", "python3-coverage"},
arch_deps={"python2-coverage", "python-coverage"},
opensuse_deps={"python2-coverage", "python3-coverage"},
gentoo_deps={"coverage"}, url="python3-coverage")
def python_coverage(path):
# FIX: Also use test_*.py files.
test_path = path[:-(len(".py"))] + "_test.py"
if os.path.exists(test_path):
with tempfile.TemporaryDirectory() as temp_dir:
coverage_cmd = [_python_version(path), "-m", "coverage"]
coverage_path = os.path.join(temp_dir, "coverage")
env = os.environ.copy()
env["COVERAGE_FILE"] = coverage_path
stdout, *rest = _do_command(
coverage_cmd + ["run", test_path], env=env, timeout=TIMEOUT)
path = os.path.normpath(path)
stdout, *rest = _do_command(
coverage_cmd + ["annotate", "--directory", temp_dir, path],
env=env)
flat_path = path.replace("/", "_")
with open(os.path.join(temp_dir, flat_path + ",cover"), "r") as f:
stdout = f.read()
return Status.normal, fill3.Text(_colorize_coverage_report(stdout))
else:
return Status.not_applicable, fill3.Text(
"No corresponding test file: " + os.path.normpath(test_path))
@deps(deps={"python-pycodestyle", "python3-pycodestyle"},
fedora_deps={"python2-pycodestyle", "python3-pycodestyle"},
debian_deps={"pip/pycodestyle", "pip3/pycodestyle"},
arch_deps={"python-pycodestyle", "python2-pycodestyle"},
opensuse_deps={"python2-pycodestyle", "python3-pycodestyle"},
gentoo_deps={"pycodestyle"}, url="python-pycodestyle")
def pycodestyle(path):
return _run_command([_python_version(path), "-m", "pycodestyle", path])
@deps(deps={"python-pyflakes", "python3-pyflakes"},
arch_deps={"python2-pyflakes", "python-pyflakes"},
opensuse_deps={"python2-pyflakes", "python3-pyflakes"}, url="pyflakes",
missing_in={"gentoo"})
def pyflakes(path):
return _run_command([_python_version(path), "-m", "pyflakes", path])
@deps(deps={"pylint", "pylint3"}, fedora_deps={"pylint", "python3-pylint"},
arch_deps={"python2-pylint", "python-pylint"},
opensuse_deps={"python2-pylint", "python3-pylint"},
debian_deps={"pip/pylint", "pip3/pylint"}, gentoo_deps={"pylint"},
url="pylint3")
def pylint(path):
return _run_command([_python_version(path), "-m", "pylint",
"--errors-only", path])
@deps(url="https://github.com/ahamilton/vigil/blob/master/gut.py")
def python_gut(path):
with open(path) as module_file:
output = gut.gut_module(module_file.read())
source_widget = _syntax_highlight_using_path(_fix_input(output), path)
return Status.normal, source_widget
@deps(deps={"python", "python3"}, gentoo_deps={"python"},
url="https://docs.python.org/3/library/modulefinder.html")
def python_modulefinder(path):
return _run_command([_python_version(path), "-m", "modulefinder", path],
Status.normal)
def _get_mccabe_line_score(line, python_version):
position, function_name, score = line.split()
return int(score if python_version == "python3" else score[:-1])
def _colorize_mccabe(text, python_version):
return fill3.join("", [
termstr.TermStr(line).fg_color(termstr.Color.yellow)
if _get_mccabe_line_score(line, python_version) > 10 else line
for line in text.splitlines(keepends=True)])
@deps(deps={"python-mccabe", "python3-mccabe"},
arch_deps={"python2-mccabe", "python-mccabe"},
opensuse_deps={"python2-mccabe", "python3-mccabe"},
gentoo_deps={"mccabe"}, url="python3-mccabe")
def python_mccabe(path):
python_version = _python_version(path)
stdout, *rest = _do_command([python_version, "-m", "mccabe", path])
max_score = 0
with contextlib.suppress(ValueError): # When there are no lines
max_score = max(_get_mccabe_line_score(line, python_version)
for line in stdout.splitlines())
status = Status.problem if max_score > 10 else Status.ok
return status, fill3.Text(_colorize_mccabe(stdout, python_version))
def python_tidy(path): # Deps: found on internet?
stdout, *rest = _do_command(["python", "python-tidy.py", path])
return Status.normal, _syntax_highlight_using_path(stdout, path)
@deps(url="https://docs.python.org/3/library/dis.html")
def disassemble_pyc(path):
with open(path, "rb") as file_:
bytecode = file_.read()
stringio = io.StringIO()
dis.dis(bytecode, file=stringio)
stringio.seek(0)
return Status.normal, fill3.Text(stringio.read())
@deps(deps={"python-bandit", "python3-bandit"}, fedora_deps={"bandit"},
debian_deps={"pip/bandit", "pip3/bandit"}, arch_deps={"bandit"},
opensuse_deps={"pip/bandit", "pip3/bandit"}, gentoo_deps={"bandit"},
url="python3-bandit")
def bandit(path):
python_version = _python_version(path)
stdout, stderr, returncode = _do_command(
[python_version, "-m", "bandit.cli.main", "-f", "txt", path],
timeout=TIMEOUT)
status = Status.ok if returncode == 0 else Status.normal
text = stdout if python_version == "python" else _fix_input(eval(stdout))
text_without_timestamp = "".join(text.splitlines(keepends=True)[2:])
return status, fill3.Text(text_without_timestamp)
def _perl_version(path):
# stdout, stderr, returncode = _do_command(["perl", "-c", path])
# return "perl6" if "Perl v6.0.0 required" in stderr else "perl"
return "perl"
@deps(deps={"perl"}, url="https://en.wikipedia.org/wiki/Perl")
def perl_syntax(path):
return _run_command([_perl_version(path), "-c", path])
@deps(deps={"perl-doc"}, fedora_deps={"perl-Pod-Perldoc"},
arch_deps={"perl-pod-perldoc"}, gentoo_deps={"perl-IO"},
url="http://perldoc.perl.org/", executables={"perldoc"})
def perldoc(path):
stdout, stderr, returncode = _do_command(["perldoc", "-t", path])
return ((Status.normal, fill3.Text(stdout)) if returncode == 0
else (Status.not_applicable, fill3.Text(stderr)))
@deps(deps={"perltidy"}, arch_deps={"perl-test-perltidy"},
opensuse_deps={"perl-Test-PerlTidy"}, gentoo_deps={"Perl-Tidy"},
url="http://perltidy.sourceforge.net/", executables={"perltidy"})
def perltidy(path):
stdout, *rest = _do_command(["perltidy", "-st", path])
return Status.normal, _syntax_highlight_using_path(stdout, path)
# def perl6_syntax(path):
# return _run_command(["perl6", "-c", path])
# perl6_syntax.deps={"rakudo"}
@deps(deps={"gcc"}, url="https://gcc.gnu.org/", executables={"gcc"})
def c_syntax_gcc(path):
return _run_command(["gcc", "-fsyntax-only", path])
@deps(deps={"clang"}, url="http://clang.llvm.org/", executables={"clang"},
missing_in={"gentoo"})
def c_syntax_clang(path):
return _run_command(["clang", "-fsyntax-only", path])
@deps(deps={"splint"}, url="splint", executables={"splint"})
def splint(path):
stdout, stderr, returncode = _do_command(["splint", "-preproc", path])
status = Status.ok if returncode == 0 else Status.problem
return status, fill3.Text(stdout + stderr)
_OBJDUMP_URL = "https://en.wikipedia.org/wiki/Objdump"
@deps(deps={"binutils"}, url=_OBJDUMP_URL, executables={"objdump"})
def objdump_headers(path):
return _run_command(["objdump", "--all-headers", path], Status.normal)
@deps(deps={"binutils"}, url=_OBJDUMP_URL, executables={"objdump"})
def objdump_disassemble(path):
return _run_command(
["objdump", "--disassemble", "--reloc", "--dynamic-reloc", path],
Status.normal)
@deps(deps={"binutils"}, url=_OBJDUMP_URL, executables={"readelf"})
def readelf(path):
return _run_command(["readelf", "--all", path], Status.normal)
@deps(deps={"unzip"}, url="unzip", executables={"unzip"})
def unzip(path):
return _run_command(["unzip", "-l", path], Status.normal)
_TAR_URL = "http://www.gnu.org/software/tar/manual/tar.html"
@deps(deps={"tar"}, url=_TAR_URL, executables={"tar"})
def tar_gz(path):
return _run_command(["tar", "ztvf", path], Status.normal)
@deps(deps={"tar"}, url=_TAR_URL, executables={"tar"})
def tar_bz2(path):
return _run_command(["tar", "jtvf", path], Status.normal)
@deps(deps={"binutils"}, url="https://linux.die.net/man/1/nm",
executables={"nm"})
def nm(path):
return _run_command(["nm", "--demangle", path], Status.normal)
@deps(deps={"python-pdfminer"}, arch_deps=set(), url="python-pdfminer",
executables={"pdf2txt"}, missing_in={"arch", "fedora", "opensuse",
"gentoo"})
def pdf2txt(path):
return _run_command(["pdf2txt", path], Status.normal)
@deps(deps={"tidy"}, url="tidy", executables={"tidy"}, missing_in={"gentoo"})
def html_syntax(path):
# Maybe only show errors
stdout, stderr, returncode = _do_command(["tidy", path])
status = Status.ok if returncode == 0 else Status.problem
return status, fill3.Text(stderr)
@deps(deps={"tidy"}, url="tidy", executables={"tidy"}, missing_in={"gentoo"})
def tidy(path):
stdout, *rest = _do_command(["tidy", path])
return Status.normal, fill3.Text(stdout)
@deps(deps={"html2text"}, arch_deps={"python-html2text"},
url="html2text", executables={"html2text"}, missing_in={"gentoo"})
def html2text(path):
return _run_command(["html2text", path], Status.normal)
@deps(deps={"gcc"}, url="https://gcc.gnu.org/", executables={"gcc"})
def cpp_syntax_gcc(path):
return _run_command(["gcc", "-fsyntax-only", path])
@deps(deps={"clang"}, url="http://clang.llvm.org/", executables={"clang"},
missing_in={"gentoo"})
def cpp_syntax_clang(path):
return _run_command(["clang", "-fsyntax-only", path])
@deps(deps={"bcpp"}, fedora_deps=set(), arch_deps=set(), executables={"bcpp"},
missing_in={"arch", "fedora", "opensuse"})
def bcpp(path):
stdout, stderr, returncode = _do_command(["bcpp", "-fi", path])
status = Status.normal if returncode == 0 else Status.problem
return status, _syntax_highlight_using_path(stdout, path)
@deps(deps={"uncrustify"}, debian_deps=set(), url="uncrustify",
executables={"uncrustify"}, missing_in={"debian"})
def uncrustify(path):
with tempfile.TemporaryDirectory() as temp_dir:
config_path = os.path.join(temp_dir, "uncrustify.cfg")
stdout, stderr, returncode = _do_command(
["uncrustify", "--detect", "-f", path, "-o", config_path])
if returncode == 0:
stdout, stderr, returncode = _do_command(
["uncrustify", "-c", config_path, "-f", path])
status = Status.normal if returncode == 0 else Status.problem
return status, _syntax_highlight_using_path(stdout, path)
@deps(deps={"php"}, opensuse_deps={"php5"},
url="https://en.wikipedia.org/wiki/PHP", executables={"php"},
missing_in={"debian"})
def php5_syntax(path):
return _run_command(["php", "--syntax-check", path])
def _pil_pixels(pil_image):
data = list(pil_image.getdata())
width = pil_image.width
return [data[row_index*width:(row_index+1)*width]
for row_index in range(pil_image.height)]
MAX_IMAGE_SIZE = 80
def _resize_image(image, new_width):
scale = new_width / image.width
return image.resize((int(image.width * scale), int(image.height * scale)),
PIL.Image.ANTIALIAS)
@deps(deps={"python3-pil"}, fedora_deps={"python3-pillow"},
arch_deps={"python-pillow"}, opensuse_deps={"python3-Pillow"},
gentoo_deps={"pillow"}, url="python3-pil")
def pil(path):
with open(path, "rb") as image_file:
with PIL.Image.open(image_file).convert("RGB") as image:
if image.width > (MAX_IMAGE_SIZE // 2):
image = _resize_image(image, MAX_IMAGE_SIZE // 2)
text = " " * 2 * image.width
result = []
for row in _pil_pixels(image):
row_style = []
for pixel in row:
style = termstr.CharStyle(bg_color=pixel)
row_style.extend([style, style])
result.append(termstr.TermStr(text, tuple(row_style)))
return Status.normal, fill3.Fixed(result)
@deps(deps={"python3-pil"}, fedora_deps={"python3-pillow"},
arch_deps={"python-pillow"}, opensuse_deps={"python3-Pillow"},
gentoo_deps={"pillow"}, url="python3-pil")
def pil_half(path):
with open(path, "rb") as image_file:
with PIL.Image.open(image_file).convert("RGB") as image:
if image.width > MAX_IMAGE_SIZE:
image = _resize_image(image, MAX_IMAGE_SIZE)
text = "" * image.width
rows = _pil_pixels(image)
if image.height % 2 == 1:
rows.append([None] * image.width)
result = fill3.Fixed([
termstr.TermStr(text, tuple(termstr.CharStyle(
fg_color=top_pixel, bg_color=bottom_pixel)
for top_pixel, bottom_pixel in zip(rows[index],
rows[index+1])))
for index in range(0, image.height, 2)])
return Status.normal, result
#############################
LOG_PATH = os.path.join(os.getcwd(), "vigil.log")
def log_error(message=None):
message = traceback.format_exc() if message is None else message + "\n"
with open(LOG_PATH, "a") as log_file:
log_file.write(message)
def lru_cache_with_eviction(maxsize=128, typed=False):
versions = {}
make_key = functools._make_key
def evict(*args, **kwds):
key = make_key(args, kwds, typed)
if key in versions:
versions[key] += 1
def decorating_function(user_function):
def remove_version(*args, **kwds):
return user_function(*args[1:], **kwds)
new_func = functools.lru_cache(maxsize=maxsize, typed=typed)(
remove_version)
def add_version(*args, **kwds):
key = make_key(args, kwds, typed)
return new_func(*((versions.setdefault(key, 0),) + args), **kwds)
add_version.versions = versions
add_version.cache_info = new_func.cache_info
add_version.evict = evict
return functools.update_wrapper(add_version, user_function)
return decorating_function
def dump_pickle_safe(object_, path, protocol=pickle.HIGHEST_PROTOCOL,
open=open):
tmp_path = path + ".tmp"
try:
with open(tmp_path, "wb") as file_:
pickle.dump(object_, file_, protocol=protocol)
except (OSError, KeyboardInterrupt):
os.remove(tmp_path)
else:
os.rename(tmp_path, path)
def status_to_str(status):
return (_STATUS_TO_TERMSTR[status] if isinstance(status, enum.Enum)
else status)
class Result:
def __init__(self, path, tool):
self.path = path
self.tool = tool
self.pickle_path = os.path.join(CACHE_PATH, path + "-" + tool.__name__)
self.scroll_position = (0, 0)
self.is_completed = False
self.is_placeholder = True
self.status = Status.pending
@property
@lru_cache_with_eviction(maxsize=50)
def result(self):
unknown_label = fill3.Text("?")
if self.is_placeholder:
return unknown_label
try:
with gzip.open(self.pickle_path, "rb") as pickle_file:
return pickle.load(pickle_file)
except FileNotFoundError:
return unknown_label
@result.setter
def result(self, value):
os.makedirs(os.path.dirname(self.pickle_path), exist_ok=True)
dump_pickle_safe(value, self.pickle_path, open=gzip.open)
Result.result.fget.evict(self)
def set_status(self, status):
self.status = status
self.entry.appearance_cache = None
@asyncio.coroutine
def run(self, log, appearance_changed_event, runner):
self.is_placeholder = False
tool_name = tool_name_colored(self.tool, self.path)
path = path_colored(self.path)
log.log_message(["Running ", tool_name, " on ", path, "..."])
self.set_status(Status.running)
if runner.is_already_paused:
runner.is_already_paused = False
runner.pause()
appearance_changed_event.set()
start_time = time.time()
new_status = yield from runner.run_tool(self.path, self.tool)
Result.result.fget.evict(self)
end_time = time.time()
self.set_status(new_status)
appearance_changed_event.set()
self.is_completed = True
log.log_message(
["Finished running ", tool_name, " on ", path, ". ",
status_to_str(new_status), " %s secs" %
round(end_time - start_time, 2)])
def reset(self):
self.is_placeholder = True
self.set_status(Status.pending)
def appearance_min(self):
return [status_to_str(self.status)]
def _generic_tools():
return [contents, metadata]
IMAGE_EXTENSIONS = ["png", "jpg", "gif", "bmp", "ppm", "tiff", "tga"]
TOOLS_FOR_EXTENSIONS = \
[
(["py"], [python_syntax, python_unittests, pydoc, mypy,
python_coverage, pycodestyle, pyflakes, pylint, python_gut,
python_modulefinder, python_mccabe, bandit]),
(["pyc"], [disassemble_pyc]),
(["pl", "pm", "t"], [perl_syntax, perldoc, perltidy]),
# (["p6", "pm6"], [perl6_syntax, perldoc]),
(["pod", "pod6"], [perldoc]),
(["java"], [uncrustify]),
(["c", "h"], [c_syntax_gcc, c_syntax_clang, splint, uncrustify]),
(["o"], [objdump_headers, objdump_disassemble, readelf]),
(["cc", "cpp", "hpp"], [cpp_syntax_gcc, cpp_syntax_clang, bcpp,
uncrustify]),
(["pdf"], [pdf2txt]),
(["html"], [html_syntax, tidy, html2text]),
(["php"], [php5_syntax]),
(["zip"], [unzip]),
(["tar.gz", "tgz"], [tar_gz]),
(["tar.bz2"], [tar_bz2]),
(["a", "so"], [nm]),
(IMAGE_EXTENSIONS, [pil, pil_half])
]
def is_tool_in_distribution(tool, distribution):
try:
return distribution not in tool.missing_in
except AttributeError:
return tool
def get_distro_id():
with open("/etc/os-release") as os_release_file:
for line in os_release_file:
if line.startswith("ID="):
return line[len("ID="):].strip()
raise AssertionError
@functools.lru_cache(maxsize=1)
def _tools_for_extension():
distribution = get_distro_id()
result = {}
for extensions, tools in TOOLS_FOR_EXTENSIONS:
for extension in extensions:
result[extension] = [tool for tool in tools if
is_tool_in_distribution(tool, distribution)]
return result
def tools_all():
tools_ = set(_generic_tools())
for tool_list in _tools_for_extension().values():
tools_.update(set(tool_list))
return tools_
def tool_dependencies(tool, distribution="ubuntu"):
try:
return getattr(tool, distribution + "_deps")
except AttributeError:
try:
return tool.deps
except AttributeError:
return set()
def dependencies(distribution="ubuntu"):
dependencies_all = set()
for tool in tools_all():
dependencies_all.update(tool_dependencies(tool, distribution))
return dependencies_all
def splitext(path):
root, ext = os.path.splitext(path)
if "." in root:
for compound_ext in [".tar.gz", ".tar.bz2"]:
if path.endswith(compound_ext):
return path[:-len(compound_ext)], path[-len(compound_ext):]
return root, ext
def tools_for_path(path):
root, ext = splitext(path)
extra_tools = [] if ext == "" else _tools_for_extension().get(ext[1:], [])
return _generic_tools() + extra_tools
def run_tool_no_error(path, tool):
try:
status, result = tool(path)
except subprocess.TimeoutExpired:
status, result = Status.timed_out, fill3.Text("Timed out")
except:
status, result = Status.error, _syntax_highlight(
traceback.format_exc(), pygments.lexers.PythonTracebackLexer(),
pygments.styles.get_style_by_name(os.environ["PYGMENT_STYLE"]))
return status, result
def _convert_lscolor_code_to_charstyle(lscolor_code):
parts = lscolor_code.split(";")
if len(parts) == 1:
is_bold = parts[0] == "1"
fg_color = None
elif len(parts) == 2:
is_bold = False
fg_color = int(parts[1])
else:
is_bold = len(parts) == 4 and parts[3] == "1"
fg_color = int(parts[2])
return termstr.CharStyle(fg_color=fg_color, is_bold=is_bold)
def _charstyle_of_path(path):
color_code = lscolors.color_code_for_path(path, _LS_COLOR_CODES)
return (termstr.CharStyle() if color_code is None else
_convert_lscolor_code_to_charstyle(color_code))
@functools.lru_cache(maxsize=100)
def path_colored(path):
char_style = _charstyle_of_path(path)
path = path[2:]
dirname, basename = os.path.split(path)
if dirname == "":
return termstr.TermStr(basename, char_style)
else:
dirname = dirname + os.path.sep
return (termstr.TermStr(dirname, _charstyle_of_path(dirname)) +
termstr.TermStr(basename, char_style))
@functools.lru_cache(maxsize=100)
def tool_name_colored(tool, path):
char_style = (termstr.CharStyle(is_bold=True) if tool in _generic_tools()
else _charstyle_of_path(path))
return termstr.TermStr(tool.__name__, char_style)
@functools.lru_cache()
def get_homepage_of_package(package):
line = subprocess.getoutput("dpkg-query --status %s | grep Homepage"
% package)
return line.split()[1]
def url_of_tool(tool):
try:
url = tool.url
return url if url.startswith("http") else get_homepage_of_package(url)
except AttributeError:
return None

20
vigil/urwid/__init__.py Normal file
View file

@ -0,0 +1,20 @@
#
# Urwid __init__.py - all the stuff you're likely to care about
#
# Copyright (C) 2004-2012 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: http://excess.org/urwid/

393
vigil/urwid/escape.py Normal file
View file

@ -0,0 +1,393 @@
# -*- coding: utf-8 -*-
#
# Urwid escape sequences common to curses_display and raw_display
# Copyright (C) 2004-2011 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: http://excess.org/urwid/
"""
Terminal Escape Sequences for input and display
"""
import re
SO = "\x0e"
SI = "\x0f"
IBMPC_ON = "\x1b[11m"
IBMPC_OFF = "\x1b[10m"
DEC_TAG = "0"
DEC_SPECIAL_CHARS = '▮◆▒␉␌␍␊°±␤␋┘┐┌└┼⎺⎻─⎼⎽├┤┴┬│≤≥π≠£·'
ALT_DEC_SPECIAL_CHARS = "_`abcdefghijklmnopqrstuvwxyz{|}~"
DEC_SPECIAL_CHARMAP = {}
assert len(DEC_SPECIAL_CHARS) == len(ALT_DEC_SPECIAL_CHARS), repr((DEC_SPECIAL_CHARS, ALT_DEC_SPECIAL_CHARS))
for c, alt in zip(DEC_SPECIAL_CHARS, ALT_DEC_SPECIAL_CHARS):
DEC_SPECIAL_CHARMAP[ord(c)] = SO + alt + SI
SAFE_ASCII_DEC_SPECIAL_RE = re.compile("^[ -~%s]*$" % DEC_SPECIAL_CHARS)
DEC_SPECIAL_RE = re.compile("[%s]" % DEC_SPECIAL_CHARS)
###################
## Input sequences
###################
class MoreInputRequired(Exception):
pass
def escape_modifier( digit ):
mode = ord(digit) - ord("1")
return "shift "*(mode&1) + "meta "*((mode&2)//2) + "ctrl "*((mode&4)//4)
input_sequences = [
('[A','up'),('[B','down'),('[C','right'),('[D','left'),
('[E','5'),('[F','end'),('[G','5'),('[H','home'),
('[1~','home'),('[2~','insert'),('[3~','delete'),('[4~','end'),
('[5~','page up'),('[6~','page down'),
('[7~','home'),('[8~','end'),
('[[A','f1'),('[[B','f2'),('[[C','f3'),('[[D','f4'),('[[E','f5'),
('[11~','f1'),('[12~','f2'),('[13~','f3'),('[14~','f4'),
('[15~','f5'),('[17~','f6'),('[18~','f7'),('[19~','f8'),
('[20~','f9'),('[21~','f10'),('[23~','f11'),('[24~','f12'),
('[25~','f13'),('[26~','f14'),('[28~','f15'),('[29~','f16'),
('[31~','f17'),('[32~','f18'),('[33~','f19'),('[34~','f20'),
('OA','up'),('OB','down'),('OC','right'),('OD','left'),
('OH','home'),('OF','end'),
('OP','f1'),('OQ','f2'),('OR','f3'),('OS','f4'),
('Oo','/'),('Oj','*'),('Om','-'),('Ok','+'),
('[Z','shift tab'),
('On', '.'),
('[200~', 'begin paste'), ('[201~', 'end paste'),
] + [
(prefix + letter, modifier + key)
for prefix, modifier in zip('O[', ('meta ', 'shift '))
for letter, key in zip('abcd', ('up', 'down', 'right', 'left'))
] + [
("[" + digit + symbol, modifier + key)
for modifier, symbol in zip(('shift ', 'meta '), '$^')
for digit, key in zip('235678',
('insert', 'delete', 'page up', 'page down', 'home', 'end'))
] + [
('O' + chr(ord('p')+n), str(n)) for n in range(10)
] + [
# modified cursor keys + home, end, 5 -- [#X and [1;#X forms
(prefix+digit+letter, escape_modifier(digit) + key)
for prefix in ("[", "[1;")
for digit in "12345678"
for letter,key in zip("ABCDEFGH",
('up','down','right','left','5','end','5','home'))
] + [
# modified F1-F4 keys -- O#X form
("O"+digit+letter, escape_modifier(digit) + key)
for digit in "12345678"
for letter,key in zip("PQRS",('f1','f2','f3','f4'))
] + [
# modified F1-F13 keys -- [XX;#~ form
("["+str(num)+";"+digit+"~", escape_modifier(digit) + key)
for digit in "12345678"
for num,key in zip(
(3,5,6,11,12,13,14,15,17,18,19,20,21,23,24,25,26,28,29,31,32,33,34),
('delete', 'page up', 'page down',
'f1','f2','f3','f4','f5','f6','f7','f8','f9','f10','f11',
'f12','f13','f14','f15','f16','f17','f18','f19','f20'))
] + [
# mouse reporting (special handling done in KeyqueueTrie)
('[M', 'mouse'),
# report status response
('[0n', 'status ok')
]
class KeyqueueTrie(object):
def __init__( self, sequences ):
self.data = {}
for s, result in sequences:
assert type(result) != dict
self.add(self.data, s, result)
def add(self, root, s, result):
assert type(root) == dict, "trie conflict detected"
assert len(s) > 0, "trie conflict detected"
if ord(s[0]) in root:
return self.add(root[ord(s[0])], s[1:], result)
if len(s)>1:
d = {}
root[ord(s[0])] = d
return self.add(d, s[1:], result)
root[ord(s)] = result
def get(self, keys, more_available):
result = self.get_recurse(self.data, keys, more_available)
if not result:
result = self.read_cursor_position(keys, more_available)
return result
def get_recurse(self, root, keys, more_available):
if type(root) != dict:
if root == "mouse":
return self.read_mouse_info(keys,
more_available)
return (root, keys)
if not keys:
# get more keys
if more_available:
raise MoreInputRequired()
return None
if keys[0] not in root:
return None
return self.get_recurse(root[keys[0]], keys[1:], more_available)
def read_mouse_info(self, keys, more_available):
if len(keys) < 3:
if more_available:
raise MoreInputRequired()
return None
b = keys[0] - 32
x, y = (keys[1] - 33)%256, (keys[2] - 33)%256 # supports 0-255
prefix = ""
if b & 4: prefix = prefix + "shift "
if b & 8: prefix = prefix + "meta "
if b & 16: prefix = prefix + "ctrl "
if (b & MOUSE_MULTIPLE_CLICK_MASK)>>9 == 1: prefix = prefix + "double "
if (b & MOUSE_MULTIPLE_CLICK_MASK)>>9 == 2: prefix = prefix + "triple "
# 0->1, 1->2, 2->3, 64->4, 65->5
button = ((b&64)/64*3) + (b & 3) + 1
if b & 3 == 3:
action = "release"
button = 0
elif b & MOUSE_RELEASE_FLAG:
action = "release"
elif b & MOUSE_DRAG_FLAG:
action = "drag"
elif b & MOUSE_MULTIPLE_CLICK_MASK:
action = "click"
else:
action = "press"
return ( (prefix + "mouse " + action, button, x, y), keys[3:] )
def read_cursor_position(self, keys, more_available):
"""
Interpret cursor position information being sent by the
user's terminal. Returned as ('cursor position', x, y)
where (x, y) == (0, 0) is the top left of the screen.
"""
if not keys:
if more_available:
raise MoreInputRequired()
return None
if keys[0] != ord('['):
return None
# read y value
y = 0
i = 1
for k in keys[i:]:
i += 1
if k == ord(';'):
if not y:
return None
break
if k < ord('0') or k > ord('9'):
return None
if not y and k == ord('0'):
return None
y = y * 10 + k - ord('0')
if not keys[i:]:
if more_available:
raise MoreInputRequired()
return None
# read x value
x = 0
for k in keys[i:]:
i += 1
if k == ord('R'):
if not x:
return None
return (("cursor position", x-1, y-1), keys[i:])
if k < ord('0') or k > ord('9'):
return None
if not x and k == ord('0'):
return None
x = x * 10 + k - ord('0')
if not keys[i:]:
if more_available:
raise MoreInputRequired()
return None
# This is added to button value to signal mouse release by curses_display
# and raw_display when we know which button was released. NON-STANDARD
MOUSE_RELEASE_FLAG = 2048
# This 2-bit mask is used to check if the mouse release from curses or gpm
# is a double or triple release. 00 means single click, 01 double,
# 10 triple. NON-STANDARD
MOUSE_MULTIPLE_CLICK_MASK = 1536
# This is added to button value at mouse release to differentiate between
# single, double and triple press. Double release adds this times one,
# triple release adds this times two. NON-STANDARD
MOUSE_MULTIPLE_CLICK_FLAG = 512
# xterm adds this to the button value to signal a mouse drag event
MOUSE_DRAG_FLAG = 32
#################################################
# Build the input trie from input_sequences list
input_trie = KeyqueueTrie(input_sequences)
#################################################
_keyconv = {
-1:None,
8:'backspace',
9:'tab',
10:'enter',
13:'enter',
127:'backspace',
# curses-only keycodes follow.. (XXX: are these used anymore?)
258:'down',
259:'up',
260:'left',
261:'right',
262:'home',
263:'backspace',
265:'f1', 266:'f2', 267:'f3', 268:'f4',
269:'f5', 270:'f6', 271:'f7', 272:'f8',
273:'f9', 274:'f10', 275:'f11', 276:'f12',
277:'shift f1', 278:'shift f2', 279:'shift f3', 280:'shift f4',
281:'shift f5', 282:'shift f6', 283:'shift f7', 284:'shift f8',
285:'shift f9', 286:'shift f10', 287:'shift f11', 288:'shift f12',
330:'delete',
331:'insert',
338:'page down',
339:'page up',
343:'enter', # on numpad
350:'5', # on numpad
360:'end',
}
def process_keyqueue(codes, more_available):
"""
codes -- list of key codes
more_available -- if True then raise MoreInputRequired when in the
middle of a character sequence (escape/utf8/wide) and caller
will attempt to send more key codes on the next call.
returns (list of input, list of remaining key codes).
"""
code = codes[0]
if code >= 32 and code <= 126:
key = chr(code)
return [key], codes[1:]
if code in _keyconv:
return [_keyconv[code]], codes[1:]
if code >0 and code <27:
return ["ctrl %s" % chr(ord('a')+code-1)], codes[1:]
if code >27 and code <32:
return ["ctrl %s" % chr(ord('A')+code-1)], codes[1:]
if code >127 and code <256:
key = chr(code)
return [key], codes[1:]
if code != 27:
return ["<%d>"%code], codes[1:]
result = input_trie.get(codes[1:], more_available)
if result is not None:
result, remaining_codes = result
return [result], remaining_codes
if codes[1:]:
# Meta keys -- ESC+Key form
run, remaining_codes = process_keyqueue(codes[1:],
more_available)
if run[0] == "esc" or run[0].find("meta ") >= 0:
return ['esc']+run, remaining_codes
return ['meta '+run[0]]+run[1:], remaining_codes
return ['esc'], codes[1:]
####################
## Output sequences
####################
ESC = "\x1b"
CURSOR_HOME = ESC+"[H"
CURSOR_HOME_COL = "\r"
APP_KEYPAD_MODE = ESC+"="
NUM_KEYPAD_MODE = ESC+">"
SWITCH_TO_ALTERNATE_BUFFER = ESC+"7"+ESC+"[?47h"
RESTORE_NORMAL_BUFFER = ESC+"[?47l"+ESC+"8"
#RESET_SCROLL_REGION = ESC+"[;r"
#RESET = ESC+"c"
REPORT_STATUS = ESC + "[5n"
REPORT_CURSOR_POSITION = ESC+"[6n"
INSERT_ON = ESC + "[4h"
INSERT_OFF = ESC + "[4l"
def set_cursor_position( x, y ):
assert type(x) == int
assert type(y) == int
return ESC+"[%d;%dH" %(y+1, x+1)
def move_cursor_right(x):
if x < 1: return ""
return ESC+"[%dC" % x
def move_cursor_up(x):
if x < 1: return ""
return ESC+"[%dA" % x
def move_cursor_down(x):
if x < 1: return ""
return ESC+"[%dB" % x
HIDE_CURSOR = ESC+"[?25l"
SHOW_CURSOR = ESC+"[?25h"
MOUSE_TRACKING_ON = ESC+"[?1000h"+ESC+"[?1002h"
MOUSE_TRACKING_OFF = ESC+"[?1002l"+ESC+"[?1000l"
DESIGNATE_G1_SPECIAL = ESC+")0"
ERASE_IN_LINE_RIGHT = ESC+"[K"

332
vigil/urwid/raw_display.py Normal file
View file

@ -0,0 +1,332 @@
#
# Urwid raw display module
# Copyright (C) 2004-2009 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: http://excess.org/urwid/
"""
Direct terminal UI implementation
"""
import os
import select
import sys
import termios
import tty
from urwid import escape
class Screen:
def __init__(self, input=sys.stdin, output=sys.stdout):
"""Initialize a screen that directly prints escape codes to an output
terminal.
"""
self.prev_input_resize = 0
self.set_input_timeouts()
self._mouse_tracking_enabled = False
self._next_timeout = None
# Our connections to the world
self._term_output_file = output
self._term_input_file = input
def set_input_timeouts(self, max_wait=None, complete_wait=0.125,
resize_wait=0.125):
"""
Set the get_input timeout values. All values are in floating
point numbers of seconds.
max_wait -- amount of time in seconds to wait for input when
there is no input pending, wait forever if None
complete_wait -- amount of time in seconds to wait when
get_input detects an incomplete escape sequence at the
end of the available input
resize_wait -- amount of time in seconds to wait for more input
after receiving two screen resize requests in a row to
stop Urwid from consuming 100% cpu during a gradual
window resize operation
"""
self.max_wait = max_wait
if max_wait is not None:
if self._next_timeout is None:
self._next_timeout = max_wait
else:
self._next_timeout = min(self._next_timeout, self.max_wait)
self.complete_wait = complete_wait
self.resize_wait = resize_wait
def set_mouse_tracking(self, enable=True):
"""
Enable (or disable) mouse tracking.
After calling this function get_input will include mouse
click events along with keystrokes.
"""
enable = bool(enable)
if enable == self._mouse_tracking_enabled:
return
self._mouse_tracking(enable)
self._mouse_tracking_enabled = enable
def _mouse_tracking(self, enable):
if enable:
self.write(escape.MOUSE_TRACKING_ON)
else:
self.write(escape.MOUSE_TRACKING_OFF)
def start(self, alternate_buffer=True):
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
if alternate_buffer:
self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
self._rows_used = None
else:
self._rows_used = 0
fd = self._term_input_file.fileno()
if os.isatty(fd):
self._old_termios_settings = termios.tcgetattr(fd)
tty.setcbreak(fd)
self._alternate_buffer = alternate_buffer
self._next_timeout = self.max_wait
self._mouse_tracking(self._mouse_tracking_enabled)
def stop(self):
"""
Restore the screen.
"""
fd = self._term_input_file.fileno()
if os.isatty(fd):
termios.tcsetattr(fd, termios.TCSADRAIN,
self._old_termios_settings)
self._mouse_tracking(False)
move_cursor = ""
if self._alternate_buffer:
move_cursor = escape.RESTORE_NORMAL_BUFFER
self.write(
escape.SI
+ move_cursor
+ escape.SHOW_CURSOR)
self.flush()
def write(self, data):
"""Write some data to the terminal.
You may wish to override this if you're using something other than
regular files for input and output.
"""
self._term_output_file.write(data)
def flush(self):
"""Flush the output buffer.
You may wish to override this if you're using something other than
regular files for input and output.
"""
self._term_output_file.flush()
def get_input(self, raw_keys=False):
"""Return pending input as a list.
raw_keys -- return raw keycodes as well as translated versions
This function will immediately return all the input since the
last time it was called. If there is no input pending it will
wait before returning an empty list. The wait time may be
configured with the set_input_timeouts function.
If raw_keys is False (default) this function will return a list
of keys pressed. If raw_keys is True this function will return
a ( keys pressed, raw keycodes ) tuple instead.
Examples of keys returned:
* ASCII printable characters: " ", "a", "0", "A", "-", "/"
* ASCII control characters: "tab", "enter"
* Escape sequences: "up", "page up", "home", "insert", "f1"
* Key combinations: "shift f1", "meta a", "ctrl b"
* Window events: "window resize"
When a narrow encoding is not enabled:
* "Extended ASCII" characters: "\\xa1", "\\xb2", "\\xfe"
When a wide encoding is enabled:
* Double-byte characters: "\\xa1\\xea", "\\xb2\\xd4"
When utf8 encoding is enabled:
* Unicode characters: u"\\u00a5", u'\\u253c"
Examples of mouse events returned:
* Mouse button press: ('mouse press', 1, 15, 13),
('meta mouse press', 2, 17, 23)
* Mouse drag: ('mouse drag', 1, 16, 13),
('mouse drag', 1, 17, 13),
('ctrl mouse drag', 1, 18, 13)
* Mouse button release: ('mouse release', 0, 18, 13),
('ctrl mouse release', 0, 17, 23)
"""
self._wait_for_input_ready(self._next_timeout)
keys, raw = self.parse_input(None, None, self.get_available_raw_input())
# Avoid pegging CPU at 100% when slowly resizing
if keys==['window resize'] and self.prev_input_resize:
while True:
self._wait_for_input_ready(self.resize_wait)
keys, raw2 = self.parse_input(None, None, self.get_available_raw_input())
raw += raw2
#if not keys:
# keys, raw2 = self._get_input(
# self.resize_wait)
# raw += raw2
if keys!=['window resize']:
break
if keys[-1:]!=['window resize']:
keys.append('window resize')
if keys==['window resize']:
self.prev_input_resize = 2
elif self.prev_input_resize == 2 and not keys:
self.prev_input_resize = 1
else:
self.prev_input_resize = 0
if raw_keys:
return keys, raw
return keys
_input_timeout = None
_partial_codes = None
def get_available_raw_input(self):
"""
Return any currently-available input. Does not block.
This method is only used by the default `hook_event_loop`
implementation; you can safely ignore it if you implement your own.
"""
codes = self._get_keyboard_codes()
if self._partial_codes:
codes = self._partial_codes + codes
self._partial_codes = None
return codes
def parse_input(self, event_loop, callback, codes, wait_for_more=True):
"""
Read any available input from get_available_raw_input, parses it into
keys, and calls the given callback.
The current implementation tries to avoid any assumptions about what
the screen or event loop look like; it only deals with parsing keycodes
and setting a timeout when an incomplete one is detected.
`codes` should be a sequence of keycodes, i.e. bytes. A bytearray is
appropriate, but beware of using bytes, which only iterates as integers
on Python 3.
"""
# Note: event_loop may be None for 100% synchronous support, only used
# by get_input. Not documented because you shouldn't be doing it.
if self._input_timeout and event_loop:
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None
original_codes = codes
processed = []
try:
while codes:
run, codes = escape.process_keyqueue(
codes, wait_for_more)
processed.extend(run)
except escape.MoreInputRequired:
# Set a timer to wait for the rest of the input; if it goes off
# without any new input having come in, use the partial input
k = len(original_codes) - len(codes)
processed_codes = original_codes[:k]
self._partial_codes = codes
def _parse_incomplete_input():
self._input_timeout = None
self._partial_codes = None
self.parse_input(
event_loop, callback, codes, wait_for_more=False)
if event_loop:
self._input_timeout = event_loop.alarm(
self.complete_wait, _parse_incomplete_input)
else:
processed_codes = original_codes
self._partial_codes = None
if callback:
callback(processed, processed_codes)
else:
# For get_input
return processed, processed_codes
def _get_keyboard_codes(self):
codes = []
while True:
code = self._getch_nodelay()
if code < 0:
break
codes.append(code)
return codes
def _wait_for_input_ready(self, timeout):
ready = None
fd_list = [self._term_input_file.fileno()]
while True:
try:
if timeout is None:
ready,w,err = select.select(
fd_list, [], fd_list)
else:
ready,w,err = select.select(
fd_list,[],fd_list, timeout)
break
except select.error as e:
if e.args[0] != 4:
raise
return ready
def _getch(self, timeout):
ready = self._wait_for_input_ready(timeout)
if self._term_input_file.fileno() in ready:
return ord(os.read(self._term_input_file.fileno(), 1))
return -1
def _getch_nodelay(self):
return self._getch(0)

91
vigil/worker.py Executable file
View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2017 Andrew Hamilton. All rights reserved.
# Licensed under the Artistic License 2.0.
import asyncio
import os
import signal
import vigil.tools as tools
class Worker:
def __init__(self, is_already_paused, is_being_tested):
self.is_already_paused = is_already_paused
self.is_being_tested = is_being_tested
self.result = None
self.process = None
self.child_pgid = None
@asyncio.coroutine
def create_process(self):
create = asyncio.create_subprocess_exec(
"vigil-worker", stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
preexec_fn=os.setsid)
self.process = yield from create
pid_line = yield from self.process.stdout.readline()
self.child_pgid = int(pid_line.strip())
os.setpriority(os.PRIO_PGRP, self.child_pgid, 19)
@asyncio.coroutine
def run_tool(self, path, tool):
self.process.stdin.write(("%s\n%s\n" %
(tool.__qualname__, path)).encode("utf-8"))
data = yield from self.process.stdout.readline()
return tools.Status(int(data))
@asyncio.coroutine
def job_runner(self, summary, log, jobs_added_event,
appearance_changed_event):
yield from self.create_process()
while True:
yield from jobs_added_event.wait()
while True:
try:
self.result = summary.get_closest_placeholder()
except StopIteration:
self.result = None
if summary.result_total == summary.completed_total:
log.log_message("All results are up to date.")
if self.is_being_tested:
os.kill(os.getpid(), signal.SIGINT)
break
yield from self.result.run(log, appearance_changed_event, self)
summary.completed_total += 1
jobs_added_event.clear()
def pause(self):
if self.result is not None and \
self.result.status == tools.Status.running:
os.killpg(self.child_pgid, signal.SIGSTOP)
self.result.set_status(tools.Status.paused)
def continue_(self):
if self.result is not None and \
self.result.status == tools.Status.paused:
self.result.set_status(tools.Status.running)
os.killpg(self.child_pgid, signal.SIGCONT)
def kill(self):
if self.child_pgid is not None:
os.killpg(self.child_pgid, signal.SIGKILL)
def main():
print(os.getpgid(os.getpid()), flush=True)
try:
while True:
tool_name, path = input(), input()
tool = getattr(tools, tool_name)
result = tools.Result(path, tool)
status, result.result = tools.run_tool_no_error(path, tool)
print(status.value, flush=True)
except:
tools.log_error()
if __name__ == "__main__":
main()