Merge branch 'krader1961-negative-history-slices'

This commit is contained in:
Anthony Scopatz 2015-11-06 21:29:49 -05:00
commit a764975cd1
2 changed files with 105 additions and 22 deletions

View file

@ -1,12 +1,15 @@
"""Tests the xonsh history."""
from __future__ import unicode_literals, print_function
import io
import os
import sys
import nose
from nose.tools import assert_equal, assert_true
from xonsh.lazyjson import LazyJSON
from xonsh.history import History, CommandField
from xonsh import history
HIST_TEST_KWARGS = dict(sessionid='SESSIONID', gc=False)
@ -68,5 +71,75 @@ def test_cmd_field():
yield assert_equal, None, hist.outs[-1]
os.remove(FNAME)
def test_show_cmd():
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.show_cmd'
cmds = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
def format_hist_line(idx, cmd):
return ' {:d} {:s}\n'.format(idx, cmd)
def run_show_cmd(hist_args, commands, base_idx=0, step=1):
stdout.seek(0, io.SEEK_SET)
stdout.truncate()
history._main(hist, hist_args) # pylint: disable=protected-access
stdout.seek(0, io.SEEK_SET)
hist_lines = stdout.readlines()
yield assert_equal, len(commands), len(hist_lines)
for idx, (cmd, actual) in enumerate(zip(commands, hist_lines)):
expected = format_hist_line(base_idx + idx * step, cmd)
yield assert_equal, expected, actual
hist = History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
stdout = io.StringIO()
saved_stdout = sys.stdout
sys.stdout = stdout
for cmd in cmds: # populate the shell history
hist.append({'inp': cmd, 'rtn': 0})
# Verify an implicit "show" emits the entire history.
for x in run_show_cmd([], cmds):
yield x
# Verify an explicit "show" with no qualifiers emits the entire history.
for x in run_show_cmd(['show'], cmds):
yield x
# Verify an explicit "show" with a reversed qualifier emits the entire
# history in reverse order.
for x in run_show_cmd(['show', '-r'], list(reversed(cmds)),
len(cmds) - 1, -1):
yield x
# Verify that showing a specific history entry relative to the start of the
# history works.
for x in run_show_cmd(['show', '0'], [cmds[0]], 0):
yield x
for x in run_show_cmd(['show', '1'], [cmds[1]], 1):
yield x
# Verify that showing a specific history entry relative to the end of the
# history works.
for x in run_show_cmd(['show', '-2'], [cmds[-2]], len(cmds) - 2):
yield x
# Verify that showing a history range relative to the start of the
# history works.
for x in run_show_cmd(['show', '0:2'], cmds[0:2], 0):
yield x
for x in run_show_cmd(['show', '1::2'], cmds[1::2], 1, 2):
yield x
# Verify that showing a history range relative to the end of the
# history works.
for x in run_show_cmd(['show', '-2:'], cmds[-2:], len(cmds) - 2):
yield x
for x in run_show_cmd(['show', '-4:-2'], cmds[-4:-2], len(cmds) - 4):
yield x
sys.stdout = saved_stdout
os.remove(FNAME)
if __name__ == '__main__':
nose.runmodule()

View file

