Merge pull request #1473 from laerus/hist_ref

history refactor
This commit is contained in:
Gil Forsyth 2016-07-31 14:25:20 -04:00 committed by GitHub
commit d2b614096d
5 changed files with 217 additions and 252 deletions

21
news/history-refactor.rst Normal file
View file

@ -0,0 +1,21 @@
**Added:**
* ``_hist_get`` that uses generators to filter and fetch
the history commands of each session.
* ``-n`` option to the show subcommand to choose
to numerate the commands.
**Changed:**
* ``_hist_show`` now uses ``_hist_get`` to print out the commands.
**Deprecated:** None
**Removed:** None
**Fixed:**
* ``_zsh_hist_parser`` not parsing history files without timestamps.
**Security:** None

View file

@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh history."""
# pylint: disable=protected-access
# TODO: Remove the following pylint directive when it correctly handles calls
# to nose assert_xxx functions.
# pylint: disable=no-value-for-parameter
from __future__ import unicode_literals, print_function
import io
import os
import sys
@ -71,63 +67,34 @@ def test_cmd_field(hist, xonsh_builtins):
assert 1 == hist.rtns[-1]
assert None == hist.outs[-1]
def run_show_cmd(hist_args, commands, base_idx=0, step=1):
"""Run and evaluate the output of the given show command."""
stdout = sys.stdout
stdout.seek(0, io.SEEK_SET)
stdout.truncate()
history.history_main(hist_args)
stdout.seek(0, io.SEEK_SET)
hist_lines = stdout.readlines()
assert len(commands) == len(hist_lines)
for idx, (cmd, actual) in enumerate(zip(commands, hist_lines)):
expected = ' {:d}: {:s}\n'.format(base_idx + idx * step, cmd)
assert expected == actual
def test_show_cmd(hist, xonsh_builtins):
cmds = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
@pytest.mark.parametrize('inp, commands, offset', [
('', cmds, (0, 1)),
('-r', list(reversed(cmds)), (len(cmds)- 1, -1)),
('0', cmds[0:1], (0, 1)),
('1', cmds[1:2], (1, 1)),
('-2', cmds[-2:-1], (len(cmds) -2 , 1)),
('1:3', cmds[1:3], (1, 1)),
('1::2', cmds[1::2], (1, 2)),
('-4:-2', cmds[-4:-2], (len(cmds) - 4, 1))
])
def test_show_cmd_numerate(inp, commands, offset, hist, xonsh_builtins, capsys):
"""Verify that CLI history commands work."""
cmds = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
sys.stdout = io.StringIO()
base_idx, step = offset
xonsh_builtins.__xonsh_history__ = hist
xonsh_builtins.__xonsh_env__['HISTCONTROL'] = set()
for ts,cmd in enumerate(cmds): # populate the shell history
hist.append({'inp': cmd, 'rtn': 0, 'ts':(ts+1, ts+1.5)})
# Verify an implicit "show" emits show history
run_show_cmd([], cmds)
exp = ('{}: {}'.format(base_idx + idx * step, cmd)
for idx, cmd in enumerate(list(commands)))
exp = '\n'.join(exp)
# Verify an explicit "show" with no qualifiers emits
# show history.
run_show_cmd(['show'], cmds)
# Verify an explicit "show" with a reversed qualifier
# emits show history in reverse order.
run_show_cmd(['show', '-r'], list(reversed(cmds)),
len(cmds) - 1, -1)
# Verify that showing a specific history entry relative to
# the start of the history works.
run_show_cmd(['show', '0'], [cmds[0]], 0)
run_show_cmd(['show', '1'], [cmds[1]], 1)
# Verify that showing a specific history entry relative to
# the end of the history works.
run_show_cmd(['show', '-2'], [cmds[-2]],
len(cmds) - 2)
# Verify that showing a history range relative to the start of the
# history works.
run_show_cmd(['show', '0:2'], cmds[0:2], 0)
run_show_cmd(['show', '1::2'], cmds[1::2], 1, 2)
# Verify that showing a history range relative to the end of the
# history works.
run_show_cmd(['show', '-2:'],
cmds[-2:], len(cmds) - 2)
run_show_cmd(['show', '-4:-2'],
cmds[-4:-2], len(cmds) - 4)
sys.stdout = sys.__stdout__
history.history_main(['show', '-n'] + shlex.split(inp))
out, err = capsys.readouterr()
assert out.rstrip() == exp
def test_histcontrol(hist, xonsh_builtins):
@ -195,16 +162,22 @@ def test_parse_args_help(args, capsys):
@pytest.mark.parametrize('args, exp', [
('', ('show', 'session', [])),
('show', ('show', 'session', [])),
('show session', ('show', 'session', [])),
('show session 15', ('show', 'session', ['15'])),
('show bash 3:5 15:66', ('show', 'bash', ['3:5', '15:66'])),
('show zsh 3 5:6 16 9:3', ('show', 'zsh', ['3', '5:6', '16', '9:3'])),
('', ('show', 'session', [], False, False)),
('1:5', ('show', 'session', ['1:5'], False, False)),
('show', ('show', 'session', [], False, False)),
('show 15', ('show', 'session', ['15'], False, False)),
('show bash 3:5 15:66', ('show', 'bash', ['3:5', '15:66'], False, False)),
('show -r', ('show', 'session', [], False, True)),
('show -rn bash', ('show', 'bash', [], True, True)),
('show -n -r -30:20', ('show', 'session', ['-30:20'], True, True)),
('show -n zsh 1:2:3', ('show', 'zsh', ['1:2:3'], True, False))
])
def test_parser_show(args, exp):
args = _hist_parse_args(shlex.split(args))
action, session, slices = exp
assert args.action == action
assert args.session == session
assert args.slices == slices
# use dict instead of argparse.Namespace for pretty pytest diff
exp_ns = {'action': exp[0],
'session': exp[1],
'slices': exp[2],
'numerate': exp[3],
'reverse': exp[4]}
ns = _hist_parse_args(shlex.split(args))
assert ns.__dict__ == exp_ns

View file

@ -815,6 +815,7 @@ def test_bool_or_int_to_str(inp, exp):
@pytest.mark.parametrize('inp, exp', [
(42, slice(42, 43)),
(None, slice(None, None, None)),
(slice(1,2), slice(1,2)),
('42', slice(42, 43)),
('-42', slice(-42, -41)),
('1:2:3', slice(1, 2, 3)),
@ -823,25 +824,28 @@ def test_bool_or_int_to_str(inp, exp):
('1:', slice(1, None, None)),
('[1:2:3]', slice(1, 2, 3)),
('(1:2:3)', slice(1, 2, 3)),
((4, 8, 10), slice(4, 8, 10)),
([10,20], slice(10,20))
])
def test_ensure_slice(inp, exp):
obs = ensure_slice(inp)
assert exp == obs
@pytest.mark.parametrize('inp, error', [
('42.3', ValueError),
('3:asd5:1', ValueError),
('test' , ValueError),
('6.53:100:5', ValueError),
('4:-', ValueError),
('2:15-:3', ValueError),
('50:-:666', ValueError),
(object(), TypeError),
([], TypeError)
@pytest.mark.parametrize('inp', [
'42.3',
'3:asd5:1',
'test' ,
'6.53:100:5',
'4:-',
'2:15-:3',
'50:-:666',
object(),
[1,5,3,4],
('foo')
])
def test_ensure_slice_invalid(inp, error):
with pytest.raises(error):
def test_ensure_slice_invalid(inp):
with pytest.raises(ValueError):
obs = ensure_slice(inp)

View file

@ -2,15 +2,15 @@
"""Implements the xonsh history object."""
import os
import sys
import uuid
import time
import json
import glob
import json
import time
import uuid
import argparse
import operator
import datetime
import builtins
import datetime
import functools
import itertools
import threading
import collections
import collections.abc as abc
@ -236,36 +236,26 @@ class CommandField(abc.Sequence):
return self is self.hist._queue[0]
def _find_histfile_var(file_list=None, default=None):
if file_list is None:
return None
hist_file = None
found_hist = False
def _find_histfile_var(file_list, default=None):
"""Return the path of the history file
from the value of the envvar HISTFILE.
"""
for f in file_list:
f = expanduser_abs_path(f)
if not os.path.isfile(f):
continue
with open(f, 'r') as rc_file:
for line in rc_file:
if "HISTFILE=" in line:
evar = line.split(' ', 1)[-1]
hist_file = evar.split('=', 1)[-1]
for char in ['"', "'", '\n']:
hist_file = hist_file.replace(char, '')
if line.startswith('HISTFILE='):
hist_file = line.split('=', 1)[1].strip('\'"\n')
hist_file = expanduser_abs_path(hist_file)
if os.path.isfile(hist_file):
found_hist = True
break
if found_hist:
break
if hist_file is None:
default = expanduser_abs_path(default)
if os.path.isfile(default):
hist_file = default
return hist_file
return hist_file
else:
if default:
default = expanduser_abs_path(default)
if os.path.isfile(default):
return default
def _all_xonsh_parser(**kwargs):
@ -279,19 +269,17 @@ def _all_xonsh_parser(**kwargs):
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir)
if f.startswith('xonsh-') and f.endswith('.json')]
file_hist = []
ind = 0
for f in files:
try:
json_file = LazyJSON(f, reopen=False)
file_hist.append(json_file.load()['cmds'])
except ValueError:
# Invalid json file
pass
commands = [(c['inp'][:-1] if c['inp'].endswith('\n') else c['inp'],
c['ts'][0])
for commands in file_hist for c in commands if c]
commands.sort(key=operator.itemgetter(1))
return [(c, t, ind) for ind, (c, t) in enumerate(commands)]
commands = json_file.load()['cmds']
for c in commands:
yield (c['inp'].rstrip(), c['ts'][0], ind)
ind += 1
def _curr_session_parser(hist=None, **kwargs):
@ -301,28 +289,22 @@ def _curr_session_parser(hist=None, **kwargs):
"""
if hist is None:
hist = builtins.__xonsh_history__
if not hist:
return None
start_times = [start for start, end in hist.tss]
names = [name[:-1] if name.endswith('\n') else name
for name in hist.inps]
commands = enumerate(zip(names, start_times))
return [(c, t, ind) for ind, (c, t) in commands]
start_times = (start for start, end in hist.tss)
names = (name.rstrip() for name in hist.inps)
for ind, (c, t) in enumerate(zip(names, start_times)):
yield (c, t, ind)
def _zsh_hist_parser(location=None, **kwargs):
default_location = os.path.join('~', '.zsh_history')
location_list = [os.path.join('~', '.zshrc'),
os.path.join('~', '.zprofile')]
"""Yield commands from zsh history file"""
if location is None:
location = _find_histfile_var(location_list, default_location)
z_hist_formatted = []
if location and os.path.isfile(location):
with open(location, 'r', errors='backslashreplace') as z_file:
z_txt = z_file.read()
z_hist = z_txt.splitlines()
if z_hist:
for ind, line in enumerate(z_hist):
location = _find_histfile_var([os.path.join('~', '.zshrc'),
os.path.join('~', '.zprofile')],
os.path.join('~', '.zsh_history'))
if location:
with open(location, 'r', errors='backslashreplace') as zsh_hist:
for ind, line in enumerate(zsh_hist):
if line.startswith(':'):
try:
start_time, command = line.split(';', 1)
except ValueError:
@ -331,29 +313,25 @@ def _zsh_hist_parser(location=None, **kwargs):
try:
start_time = float(start_time.split(':')[1])
except ValueError:
start_time = -1
z_hist_formatted.append((command, start_time, ind))
return z_hist_formatted
start_time = 0.0
yield (command.rstrip(), start_time, ind)
else:
yield (line.rstrip(), 0.0, ind)
else:
print("No zsh history file found", file=sys.stderr)
def _bash_hist_parser(location=None, **kwargs):
default_location = os.path.join('~', '.bash_history')
location_list = [os.path.join('~', '.bashrc'),
os.path.join('~', '.bash_profile')]
"""Yield commands from bash history file"""
if location is None:
location = _find_histfile_var(location_list, default_location)
bash_hist_formatted = []
if location and os.path.isfile(location):
with open(location, 'r', errors='backslashreplace') as bash_file:
b_txt = bash_file.read()
bash_hist = b_txt.splitlines()
if bash_hist:
for ind, command in enumerate(bash_hist):
bash_hist_formatted.append((command, 0.0, ind))
return bash_hist_formatted
location = _find_histfile_var([os.path.join('~', '.bashrc'),
os.path.join('~', '.bash_profile')],
os.path.join('~', '.bash_history'))
if location:
with open(location, 'r', errors='backslashreplace') as bash_hist:
for ind, line in enumerate(bash_hist):
yield (line.rstrip(), 0.0, ind)
else:
print("No bash history file", file=sys.stderr)
@ -365,11 +343,11 @@ def _hist_create_parser():
description='Tools for dealing with history')
subp = p.add_subparsers(title='action', dest='action')
# session action
show = subp.add_parser('show', aliases=['session'],
help='displays session history, default action')
show = subp.add_parser('show', help='displays session history, default action')
show.add_argument('-r', dest='reverse', default=False,
action='store_true',
help='reverses the direction')
action='store_true', help='reverses the direction')
show.add_argument('-n', dest='numerate', default=False, action='store_true',
help='numerate each command')
show.add_argument('session', nargs='?', choices=_HIST_SESSIONS.keys(), default='session',
help='Choose a history session, defaults to current session')
show.add_argument('slices', nargs=argparse.REMAINDER, default=[],
@ -407,103 +385,83 @@ def _hist_create_parser():
return p
def _hist_show(ns=None, hist=None, start_index=None, end_index=None,
start_time=None, end_time=None, location=None):
"""Show the requested portion of shell history.
Accepts multiple history sources (xonsh, bash, zsh)
def _hist_get_portion(commands, slices):
"""Yield from portions of history commands."""
if len(slices) == 1:
s = ensure_slice(slices[0])
try:
yield from itertools.islice(commands, s.start, s.stop, s.step)
return
except ValueError: # islice failed
pass
commands = list(commands)
for s in slices:
s = ensure_slice(s)
yield from commands[s]
May be invoked as an alias with history all/bash/zsh which will
provide history as stdout or with __xonsh_history__.show()
which will return the history as a list with each item
in the tuple form (name, start_time, index).
If invoked via __xonsh_history__.show() then the ns parameter
can be supplied as a str with the follow options::
def _hist_filter_ts(commands, start_time=None, end_time=None):
"""Yield only the commands between start and end time."""
if start_time is None:
start_time = 0.0
elif isinstance(start_time, datetime.datetime):
start_time = start_time.timestamp()
if end_time is None:
end_time = float('inf')
elif isinstance(end_time, datetime.datetime):
end_time = end_time.timestamp()
for cmd in commands:
if start_time <= cmd[1] < end_time:
yield cmd
session - returns xonsh history from current session
xonsh - returns xonsh history from all sessions
all - alias of xonsh
zsh - returns all zsh history
bash - returns all bash history
def _hist_get(session='session', slices=None,
start_time=None, end_time=None, location=None):
"""Get the requested portion of shell history.
Parameters
----------
session: {'session', 'all', 'xonsh', 'bash', 'zsh'}
The history session to get.
slices : list of slice-like objects, optional
Get only portions of history.
start_time, end_time: float, optional
Filter commands by timestamp.
location: string, optional
The history file location (bash or zsh)
Returns
-------
generator
A filtered list of commands
"""
# Check if ns is a string, meaning it was invoked from
if hist is None:
hist = bultins.__xonsh_history__
alias = True
if isinstance(ns, str) and ns in _HIST_SESSIONS:
ns = _hist_create_parser().parse_args([ns])
alias = False
if not ns:
ns = _hist_create_parser().parse_args(['show', 'xonsh'])
alias = False
cmds = _HIST_SESSIONS[session](location=location)
if slices:
cmds = _hist_get_portion(cmds, slices)
if start_time or end_time:
cmds = _hist_filter_ts(cmds, start_time, end_time)
return cmds
def _hist_show(ns, *args, **kwargs):
"""Show the requested portion of shell history.
Accepts same parameters with `_hist_get`.
"""
commands = _hist_get(ns.session, ns.slices, **kwargs)
try:
commands = _HIST_SESSIONS[ns.session](hist=hist, location=location)
except KeyError:
print("{} is not a valid history session".format(ns.action))
return None
if not commands:
return None
if start_time:
if isinstance(start_time, datetime.datetime):
start_time = start_time.timestamp()
if isinstance(start_time, float):
commands = [c for c in commands if c[1] >= start_time]
if ns.reverse:
commands = reversed(list(commands))
if not ns.numerate:
for c, _, _ in commands:
print(c)
else:
print("Invalid start time, must be float or datetime.")
if end_time:
if isinstance(end_time, datetime.datetime):
end_time = end_time.timestamp()
if isinstance(end_time, float):
commands = [c for c in commands if c[1] <= end_time]
else:
print("Invalid end time, must be float or datetime.")
idx = None
if ns:
_commands = []
for s in ns.slices:
try:
s = ensure_slice(s)
except (ValueError, TypeError):
print('{!r} is not a valid slice format'.format(s), file=sys.stderr)
return
if s:
try:
_commands.extend(commands[s])
except IndexError:
err = "Index likely not in range. Only {} commands."
print(err.format(len(commands)), file=sys.stderr)
return
else:
if _commands:
commands = _commands
else:
idx = slice(start_index, end_index)
if (isinstance(idx, slice) and
start_time is None and end_time is None):
commands = commands[idx]
if ns and ns.reverse:
commands = list(reversed(commands))
if commands:
digits = len(str(max([i for c, t, i in commands])))
if alias:
for c, t, i in commands:
for line_ind, line in enumerate(c.split('\n')):
if line_ind == 0:
print('{:>{width}}: {}'.format(i, line,
width=digits + 1))
else:
print(' {:>>{width}} {}'.format('', line,
width=digits + 1))
else:
return commands
for c, _, i in commands:
print('{}: {}'.format(i, c))
except ValueError as err:
print("history: error: {}".format(err), file=sys.stderr)
#
# Interface to History
#
class History(object):
"""Xonsh session history.
@ -625,17 +583,15 @@ class History(object):
return hf
def show(self, *args, **kwargs):
"""
Returns shell history as a list
"""Return shell history as a list
Valid options:
`session` - returns xonsh history from current session
`show` - alias of `session`
`xonsh` - returns xonsh history from all sessions
`zsh` - returns all zsh history
`bash` - returns all bash history
"""
return _hist_show(*args, **kwargs)
return list(_hist_get(*args, **kwargs))
def _hist_info(ns, hist):
@ -687,14 +643,19 @@ def _hist_parse_args(args):
"""Parse arguments using the history argument parser."""
parser = _hist_create_parser()
if not args:
args = ['show']
elif args[0] not in ['-h', '--help'] and args[0] not in _HIST_MAIN_ACTIONS:
args.insert(0, 'show')
if (args[0] == 'show'
and len(args) > 1
and args[1] not in ['-h', '--help', '-r']
and args[1] not in _HIST_SESSIONS):
args.insert(1, 'session')
args = ['show', 'session']
elif args[0] not in _HIST_MAIN_ACTIONS and args[0] not in ('-h', '--help'):
args = ['show', 'session'] + args
elif args[0] == 'show':
slices_index = 0
for i, a in enumerate(args[1:], 1):
if a in _HIST_SESSIONS:
break
elif a.startswith('-') and a.lstrip('-').isalpha():
# get last optional arg, before slices
slices_index = i
else: # no session arg found, insert before slices
args.insert(slices_index + 1, 'session')
return parser.parse_args(args)
@ -702,4 +663,5 @@ def history_main(args=None, stdin=None):
"""This is the history command entry point."""
hist = builtins.__xonsh_history__
ns = _hist_parse_args(args)
_HIST_MAIN_ACTIONS[ns.action](ns, hist)
if ns:
_HIST_MAIN_ACTIONS[ns.action](ns, hist)

View file

@ -926,9 +926,11 @@ def SLICE_REG():
def ensure_slice(x):
"""Convert a string or int to a slice."""
if x is None:
"""Try to convert an object into a slice, complain on failure"""
if not x:
return slice(None)
elif isinstance(x, slice):
return x
try:
x = int(x)
s = slice(x, x+1)
@ -941,7 +943,10 @@ def ensure_slice(x):
else:
raise ValueError('cannot convert {!r} to slice'.format(x))
except TypeError:
raise TypeError('ensure_slice() argument must be a string or a number not {}'.format(type(x)))
try:
s = slice(*(int(i) for i in x))
except (TypeError, ValueError):
raise ValueError('cannot convert {!r} to slice'.format(x))
return s