Bulk of work done.

This commit is contained in:
JohnLunzer 2016-06-11 21:21:00 -04:00
parent 301792fffd
commit 1687ac2f25
3 changed files with 245 additions and 132 deletions

View file

@ -484,6 +484,8 @@ def test_ensure_int_or_slice():
('1:', slice(1, None, None)),
('[1:2:3]', slice(1, 2, 3)),
('(1:2:3)', slice(1, 2, 3)),
('r', False),
('r:11', False),
]
for inp, exp in cases:
obs = ensure_int_or_slice(inp)

View file

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
"""Implements the xonsh history object."""
import argparse
import functools
import os
from functools import lru_cache, partial
from os import listdir
from operator import itemgetter
import uuid
@ -232,6 +232,157 @@ class CommandField(Sequence):
return self is self.hist._queue[0]
def _all_xonsh_formatter(*args):
"""
Returns all history as found in XONSH_DATA_DIR.
return format: (name, start_time, index)
"""
data_dir = builtins.__xonsh_env__.get('XONSH_DATA_DIR')
data_dir = os.path.expanduser(data_dir)
files = [os.path.join(data_dir,f) for f in listdir(data_dir)
if f.startswith('xonsh-') and f.endswith('.json')]
file_hist = [lazyjson.LazyJSON(f, reopen=False).load()['cmds']
for f in files]
commands = [(c['inp'].replace('\n', ''), c['ts'][0])
for commands in file_hist for c in commands if c]
commands.sort(key=itemgetter(1))
return [(c,t,ind) for ind,(c,t) in enumerate(commands)]
def _curr_session_formatter(hist):
"""
Take in History object and return command list tuple with
format: (name, start_time, index)
"""
if not hist:
hist = builtins.__xonsh_history__
start_times= [start for start,end in hist.tss]
names = [name[:-1] for name in hist.inps]
commands = enumerate(zip(names, start_times))
return [(c,t,ind) for ind,(c,t) in commands]
def _zsh_hist_formatter(location = None):
if not location:
location = os.path.join('~','.zsh_history')
z_hist_formatted = list()
z_path = os.path.expanduser(location)
if os.path.isfile(z_path):
with open(z_path, 'r') as z_file:
z_hist = z_file.read().splitlines()
try:
if z_hist:
for ind, line in enumerate(z_hist):
start_time, command = line.split(';')
try:
start_time = float(start_time.split(':')[1])
except ValueError:
start_time = -1
z_hist_formatted.append((command, start_time, ind))
return z_hist_formatted
except:
print("There was a problem parsing {}.".format(bash_path))
return None
else:
print("No zsh history file found at: {}".format(z_path))
return None
def _bash_hist_formatter(location = None):
if not location:
location = os.path.join('~','.bash_history')
bash_hist_formatted = list()
bash_path = os.path.expanduser(location)
if os.path.isfile(bash_path):
try:
with open(bash_path, 'r') as bash_file:
bash_hist = bash_file.read().splitlines()
if bash_hist:
for ind, command in enumerate(bash_hist):
bash_hist_formatted.append((command, 0.0, ind))
return bash_hist_formatted
except Exception as e:
print("There was a problem parsing {}.".format(bash_path))
return None
else:
print("No bash history file found at: {}".format(bash_path))
return None
#
# Interface to History
#
@lru_cache()
def _create_parser():
"""Create a parser for the "history" command."""
p = argparse.ArgumentParser(prog='history',
description='Tools for dealing with history')
subp = p.add_subparsers(title='action', dest='action')
# show action
session = subp.add_parser('session',
help='displays session history, default action')
session.add_argument('-r', dest='reverse', default=False,
action='store_true',
help='reverses the direction')
session.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a simple '
'int, or range of entries if it is Python '
'slice notation')
# 'id' subcommand
show_all = subp.add_parser('all',
help='displays history from all sessions')
show_all.add_argument('-r', dest='reverse', default=False,
action='store_true',
help='reverses the direction')
show_all.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a '
'simple int, or range of entries if it '
'is Python slice notation')
zsh = subp.add_parser('zsh', help='displays history from zsh sessions')
zsh.add_argument('-r', dest='reverse', default=False,
action='store_true',
help='reverses the direction')
zsh.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a '
'simple int, or range of entries if it '
'is Python slice notation')
bash = subp.add_parser('bash', help='displays history from bash sessions')
bash.add_argument('-r', dest='reverse', default=False,
action='store_true',
help='reverses the direction')
bash.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a '
'simple int, or range of entries if it '
'is Python slice notation')
subp.add_parser('id', help='displays the current session id')
# 'file' subcommand
subp.add_parser('file', help='displays the current history filename')
# 'info' subcommand
info = subp.add_parser('info', help=('displays information about the '
'current history'))
info.add_argument('--json', dest='json', default=False, action='store_true',
help='print in JSON format')
# diff
diff = subp.add_parser('diff', help='diffs two xonsh history files')
diff_history._create_parser(p=diff)
# replay, dynamically
from xonsh import replay
rp = subp.add_parser('replay', help='replays a xonsh history file')
replay._create_parser(p=rp)
_MAIN_ACTIONS['replay'] = replay._main_action
# gc
gcp = subp.add_parser('gc', help='launches a new history garbage collector')
gcp.add_argument('--size', nargs=2, dest='size', default=None,
help=('next two arguments represent the history size and '
'units; e.g. "--size 8128 commands"'))
bgcp = gcp.add_mutually_exclusive_group()
bgcp.add_argument('--blocking', dest='blocking', default=True,
action='store_true',
help=('ensures that the gc blocks the main thread, '
'default True'))
bgcp.add_argument('--non-blocking', dest='blocking', action='store_false',
help='makes the gc non-blocking, and thus return sooner')
return p
class History(object):
"""Xonsh session history."""
@ -337,128 +488,81 @@ class History(object):
return hf
#
# Interface to History
#
@functools.lru_cache()
def _create_parser():
"""Create a parser for the "history" command."""
p = argparse.ArgumentParser(prog='history',
description='Tools for dealing with history')
subp = p.add_subparsers(title='action', dest='action')
# show action
show = subp.add_parser('show',
help='displays current history, default action')
show.add_argument('-r', dest='reverse', default=False, action='store_true',
help='reverses the direction')
show.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a simple int, '
'or range of entries if it is Python slice notation')
# 'id' subcommand
show_all = subp.add_parser('all', help='displays history from all sessions')
# 'id' subcommand
show_all.add_argument('-l', dest='return_list', default=False,
action='store_true',
help='returns history as a list instead of printing')
subp.add_parser('id', help='displays the current session id')
# 'file' subcommand
subp.add_parser('file', help='displays the current history filename')
# 'info' subcommand
info = subp.add_parser('info', help=('displays information about the '
'current history'))
info.add_argument('--json', dest='json', default=False, action='store_true',
help='print in JSON format')
# diff
diff = subp.add_parser('diff', help='diffs two xonsh history files')
diff_history._create_parser(p=diff)
# replay, dynamically
from xonsh import replay
rp = subp.add_parser('replay', help='replays a xonsh history file')
replay._create_parser(p=rp)
_MAIN_ACTIONS['replay'] = replay._main_action
# gc
gcp = subp.add_parser('gc', help='launches a new history garbage collector')
gcp.add_argument('--size', nargs=2, dest='size', default=None,
help=('next two arguments represent the history size and '
'units; e.g. "--size 8128 commands"'))
bgcp = gcp.add_mutually_exclusive_group()
bgcp.add_argument('--blocking', dest='blocking', default=True,
action='store_true',
help=('ensures that the gc blocks the main thread, '
'default True'))
bgcp.add_argument('--non-blocking', dest='blocking', action='store_false',
help='makes the gc non-blocking, and thus return sooner')
return p
@staticmethod
def show(ns = None, hist = None, start_index = None, end_index = None,
start_time=None, end_time=None, location = None):
"""
Show the requested portion of shell history.
Accepts multiple history sources (xonsh, bash, zsh)
May be invoked as an alias with `history all` which will
provide history as stdout or with `__xonsh_history__.show()`
which will return the history as a list with each item
in the tuple form (name, start_time, index).
"""
# Check if ns is a string, meaning it was invoked from
# __xonsh_history__
alias = True
valid_formats = {'all': _all_xonsh_formatter,
'session': partial(_curr_session_formatter, hist),
'zsh': partial(_zsh_hist_formatter, location),
'bash': partial(_bash_hist_formatter, location)}
if isinstance(ns, str) and ns in valid_formats.keys():
ns = _create_parser().parse_args([ns])
alias = False
if not ns:
ns = _create_parser().parse_args(['all'])
alias = False
try:
commands = valid_formats[ns.action]()
except KeyError:
print("{} is not a valid history format".format(ns.action))
return None
if not commands:
return None
num_of_commands = len(commands)
digits = len(str(num_of_commands))
if start_time:
if isinstance(start_time,datetime):
start_time = start_time.timestamp()
if isinstance(start_time,float):
commands = [c for c in commands if c[1] >= start_time]
else:
print("Invalid start time, must be float or datetime.")
if end_time:
if isinstance(end_time,datetime):
end_time = end_time.timestamp()
if isinstance(end_time,float):
commands = [c for c in commands if c[1] <= end_time]
else:
print("Invalid end time, must be float or datetime.")
idx = None
if ns:
idx = ensure_int_or_slice(ns.n)
if idx is False:
return None
elif isinstance(idx, int):
try:
commands = [commands[idx]]
except IndexError:
err = "Index likely not in range. Only {} commands."
print(err.format(len(commands)))
return None
else:
idx = slice(start_index,end_index)
if (isinstance(idx, slice) and
start_time is None and end_time is None):
commands = commands[idx]
if ns and ns.reverse:
commands = reversed(commands)
def _show(ns, hist):
"""Show the requested portion of the shell history."""
idx = ensure_int_or_slice(ns.n)
if len(hist) == 0:
return
inps = hist.inps[idx]
if isinstance(idx, int):
inps = [inps]
indices = [idx if idx >= 0 else len(hist) + idx]
else:
indices = list(range(*idx.indices(len(hist))))
ndigits = len(str(indices[-1]))
indent = ' '*(ndigits + 3)
if ns.reverse:
indices = reversed(indices)
inps = reversed(inps)
for i, inp in zip(indices, inps):
lines = inp.splitlines()
lines[0] = ' {0:>{1}} {2}'.format(i, ndigits, lines[0])
lines[1:] = [indent + x for x in lines[1:]]
print('\n'.join(lines))
def _all(ns, hist):
"""Show the requested portion of the shell history."""
"""
get ALL history!
"""
hist_dir = os.path.abspath(builtins.__xonsh_env__.get('XONSH_DATA_DIR'))
firstComm = None
lastComm = None
firstTime=None
lastTime=None
returnList = ns.return_list
grep = None
files = [os.path.join(hist_dir,f) for f in listdir(hist_dir)
if f.startswith('xonsh-') and f.endswith('.json')]
fileHist = [lazyjson.LazyJSON(f, reopen=False).load()['cmds']
for f in files]
commands = [(c['inp'].replace('\n', ''), c['ts'][0])
for commands in fileHist for c in commands if c]
commands.sort(key=itemgetter(1))
commands = [(c,t,ind) for ind,(c,t) in enumerate(commands)]
numOfCommands = len(commands)
digits = len(str(numOfCommands))
if grep:
commands = [c for c in commands if grep in c[0]]
if firstTime is not None:
if isinstance(firstTime,datetime):
firstTime = firstTime.timestamp()
if isinstance(firstTime,float):
commands = [c for c in commands if c[1] >= firstTime]
if lastTime is not None:
if isinstance(lastTime,datetime):
lastTime = lastTime.timestamp()
if isinstance(lastTime,float):
commands = [c for c in commands if c[1] <= lastTime]
if firstComm != None or lastComm != None and firstTime == None and lastTime == None:
commands = commands[firstComm:lastComm]
if returnList:
return [(c, datetime.fromtimestamp(t), i) for c,t,i in commands]
else:
for c,t,i in commands:
print('{:>{width}}: {}'.format(i,c, width=digits+1))
if alias:
for c,t,i in commands:
print('{:>{width}}: {}'.format(i,c, width=digits+1))
else:
return commands
def _info(ns, hist):
@ -487,8 +591,10 @@ def _gc(ns, hist):
_MAIN_ACTIONS = {
'show': _show,
'all': _all,
'session': History.show,
'all': History.show,
'zsh': History.show,
'bash': History.show,
'id': lambda ns, hist: print(hist.sessionid),
'file': lambda ns, hist: print(hist.filename),
'info': _info,
@ -501,13 +607,14 @@ def _main(hist, args):
"""This implements the history CLI."""
if not args or (args[0] not in _MAIN_ACTIONS and
args[0] not in {'-h', '--help'}):
args.insert(0, 'show')
if (args[0] == 'show' and len(args) > 1 and args[-1].startswith('-') and
args[-1][1].isdigit()):
args.insert(0, 'session')
if (args[0] in ['session', 'all', 'zsh', 'bash']
and len(args) > 1 and args[-1].startswith('-') and
args[-1][1].isdigit()):
args.insert(-1, '--') # ensure parsing stops before a negative int
ns = _create_parser().parse_args(args)
if ns.action is None: # apply default action
ns = _create_parser().parse_args(['show'] + args)
ns = _create_parser().parse_args(['session'] + args)
_MAIN_ACTIONS[ns.action](ns, hist)

View file

@ -701,11 +701,15 @@ def ensure_int_or_slice(x):
elif is_int(x):
return x
# must have a string from here on
if ':' in x:
x = x.strip('[]()')
return slice(*(int(x) if len(x) > 0 else None for x in x.split(':')))
else:
return int(x)
try:
if ':' in x:
x = x.strip('[]()')
return slice(*(int(x) if len(x) > 0 else None for x in x.split(':')))
else:
return int(x)
except ValueError:
print("Could not convert index or slice to int.")
return False
def is_string_set(x):