@ -15,8 +15,9 @@ from xonsh import diff_history
class HistoryGC(Thread):
def __init__(self, wait_for_shell=True, size=None, *args, **kwargs):
"""Thread responsible for garbage collecting old history. May wait for
shell (and thus xonshrc to have been loaded) to start work.
"""Thread responsible for garbage collecting old history.
May wait for shell (and thus xonshrc to have been loaded) to start work.
"""
super(HistoryGC, self).__init__(*args, **kwargs)
self.daemon = True
@ -76,7 +77,7 @@ class HistoryGC(Thread):
pass
def unlocked_files(self):
"""Finds the history files and returns the ones that are unlocked, this is
"""Finds the history files and returns the ones that are unlocked, this is
sorted by the last closed time. Returns a list of (timestamp, file) tuples.
"""
xdd = os.path.abspath(builtins.__xonsh_env__.get('XONSH_DATA_DIR'))
@ -138,7 +139,7 @@ class CommandField(Sequence):
def __init__(self, field, hist, default=None):
"""Represents a field in the 'cmds' portion of history. Will query the buffer
for the relevant data, if possible. Otherwise it will lazily acquire data from
for the relevant data, if possible. Otherwise it will lazily acquire data from
the file.
Parameters
@ -197,23 +198,23 @@ class History(object):
Parameters
----------
filename : str, optional
Location of history file, defaults to
Location of history file, defaults to
``$XONSH_DATA_DIR/xonsh-{sessionid}.json``.
sessionid : int, uuid, str, optional
Current session identifier, will generate a new sessionid if not set.
buffersize : int, optional
Maximum buffersize in memory.
meta : optional
Top-level metadata to store along with the history. The kwargs 'cmds' and
Top-level metadata to store along with the history. The kwargs 'cmds' and
'sessionid' are not allowed and will be overwritten.
gc : bool, optional
Run garbage collector flag.
"""
self.sessionid = sid = uuid.uuid4() if sessionid is None else sessionid
if filename is None:
self.filename = os.path.join(builtins.__xonsh_env__.get('XONSH_DATA_DIR'),
if filename is None:
self.filename = os.path.join(builtins.__xonsh_env__.get('XONSH_DATA_DIR'),
'xonsh-{0}.json'.format(sid))
else:
else:
self.filename = filename
self.buffer = []
self.buffersize = buffersize
@ -241,7 +242,7 @@ class History(object):
Parameters
----------
cmd : dict
cmd : dict
Command dictionary that should be added to the ordered history.
Returns
@ -263,7 +264,7 @@ class History(object):
Parameters
----------
at_exit : bool, optional
Whether the HistoryFlusher should act as a thread in the background,
Whether the HistoryFlusher should act as a thread in the background,
or execute immeadiately and block.
Returns
@ -273,7 +274,7 @@ class History(object):
"""
if len(self.buffer) == 0:
return
hf = HistoryFlusher(self.filename, tuple(self.buffer), self._queue, self._cond,
hf = HistoryFlusher(self.filename, tuple(self.buffer), self._queue, self._cond,
at_exit=at_exit)
self.buffer.clear()
return hf
@ -289,16 +290,16 @@ def _create_parser():
if _HIST_PARSER is not None:
return _HIST_PARSER
from argparse import ArgumentParser
p = ArgumentParser(prog='history',
p = ArgumentParser(prog='history',
description='Tools for dealing with history')
subp = p.add_subparsers(title='action', dest='action')
# show action
show = subp.add_parser('show', help='displays current history, default action')
show.add_argument('-r', dest='reverse', default=False, action='store_true',
help='reverses the direction')
show.add_argument('n', nargs='?', default=None,
help='displays n current history entries, n may be an int or use '
'Python slice notation')
show.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a simple int, '
'or range of entries if it is Python slice notation')
# id
idp = subp.add_parser('id', help='displays the current session id')
# file
@ -323,7 +324,7 @@ def _create_parser():
bgcp = gcp.add_mutually_exclusive_group()
bgcp.add_argument('--blocking', dest='blocking', default=True, action='store_true',
help='ensures that the gc blocks the main thread, default True')
bgcp.add_argument('--non-blocking', dest='blocking', action='store_false',
bgcp.add_argument('--non-blocking', dest='blocking', action='store_false',
help='makes the gc non-blocking, and thus return sooner')
# set and return
_HIST_PARSER = p
@ -338,7 +339,7 @@ def _show(ns, hist):
if isinstance(idx, int):
inps = [inps]
indices = [idx if idx >= 0 else len(hist) + idx]
else:
else:
indices = list(range(*idx.indices(len(hist))))
ndigits = len(str(indices[-1]))
indent = ' '*(ndigits + 3)
@ -373,7 +374,7 @@ def _gc(ns, hist):
if ns.blocking:
while gc.is_alive():
continue
_MAIN_ACTIONS = {
'show': _show,
@ -384,11 +385,20 @@ _MAIN_ACTIONS = {
'gc': _gc,
}
def main(args=None, stdin=None):
"""This acts as a main funtion for history command line interfaces."""
hist = builtins.__xonsh_history__
def _main(hist, args):
"""This implements the history CLI."""
if not args or args[0] not in _MAIN_ACTIONS:
args.insert(0, 'show')
if (args[0] == 'show' and len(args) > 1 and args[-1].startswith('-') and
args[-1][1].isdigit()):
args.insert(-1, '--') # ensure parsing stops before a negative int
parser = _create_parser()
ns = parser.parse_args(args)
if ns.action is None: # apply default action
ns = parser.parse_args(['show'] + args)
_MAIN_ACTIONS[ns.action](ns, hist)
def main(args=None, stdin=None):
"""This is the history command entry point."""
_ = stdin
_main(builtins.__xonsh_history__, args)