Fix commands cache testing (#4257)

* fix: update failing commands-cache tests

in some cases, the thread updates the binaries faster than the next call

* chore: upgrade black to 21.5b0

* chore: upgrade mypy to 0.812 version

* fix: handle corrupt commands-cache file

* fix: parser-table output dir. this should be same as the defaults

* fix: flake8 error
This commit is contained in:
Noorhteen Raja NJ 2021-05-06 19:24:09 +05:30 committed by GitHub
parent a5292f70d1
commit 4dc08232e6
Failed to generate hash of commit
18 changed files with 41 additions and 34 deletions

View file

@ -8,7 +8,5 @@ pyzmq
matplotlib
doctr
tornado
black
pre-commit
runthis-sphinxext
livereload

View file

@ -7,6 +7,6 @@ pytest-timeout
prompt-toolkit>=3.0
pygments>=2.2
coverage>=5.3.1
black==20.8b1
black==21.5b0
pre-commit
mypy==0.790
mypy==0.812

View file

@ -60,7 +60,7 @@ def qa(ns: argparse.Namespace):
echo "---------- Running mypy ----------"
mypy --version
mypy xonsh
mypy xonsh --exclude xonsh/ply
if __name__ == '__main__':

View file

@ -98,6 +98,8 @@ cache_dir = .cache/mypy/
# warn_unused_ignores = True
warn_unused_configs = True
warn_no_return = False
; a regex to exclude certain directories
exclude = xonsh/ply
# report
show_error_context = True

View file

@ -64,8 +64,10 @@ def test_commands_cached_between_runs(commands_cache_tmp, tmp_path):
os.remove(file)
def test_commands_cache_uses_pickle_file(commands_cache_tmp, tmp_path):
def test_commands_cache_uses_pickle_file(commands_cache_tmp, tmp_path, monkeypatch):
cc = commands_cache_tmp
update_cmds_cache = MagicMock()
monkeypatch.setattr(cc, "_update_cmds_cache", update_cmds_cache)
file = tmp_path / CommandsCache.CACHE_FILE
bins = {
"bin1": (
@ -80,6 +82,7 @@ def test_commands_cache_uses_pickle_file(commands_cache_tmp, tmp_path):
file.write_bytes(pickle.dumps(bins))
assert cc.all_commands == bins
assert cc._loaded_pickled
TRUE_SHELL_ARGS = [
@ -131,7 +134,7 @@ PATTERN_BIN_USING_TTY_OR_NOT = [
@pytest.mark.parametrize("args", PATTERN_BIN_USING_TTY_OR_NOT)
@skip_if_on_windows
def test_commands_cache_predictor_default(args):
def test_commands_cache_predictor_default(args, xonsh_builtins):
cc = CommandsCache()
use_tty, patterns = args
f = open("testfile", "wb")

View file

@ -369,7 +369,7 @@ def xonsh_exit(args, stdin=None):
def xonsh_reset(args, stdin=None):
""" Clears __xonsh__.ctx"""
"""Clears __xonsh__.ctx"""
builtins.__xonsh__.ctx.clear()

View file

@ -129,7 +129,7 @@ class CommandsCache(cabc.Mapping):
self.set_cmds_cache(self._cmds_cache)
return self._cmds_cache
if self.cache_file:
if self.cache_file and self.cache_file.exists():
# pickle the result only if XONSH_DATA_DIR is set
if not self._loaded_pickled:
# first time load the commands from cache-file
@ -175,7 +175,11 @@ class CommandsCache(cabc.Mapping):
def get_cached_commands(self) -> tp.Dict[str, str]:
if self.cache_file and self.cache_file.exists():
return pickle.loads(self.cache_file.read_bytes()) or {}
try:
return pickle.loads(self.cache_file.read_bytes()) or {}
except Exception:
# the file is corrupt
self.cache_file.unlink(missing_ok=True)
return {}
def set_cmds_cache(self, allcmds: tp.Dict[str, tp.Any]) -> tp.Dict[str, tp.Any]:

View file

@ -243,7 +243,7 @@ def _safe_wait_for_active_job(last_task=None, backgrounded=False):
def get_next_task():
""" Get the next active task and put it on top of the queue"""
"""Get the next active task and put it on top of the queue"""
_clear_dead_jobs()
selected_task = None
for tid in tasks:

View file

@ -482,7 +482,7 @@ class BaseParser(object):
if not yacc_debug:
yacc_kwargs["errorlog"] = yacc.NullLogger()
if outputdir is None:
outputdir = os.path.dirname(os.path.realpath(__file__))
outputdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
yacc_kwargs["outputdir"] = outputdir
if yacc_debug:
# create parser on main thread
@ -3443,7 +3443,7 @@ class BaseParser(object):
p[0] = p2
def p_empty(self, p):
"""empty : """
"""empty :"""
p[0] = None
def p_error(self, p):

View file

@ -292,7 +292,7 @@ class CompletionContextParser:
if not debug:
yacc_kwargs["errorlog"] = yacc.NullLogger()
if outputdir is None:
outputdir = os.path.dirname(os.path.realpath(__file__))
outputdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
yacc_kwargs["outputdir"] = outputdir
# create parser on main thread, it's small and should be fast

View file

@ -140,7 +140,7 @@ def pygments_version():
@functools.lru_cache(1)
def pygments_version_info():
""" Returns `pygments`'s version as tuple of integers. """
"""Returns `pygments`'s version as tuple of integers."""
if HAS_PYGMENTS:
return tuple(int(x) for x in pygments_version().strip("<>+-=.").split("."))
else:
@ -167,7 +167,7 @@ def ptk_version():
@functools.lru_cache(1)
def ptk_version_info():
""" Returns `prompt_toolkit`'s version as tuple of integers. """
"""Returns `prompt_toolkit`'s version as tuple of integers."""
if has_prompt_toolkit():
return tuple(int(x) for x in ptk_version().strip("<>+-=.").split("."))
else:

View file

@ -566,7 +566,7 @@ class SubprocSpec:
return spec
def redirect_leading(self):
"""Manage leading redirects such as with '< input.txt COMMAND'. """
"""Manage leading redirects such as with '< input.txt COMMAND'."""
while len(self.cmd) >= 3 and self.cmd[0] == "<":
self.stdin = safe_open(self.cmd[1], "r")
self.cmd = self.cmd[2:]

View file

@ -142,7 +142,7 @@ def _first_branch_timeout_message():
def _vc_has(binary):
""" This allows us to locate binaries after git only if necessary """
"""This allows us to locate binaries after git only if necessary"""
cmds = builtins.__xonsh__.commands_cache
if cmds.is_empty():
return bool(cmds.locate_binary(binary, ignore_alias=True))

View file

@ -223,12 +223,12 @@ def load_xonsh_bindings() -> KeyBindingsBase:
@handle(Keys.ControlX, Keys.ControlE, filter=~has_selection)
def open_editor(event):
""" Open current buffer in editor """
"""Open current buffer in editor"""
event.current_buffer.open_in_editor(event.cli)
@handle(Keys.BackTab, filter=insert_mode)
def insert_literal_tab(event):
""" Insert literal tab on Shift+Tab instead of autocompleting """
"""Insert literal tab on Shift+Tab instead of autocompleting"""
b = event.current_buffer
if b.complete_state:
b.complete_previous()
@ -325,7 +325,7 @@ def load_xonsh_bindings() -> KeyBindingsBase:
@handle(Keys.ControlJ, filter=IsMultiline() & insert_mode)
@handle(Keys.ControlM, filter=IsMultiline() & insert_mode)
def multiline_carriage_return(event):
""" Wrapper around carriage_return multiline parser """
"""Wrapper around carriage_return multiline parser"""
b = event.cli.current_buffer
carriage_return(b, event.cli)
@ -378,19 +378,19 @@ def load_xonsh_bindings() -> KeyBindingsBase:
@handle(Keys.ControlX, Keys.ControlX, filter=has_selection)
def _cut(event):
""" Cut selected text. """
"""Cut selected text."""
data = event.current_buffer.cut_selection()
event.app.clipboard.set_data(data)
@handle(Keys.ControlX, Keys.ControlC, filter=has_selection)
def _copy(event):
""" Copy selected text. """
"""Copy selected text."""
data = event.current_buffer.copy_selection()
event.app.clipboard.set_data(data)
@handle(Keys.ControlV, filter=insert_mode | has_selection)
def _yank(event):
""" Paste selected text. """
"""Paste selected text."""
buff = event.current_buffer
if buff.selection_state:
buff.cut_selection()

View file

@ -68,7 +68,7 @@ class XshFunction(pytest.Item):
self._test_func(*args, **kwargs)
def repr_failure(self, excinfo):
""" called when self.runtest() raises an exception. """
"""called when self.runtest() raises an exception."""
formatted_tb = _limited_traceback(excinfo)
formatted_tb.insert(0, "xonsh execution failed\n")
formatted_tb.append("{}: {}".format(excinfo.type.__name__, excinfo.value))

View file

@ -232,7 +232,7 @@ class EnvPath(cabc.MutableSequence):
return all(map(operator.eq, self, other))
def _repr_pretty_(self, p, cycle):
""" Pretty print path list """
"""Pretty print path list"""
if cycle:
p.text("EnvPath(...)")
else:
@ -811,7 +811,7 @@ def _executables_in_windows(path):
def executables_in(path):
"""Returns a generator of files in path that the user could execute. """
"""Returns a generator of files in path that the user could execute."""
if ON_WINDOWS:
func = _executables_in_windows
else:
@ -1936,7 +1936,7 @@ def register_custom_style(
def _token_attr_from_stylemap(stylemap):
"""yields tokens attr, and index from a stylemap """
"""yields tokens attr, and index from a stylemap"""
import prompt_toolkit as ptk
if builtins.__xonsh__.shell.shell_type == "prompt_toolkit1":
@ -1953,7 +1953,7 @@ def _token_attr_from_stylemap(stylemap):
def _get_color_lookup_table():
"""Returns the prompt_toolkit win32 ColorLookupTable """
"""Returns the prompt_toolkit win32 ColorLookupTable"""
if builtins.__xonsh__.shell.shell_type == "prompt_toolkit1":
from prompt_toolkit.terminal.win32_output import ColorLookupTable
else:
@ -1962,7 +1962,7 @@ def _get_color_lookup_table():
def _get_color_indexes(style_map):
"""Generates the color and windows color index for a style """
"""Generates the color and windows color index for a style"""
table = _get_color_lookup_table()
for token, attr in _token_attr_from_stylemap(style_map):
if attr.color:
@ -2038,7 +2038,7 @@ WIN10_COLOR_MAP = LazyObject(_win10_color_map, globals(), "WIN10_COLOR_MAP")
def _win_bold_color_map():
""" Map dark ansi colors to lighter version. """
"""Map dark ansi colors to lighter version."""
return {
"ansiblack": "ansibrightblack",
"ansiblue": "ansibrightblue",
@ -2372,7 +2372,7 @@ def normabspath(p):
def expanduser_abs_path(inp):
""" Provides user expanded absolute path """
"""Provides user expanded absolute path"""
return os.path.abspath(expanduser(inp))

View file

@ -801,7 +801,7 @@ def STRIP_COLOR_RE():
def _align_string(string, align="<", fill=" ", width=80):
""" Align and pad a color formatted string """
"""Align and pad a color formatted string"""
linelen = len(STRIP_COLOR_RE.sub("", string))
padlen = max(width - linelen, 0)
if align == "^":

View file

@ -58,7 +58,7 @@ def get_module_docstring(module: str) -> str:
spec = importlib.util.find_spec(module)
if spec and spec.has_location and spec.origin:
return ast.get_docstring(ast.parse(Path(spec.origin).read_text()))
return ast.get_docstring(ast.parse(Path(spec.origin).read_text())) or ""
return ""