mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-06 17:30:59 +01:00
471 lines
18 KiB
Python
471 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Aliases for the xonsh shell."""
|
|
import os
|
|
import sys
|
|
import shlex
|
|
import inspect
|
|
import argparse
|
|
import builtins
|
|
import collections.abc as cabc
|
|
|
|
from xonsh.lazyasd import lazyobject
|
|
from xonsh.dirstack import cd, pushd, popd, dirs, _get_cwd
|
|
from xonsh.environ import locate_binary
|
|
from xonsh.foreign_shells import foreign_shell_data
|
|
from xonsh.jobs import jobs, fg, bg, clean_jobs
|
|
from xonsh.history import history_main
|
|
from xonsh.platform import (ON_ANACONDA, ON_DARWIN, ON_WINDOWS, ON_FREEBSD,
|
|
ON_NETBSD)
|
|
from xonsh.proc import foreground
|
|
from xonsh.replay import replay_main
|
|
from xonsh.timings import timeit_alias
|
|
from xonsh.tools import argvquote, escape_windows_cmd_string, to_bool
|
|
from xonsh.xontribs import xontribs_main
|
|
|
|
import xonsh.completers._aliases as xca
|
|
import xonsh.xoreutils.which as xxw
|
|
|
|
|
|
class Aliases(cabc.MutableMapping):
    """Represents a location to hold and look up aliases.

    Values are kept un-evaluated in ``self._raw``. Strings are split into a
    list of tokens with ``shlex.split`` when they are assigned; every other
    value is stored as given. Item access (``aliases[key]``) returns the raw
    stored value, while ``get()`` returns the recursively evaluated alias.
    """

    def __init__(self, *args, **kwargs):
        """Creates the mapping; arguments are forwarded to ``update()``."""
        self._raw = {}
        self.update(*args, **kwargs)

    def get(self, key, default=None):
        """Returns the (possibly modified) value. If the key is not present,
        then `default` is returned.
        If the value is callable, it is returned without modification. If it
        is an iterable of strings it will be evaluated recursively to expand
        other aliases, resulting in a new list or a "partially applied"
        callable.

        Raises
        ------
        TypeError
            If the stored value is neither iterable nor callable.
        """
        val = self._raw.get(key)
        if val is None:
            return default
        elif isinstance(val, cabc.Iterable) or callable(val):
            # Seed seen_tokens with the key so a self-referencing alias
            # (e.g. ``ls=ls --color=auto``) is not expanded forever.
            return self.eval_alias(val, seen_tokens={key})
        else:
            msg = 'alias of {!r} has an inappropriate type: {!r}'
            raise TypeError(msg.format(key, val))

    def eval_alias(self, value, seen_tokens=frozenset(), acc_args=()):
        """
        "Evaluates" the alias `value`, by recursively looking up the leftmost
        token and "expanding" if it's also an alias.

        A value like ["cmd", "arg"] might transform like this:
        > ["cmd", "arg"] -> ["ls", "-al", "arg"] -> callable()
        where `cmd=ls -al` and `ls` is an alias with its value being a
        callable.  The resulting callable will be "partially applied" with
        ["-al", "arg"].

        Parameters
        ----------
        value : callable or iterable of str
            The raw alias value to evaluate.
        seen_tokens : set-like of str
            Tokens that were already expanded; they are never expanded again.
        acc_args : sequence of str
            Arguments accumulated so far; appended after the expansion.
        """
        # Beware of mutability: default values for keyword args are evaluated
        # only once.
        if callable(value):
            if acc_args:  # Partial application
                def _alias(args, stdin=None):
                    # list(args) lets callers pass any sequence, not only
                    # a list.
                    args = list(acc_args) + list(args)
                    return value(args, stdin=stdin)
                return _alias
            else:
                return value
        else:
            expand_path = builtins.__xonsh_expand_path__
            token, *rest = map(expand_path, value)
            if token in seen_tokens or token not in self._raw:
                # ^ Making sure things like `egrep=egrep --color=auto` works,
                # and that `l` evals to `ls --color=auto -CF` if `l=ls -CF`
                # and `ls=ls --color=auto`
                rtn = [token]
                rtn.extend(rest)
                rtn.extend(acc_args)
                return rtn
            else:
                seen_tokens = seen_tokens | {token}
                acc_args = rest + list(acc_args)
                return self.eval_alias(self._raw[token], seen_tokens,
                                       acc_args)

    def expand_alias(self, line):
        """Expands any aliases present in line if alias does not point to a
        builtin function and if alias is only a single command.

        Only the first space-delimited word of `line` is considered. The
        line is returned unchanged when that word is not an alias of this
        mapping or when the alias evaluates to a callable.
        """
        word = line.split(' ', 1)[0]
        # Look the word up in *this* mapping (not a global one) so the
        # membership test and the value that is expanded always agree.
        if word in self._raw:
            expansion = self.get(word)
            if isinstance(expansion, cabc.Sequence):
                # `word` is by construction the prefix of `line`.
                line = ' '.join(expansion) + line[len(word):]
        return line

    #
    # Mutable mapping interface
    #

    def __getitem__(self, key):
        """Returns the raw (un-evaluated) value stored for `key`."""
        return self._raw[key]

    def __setitem__(self, key, val):
        """Stores `val`; strings are split into tokens with shlex first."""
        if isinstance(val, str):
            self._raw[key] = shlex.split(val)
        else:
            self._raw[key] = val

    def __delitem__(self, key):
        del self._raw[key]

    def update(self, *args, **kwargs):
        """Sets many aliases at once, routing each through __setitem__ so
        that string values are split into tokens.
        """
        for key, val in dict(*args, **kwargs).items():
            self[key] = val

    def __iter__(self):
        yield from self._raw

    def __len__(self):
        return len(self._raw)

    def __str__(self):
        return str(self._raw)

    def __repr__(self):
        return '{0}.{1}({2})'.format(self.__class__.__module__,
                                     self.__class__.__name__, self._raw)

    def _repr_pretty_(self, p, cycle):
        """Pretty-printer hook; `p` is the printer object and `cycle` is
        true when a reference cycle was detected.
        """
        name = '{0}.{1}'.format(self.__class__.__module__,
                                self.__class__.__name__)
        with p.group(0, name + '(', ')'):
            if cycle:
                p.text('...')
            elif len(self):
                p.break_()
                p.pretty(dict(self))
|
|
|
|
|
|
def xonsh_exit(args, stdin=None):
    """Requests that the shell exit, unless jobs could not be cleaned up.

    The exit flag ``builtins.__xonsh_exit__`` is only raised when
    ``clean_jobs()`` reports success. Always returns ``(None, None)``.
    """
    if clean_jobs():
        builtins.__xonsh_exit__ = True
        print()  # gimme a newline
    # When jobs were not cleaned up the flag is left alone and we stay alive.
    return None, None
|
|
|
|
|
|
@lazyobject
def _SOURCE_FOREIGN_PARSER():
    """Builds the argument parser used by the ``source-foreign`` alias.

    Wrapped in ``lazyobject`` so the parser is not constructed at import
    time. Returns the configured ``argparse.ArgumentParser``.
    """
    desc = "Sources a file written in a foreign shell language."
    parser = argparse.ArgumentParser('source-foreign', description=desc)
    parser.add_argument('shell', help='Name or path to the foreign shell')
    parser.add_argument('files_or_code', nargs='+',
                        help='file paths to source or code in the target '
                             'language.')
    parser.add_argument('-i', '--interactive', type=to_bool, default=True,
                        help='whether the sourced shell should be interactive',
                        dest='interactive')
    parser.add_argument('-l', '--login', type=to_bool, default=False,
                        help='whether the sourced shell should be login',
                        dest='login')
    parser.add_argument('--envcmd', default=None, dest='envcmd',
                        help='command to print environment')
    parser.add_argument('--aliascmd', default=None, dest='aliascmd',
                        help='command to print aliases')
    parser.add_argument('--extra-args', default=(), dest='extra_args',
                        type=(lambda s: tuple(s.split())),
                        help='extra arguments needed to run the shell')
    parser.add_argument('-s', '--safe', type=to_bool, default=True,
                        help='whether the source shell should be run safely, '
                             'and not raise any errors, even if they occur.',
                        dest='safe')
    parser.add_argument('-p', '--prevcmd', default=None, dest='prevcmd',
                        help='command(s) to run before any other commands, '
                             'replaces traditional source.')
    parser.add_argument('--postcmd', default='', dest='postcmd',
                        help='command(s) to run after all other commands')
    parser.add_argument('--funcscmd', default=None, dest='funcscmd',
                        help='code to find locations of all native functions '
                             'in the shell language.')
    parser.add_argument('--sourcer', default=None, dest='sourcer',
                        help='the source command in the target shell '
                             'language, default: source.')
    parser.add_argument('--use-tmpfile', type=to_bool, default=False,
                        help='whether the commands for source shell should be '
                             'written to a temporary file.',
                        dest='use_tmpfile')
    # The two help strings below previously lacked the space between the
    # adjacent literals and rendered as "anyother" / "allother".
    parser.add_argument('--seterrprevcmd', default=None, dest='seterrprevcmd',
                        help='command(s) to set exit-on-error before any '
                             'other commands.')
    parser.add_argument('--seterrpostcmd', default=None, dest='seterrpostcmd',
                        help='command(s) to set exit-on-error after all '
                             'other commands.')
    return parser
|
|
|
|
|
|
def source_foreign(args, stdin=None):
    """Sources a file written in a foreign shell language.

    `args` are parsed with ``_SOURCE_FOREIGN_PARSER``. The environment and
    aliases reported by ``foreign_shell_data`` are then merged into
    ``builtins.__xonsh_env__`` and ``builtins.aliases``.

    Returns an ``(stdout, stderr, returncode)`` error triple when the
    foreign shell data could not be obtained, otherwise ``None``.
    """
    ns = _SOURCE_FOREIGN_PARSER.parse_args(args)
    if ns.prevcmd is not None:
        pass  # don't change prevcmd if given explicitly
    elif os.path.isfile(ns.files_or_code[0]):
        # we have filename to source
        # --sourcer defaults to None; fall back to the documented default
        # ("source") so the command does not start with the text "None".
        sourcer = 'source' if ns.sourcer is None else ns.sourcer
        ns.prevcmd = '{} "{}"'.format(sourcer, '" "'.join(ns.files_or_code))
    else:
        ns.prevcmd = ' '.join(ns.files_or_code)  # code to run, no files
    foreign_shell_data.cache_clear()  # make sure that we don't get prev src
    fsenv, fsaliases = foreign_shell_data(shell=ns.shell, login=ns.login,
                                          interactive=ns.interactive,
                                          envcmd=ns.envcmd,
                                          aliascmd=ns.aliascmd,
                                          extra_args=ns.extra_args,
                                          safe=ns.safe, prevcmd=ns.prevcmd,
                                          postcmd=ns.postcmd,
                                          funcscmd=ns.funcscmd,
                                          sourcer=ns.sourcer,
                                          use_tmpfile=ns.use_tmpfile,
                                          seterrprevcmd=ns.seterrprevcmd,
                                          seterrpostcmd=ns.seterrpostcmd)
    if fsenv is None:
        return (None, 'xonsh: error: Source failed: '
                      '{}\n'.format(ns.prevcmd), 1)
    # apply results
    env = builtins.__xonsh_env__
    denv = env.detype()
    for k, v in fsenv.items():
        if k in denv and v == denv[k]:
            continue  # no change from original
        env[k] = v
    # Remove any env-vars that were unset by the script.
    for k in denv:
        if k not in fsenv:
            env.pop(k, None)
    # Update aliases
    baliases = builtins.aliases
    for k, v in fsaliases.items():
        if k in baliases and v == baliases[k]:
            continue  # no change from original
        baliases[k] = v
|
|
|
|
|
|
def source_alias(args, stdin=None):
    """Executes the contents of the provided files in the current context.
    If sourced file isn't found in cwd, search for file along $PATH to source
    instead.

    Files are read with the encoding settings from ``$XONSH_ENCODING`` and
    ``$XONSH_ENCODING_ERRORS``. If a name is neither a file nor found by
    ``locate_binary``, processing stops and an
    ``(stdout, stderr, returncode)`` error triple is returned; otherwise
    ``None`` is returned.
    """
    env = builtins.__xonsh_env__
    encoding = env.get('XONSH_ENCODING')
    errors = env.get('XONSH_ENCODING_ERRORS')
    for fname in args:
        fpath = fname
        if not os.path.isfile(fpath):
            fpath = locate_binary(fname)
            if not fpath:
                # locate_binary found nothing; report it instead of
                # handing a non-path to open().
                return (None,
                        'xonsh: error: File not found: {}\n'.format(fname),
                        1)
        with open(fpath, 'r', encoding=encoding, errors=errors) as fp:
            src = fp.read()
        if not src.endswith('\n'):
            src += '\n'
        builtins.execx(src, 'exec', builtins.__xonsh_ctx__)
|
|
|
|
|
|
def source_cmd(args, stdin=None):
    """Simple cmd.exe-specific wrapper around source-foreign.

    ``args[0]`` names the batch file: it is resolved with
    ``locate_binary`` when possible and must exist as a file. The remaining
    items are passed along as its arguments. Returns an
    ``(stdout, stderr, returncode)`` error triple when no file is given or
    it cannot be found, otherwise whatever ``source_foreign`` returns.
    """
    args = list(args)
    if not args:
        # Nothing to source; avoid an IndexError on args[0] below.
        return (None, 'xonsh: error: source-cmd: no file specified\n', 1)
    fpath = locate_binary(args[0])
    args[0] = fpath if fpath else args[0]
    if not os.path.isfile(args[0]):
        return (None, 'xonsh: error: File not found: {}\n'.format(args[0]), 1)
    # Build the `call <file> <args...>` line before the source-foreign
    # options are appended to args.
    prevcmd = 'call '
    prevcmd += ' '.join([argvquote(arg, force=True) for arg in args])
    prevcmd = escape_windows_cmd_string(prevcmd)
    args.append('--prevcmd={}'.format(prevcmd))
    args.insert(0, 'cmd')
    args.append('--interactive=0')
    args.append('--sourcer=call')
    args.append('--envcmd=set')
    args.append('--seterrpostcmd=if errorlevel 1 exit 1')
    args.append('--use-tmpfile=1')
    with builtins.__xonsh_env__.swap(PROMPT='$P$G'):
        return source_foreign(args, stdin=stdin)
|
|
|
|
|
|
def xexec(args, stdin=None):
    """exec [-h|--help] command [args...]

    exec (also aliased as xexec) uses the os.execvpe() function to
    replace the xonsh process with the specified program. This provides
    the functionality of the bash 'exec' builtin::

    >>> exec bash -l -i
    bash $

    The '-h' and '--help' options print this message and exit.

    Notes
    -----
    This command **is not** the same as the Python builtin function
    exec(). That function is for running Python code. This command,
    which shares the same name as the sh-lang statement, is for launching
    a command directly in the same process. In the event of a name conflict,
    please use the xexec command directly or dive into subprocess mode
    explicitly with ![exec command]. For more details, please see
    http://xon.sh/faq.html#exec.
    """
    # NOTE: the docstring above doubles as the text returned for -h/--help.
    if not args:
        return (None, 'xonsh: exec: no args specified\n', 1)

    command = args[0]
    if command in ('-h', '--help'):
        return inspect.getdoc(xexec)

    detyped_env = builtins.__xonsh_env__.detype()
    try:
        # On success this call never returns: the process image is replaced.
        os.execvpe(command, args, detyped_env)
    except FileNotFoundError as err:
        message = 'xonsh: exec: file not found: {}: {}\n'.format(err.args[1],
                                                                 command)
        return (None, message, 1)
|
|
|
|
|
|
class AWitchAWitch(argparse.Action):
    """Zero-argument argparse action that opens a web page and then exits
    the parser.
    """

    # Sentinel used as the default ``dest`` and ``default`` of the action.
    SUPPRESS = '==SUPPRESS=='

    def __init__(self, option_strings, version=None, dest=SUPPRESS,
                 default=SUPPRESS, **kwargs):
        # `version` is accepted but not used; nargs is fixed at 0 so the
        # option consumes no values.
        argparse.Action.__init__(self, option_strings=option_strings,
                                 dest=dest, default=default, nargs=0,
                                 **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        from webbrowser import open as open_url  # imported only when used
        open_url('https://github.com/xonsh/xonsh/commit/f49b400')
        parser.exit()
|
|
|
|
|
|
def xonfig(args, stdin=None):
    """Alias entry point that hands `args` to the xonsh configuration
    utility and returns its result.
    """
    # Imported inside the function so xonsh.xonfig is only loaded on use.
    from xonsh.xonfig import xonfig_main as run_xonfig
    result = run_xonfig(args)
    return result
|
|
|
|
|
|
@foreground
def trace(args, stdin=None):
    """Alias entry point for the xonsh tracer utility.

    Returns the tracer's result, or ``None`` if the tracer raised
    ``SystemExit``.
    """
    # Imported inside the function so xonsh.tracer is only loaded on use.
    from xonsh.tracer import tracermain as run_tracer
    try:
        outcome = run_tracer(args)
    except SystemExit:
        # Swallow the exit request so it does not propagate to the shell.
        outcome = None
    return outcome
|
|
|
|
|
|
def showcmd(args, stdin=None):
    """usage: showcmd [-h|--help|cmd args]

    Displays the command and arguments as a list of strings that xonsh would
    run in subprocess mode. This is useful for determining how xonsh evaluates
    your commands and arguments prior to running these commands.

    optional arguments:
    -h, --help show this help message and exit

    example:
    >>> showcmd echo $USER can't hear "the sea"
    ['echo', 'I', "can't", 'hear', 'the sea']
    """
    # NOTE: the docstring above is the help text printed for -h/--help and
    # when no arguments are given.
    if len(args) == 0 or (len(args) == 1 and args[0] in {'-h', '--help'}):
        # inspect.getdoc() removes the docstring's source indentation
        # whatever its width, instead of stripping a fixed prefix per line.
        print(inspect.getdoc(showcmd))
    else:
        sys.displayhook(args)
|
|
|
|
|
|
def make_default_aliases():
    """Creates a new default aliases dictionary.

    Returns a plain ``dict`` mapping alias names to either a callable or a
    list of command tokens. Platform-specific entries are added depending
    on the ``ON_*`` platform flags.
    """
    default_aliases = {
        'cd': cd,
        'pushd': pushd,
        'popd': popd,
        'dirs': dirs,
        'jobs': jobs,
        'fg': fg,
        'bg': bg,
        'EOF': xonsh_exit,
        'exit': xonsh_exit,
        'quit': xonsh_exit,
        'exec': xexec,
        'xexec': xexec,
        'source': source_alias,
        'source-zsh': ['source-foreign', 'zsh', '--sourcer=source'],
        'source-bash': ['source-foreign', 'bash', '--sourcer=source'],
        'source-cmd': source_cmd,
        'source-foreign': source_foreign,
        'history': history_main,
        'replay': replay_main,
        'trace': trace,
        'timeit': timeit_alias,
        'xonfig': xonfig,
        'scp-resume': ['rsync', '--partial', '-h', '--progress', '--rsh=ssh'],
        'showcmd': showcmd,
        'ipynb': ['jupyter', 'notebook', '--no-browser'],
        'which': xxw.which,
        'xontrib': xontribs_main,
        'completer': xca.completer_alias
    }
    if ON_WINDOWS:
        # Borrow builtin commands from cmd.exe.
        windows_cmd_aliases = {
            'cls',
            'copy',
            'del',
            'dir',
            'echo',
            'erase',
            'md',
            'mkdir',
            'mklink',
            'move',
            'rd',
            'ren',
            'rename',
            'rmdir',
            'time',
            'type',
            'vol'
        }
        for alias in windows_cmd_aliases:
            default_aliases[alias] = ['cmd', '/c', alias]
        default_aliases['call'] = ['source-cmd']
        default_aliases['source-bat'] = ['source-cmd']
        default_aliases['clear'] = 'cls'
        if ON_ANACONDA:
            # Add aliases specific to the Anaconda python distribution.
            default_aliases['activate'] = ['source-cmd', 'activate.bat']
            default_aliases['deactivate'] = ['source-cmd', 'deactivate.bat']
        if not locate_binary('sudo'):
            import xonsh.winutils as winutils

            # The second parameter must be named `stdin`: partially applied
            # aliases are invoked as ``value(args, stdin=stdin)``.
            def sudo(args, stdin=None):
                """Runs a command through ``winutils.sudo``."""
                if len(args) < 1:
                    print('You need to provide an executable to run as '
                          'Administrator.')
                    return
                cmd = args[0]
                if locate_binary(cmd):
                    return winutils.sudo(cmd, args[1:])
                elif cmd.lower() in windows_cmd_aliases:
                    # cmd.exe builtin: change to the current directory
                    # first, then run the builtin through cmd.
                    args = ['/D', '/C', 'CD', _get_cwd(), '&&'] + args
                    return winutils.sudo('cmd', args)
                else:
                    msg = 'Cannot find the path for executable "{0}".'
                    print(msg.format(cmd))

            default_aliases['sudo'] = sudo
    elif ON_DARWIN:
        default_aliases['ls'] = ['ls', '-G']
    else:
        # Every remaining platform gets the colorized grep family; only the
        # `ls` alias differs between them.
        for grep_cmd in ('grep', 'egrep', 'fgrep'):
            default_aliases[grep_cmd] = [grep_cmd, '--color=auto']
        if ON_FREEBSD:
            default_aliases['ls'] = ['ls', '-G']
        elif not ON_NETBSD:
            default_aliases['ls'] = ['ls', '--color=auto', '-v']
        # NetBSD: no `ls` alias is defined.
    return default_aliases
|