Merge branch 'master' of https://github.com/scopatz/xonsh into hg-prompt

Conflicts:
	xonsh/environ.py
This commit is contained in:
Nathan Goldbaum 2015-05-16 19:08:13 -07:00
commit c723cb9802
21 changed files with 990 additions and 366 deletions

10
docs/api/base_shell.rst Normal file
View file

@ -0,0 +1,10 @@
.. _xonsh_base_shell:
******************************************************
Base Shell Class (``xonsh.base_shell``)
******************************************************
.. automodule:: xonsh.base_shell
:members:
:undoc-members:
:inherited-members:

10
docs/api/history.rst Normal file
View file

@ -0,0 +1,10 @@
.. _xonsh_history:
******************************************************
History Object (``xonsh.history``)
******************************************************
.. automodule:: xonsh.history
:members:
:undoc-members:
:inherited-members:

View file

@ -31,6 +31,11 @@ For those of you who want the gritty details.
inspectors
completer
shell
base_shell
readline_shell
prompt_toolkit_shell
prompt_toolkit_completer
history
**Helpers:**

View file

@ -0,0 +1,10 @@
.. _xonsh_prompt_toolkit_completer:
*************************************************************
Prompt Toolkit Completer (``xonsh.prompt_toolkit_completer``)
*************************************************************
.. automodule:: xonsh.prompt_toolkit_completer
:members:
:undoc-members:
:inherited-members:

View file

@ -0,0 +1,10 @@
.. _xonsh_prompt_toolkit_shell:
******************************************************
Prompt Toolkit Shell (``xonsh.prompt_toolkit_shell``)
******************************************************
.. automodule:: xonsh.prompt_toolkit_shell
:members:
:undoc-members:
:inherited-members:

View file

@ -0,0 +1,10 @@
.. _xonsh_readline_shell:
******************************************************
Readline Shell (``xonsh.readline_shell``)
******************************************************
.. automodule:: xonsh.readline_shell
:members:
:undoc-members:
:inherited-members:

View file

@ -1,7 +1,7 @@
.. _xonsh_shell:
******************************************************
Shell Command Prompt (``xonsh.shell``)
Main Shell Command Prompt (``xonsh.shell``)
******************************************************
.. automodule:: xonsh.shell

View file

@ -1,4 +1,4 @@
"""Tests the xonsh lexer."""
"""Tests the xonsh builtins."""
from __future__ import unicode_literals, print_function
import os
import re
@ -7,37 +7,11 @@ import nose
from nose.tools import assert_equal, assert_true, assert_not_in
from xonsh import built_ins
from xonsh.built_ins import Env, reglob, regexpath, helper, superhelper, \
from xonsh.built_ins import reglob, regexpath, helper, superhelper, \
ensure_list_of_strs
from xonsh.environ import Env
from xonsh.tools import ON_WINDOWS
def test_env_normal():
env = Env(VAR='wakka')
assert_equal('wakka', env['VAR'])
def test_env_path_list():
env = Env(MYPATH=['wakka'])
assert_equal(['wakka'], env['MYPATH'])
def test_env_path_str():
env = Env(MYPATH='wakka' + os.pathsep + 'jawaka')
assert_equal(['wakka', 'jawaka'], env['MYPATH'])
def test_env_detype():
env = Env(MYPATH=['wakka', 'jawaka'])
assert_equal({'MYPATH': 'wakka' + os.pathsep + 'jawaka'}, env.detype())
def test_env_detype_mutable_access_clear():
env = Env(MYPATH=['wakka', 'jawaka'])
assert_equal({'MYPATH': 'wakka' + os.pathsep + 'jawaka'}, env.detype())
env['MYPATH'][0] = 'woah'
assert_equal(None, env._detyped)
assert_equal({'MYPATH': 'woah' + os.pathsep + 'jawaka'}, env.detype())
def test_env_detype_no_dict():
env = Env(YO={'hey': 42})
det = env.detype()
assert_not_in('YO', det)
def test_reglob_tests():
testfiles = reglob('test_.*')

40
tests/test_environ.py Normal file
View file

@ -0,0 +1,40 @@
"""Tests the xonsh environment."""
from __future__ import unicode_literals, print_function
import os
import nose
from nose.tools import assert_equal, assert_true, assert_not_in
from xonsh.environ import Env
def test_env_normal():
env = Env(VAR='wakka')
assert_equal('wakka', env['VAR'])
def test_env_path_list():
env = Env(MYPATH=['wakka'])
assert_equal(['wakka'], env['MYPATH'])
def test_env_path_str():
env = Env(MYPATH='wakka' + os.pathsep + 'jawaka')
assert_equal(['wakka', 'jawaka'], env['MYPATH'])
def test_env_detype():
env = Env(MYPATH=['wakka', 'jawaka'])
assert_equal({'MYPATH': 'wakka' + os.pathsep + 'jawaka'}, env.detype())
def test_env_detype_mutable_access_clear():
env = Env(MYPATH=['wakka', 'jawaka'])
assert_equal({'MYPATH': 'wakka' + os.pathsep + 'jawaka'}, env.detype())
env['MYPATH'][0] = 'woah'
assert_equal(None, env._detyped)
assert_equal({'MYPATH': 'woah' + os.pathsep + 'jawaka'}, env.detype())
def test_env_detype_no_dict():
env = Env(YO={'hey': 42})
det = env.detype()
assert_not_in('YO', det)
if __name__ == '__main__':
nose.runmodule()

View file

@ -0,0 +1,43 @@
"""Tests some tools function for prompt_toolkit integration."""
from __future__ import unicode_literals, print_function
import nose
from nose.tools import assert_equal
from xonsh.tools import FakeChar
from xonsh.tools import format_prompt_for_prompt_toolkit
def test_format_prompt_for_prompt_toolkit():
cases = [
('root $ ', ['r', 'o', 'o', 't', ' ', '$', ' ']),
('\001\033[0;31m\002>>',
[FakeChar('>', prefix='\001\033[0;31m\002'), '>']
),
('\001\033[0;31m\002>>\001\033[0m\002',
[FakeChar('>', prefix='\001\033[0;31m\002'),
FakeChar('>', suffix='\001\033[0m\002')]
),
('\001\033[0;31m\002>\001\033[0m\002',
[FakeChar('>',
prefix='\001\033[0;31m\002',
suffix='\001\033[0m\002')
]
),
('\001\033[0;31m\002> $\001\033[0m\002',
[FakeChar('>', prefix='\001\033[0;31m\002'),
' ',
FakeChar('$', suffix='\001\033[0m\002')]
),
('\001\033[0;31m\002\001\033[0;32m\002$> \001\033[0m\002',
[FakeChar('$', prefix='\001\033[0;31m\002\001\033[0;32m\002'),
'>',
FakeChar(' ', suffix='\001\033[0m\002')]
),
]
for test, ans in cases:
assert_equal(format_prompt_for_prompt_toolkit(test), ans)
if __name__ == '__main__':
nose.runmodule()

View file

@ -1,12 +1,14 @@
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import os
import nose
from nose.tools import assert_equal
from nose.tools import assert_equal, assert_true, assert_false
from xonsh.lexer import Lexer
from xonsh.tools import subproc_toks, subexpr_from_unbalanced
from xonsh.tools import escape_windows_title_string
from xonsh.tools import subproc_toks, subexpr_from_unbalanced, is_int, \
always_true, always_false, ensure_string, is_env_path, str_to_env_path, \
env_path_to_str, escape_windows_title_string
LEXER = Lexer()
LEXER.build()
@ -147,6 +149,55 @@ def test_subexpr_from_unbalanced_parens():
obs = subexpr_from_unbalanced(expr, '(', ')')
yield assert_equal, exp, obs
def test_is_int():
yield assert_true, is_int(42)
yield assert_false, is_int('42')
def test_always_true():
yield assert_true, always_true(42)
yield assert_true, always_true('42')
def test_always_false():
yield assert_false, always_false(42)
yield assert_false, always_false('42')
def test_ensure_string():
cases = [
(42, '42'),
('42', '42'),
]
for inp, exp in cases:
obs = ensure_string(inp)
yield assert_equal, exp, obs
def test_is_env_path():
cases = [
('/home/wakka', False),
(['/home/jawaka'], True),
]
for inp, exp in cases:
obs = is_env_path(inp)
yield assert_equal, exp, obs
def test_str_to_env_path():
cases = [
('/home/wakka', ['/home/wakka']),
('/home/wakka' + os.pathsep + '/home/jawaka',
['/home/wakka', '/home/jawaka']),
]
for inp, exp in cases:
obs = str_to_env_path(inp)
yield assert_equal, exp, obs
def test_env_path_to_str():
cases = [
(['/home/wakka'], '/home/wakka'),
(['/home/wakka', '/home/jawaka'],
'/home/wakka' + os.pathsep + '/home/jawaka'),
]
for inp, exp in cases:
obs = env_path_to_str(inp)
yield assert_equal, exp, obs
def test_escape_windows_title_string():

109
xonsh/base_shell.py Normal file
View file

@ -0,0 +1,109 @@
"""The base class for xonsh shell"""
import sys
import builtins
import traceback
from xonsh.execer import Execer
from xonsh.tools import XonshError, escape_windows_title_string
from xonsh.tools import ON_WINDOWS
from xonsh.completer import Completer
from xonsh.environ import multiline_prompt, format_prompt
class BaseShell(object):
"""The xonsh shell."""
def __init__(self, execer, ctx, **kwargs):
super().__init__(**kwargs)
self.execer = execer
self.ctx = ctx
self.completer = Completer()
self.buffer = []
self.need_more_lines = False
self.mlprompt = None
def emptyline(self):
"""Called when an empty line has been entered."""
self.need_more_lines = False
self.default('')
def precmd(self, line):
"""Called just before execution of line."""
return line if self.need_more_lines else line.lstrip()
def default(self, line):
"""Implements code execution."""
line = line if line.endswith('\n') else line + '\n'
code = self.push(line)
if code is None:
return
try:
self.execer.exec(code, mode='single', glbs=self.ctx) # no locals
except XonshError as e:
print(e.args[0], file=sys.stderr, end='')
except:
traceback.print_exc()
if builtins.__xonsh_exit__:
return True
def push(self, line):
"""Pushes a line onto the buffer and compiles the code in a way that
enables multiline input.
"""
code = None
self.buffer.append(line)
if self.need_more_lines:
return code
src = ''.join(self.buffer)
try:
code = self.execer.compile(src,
mode='single',
glbs=None,
locs=self.ctx)
self.reset_buffer()
except SyntaxError:
if line == '\n':
self.reset_buffer()
traceback.print_exc()
return None
self.need_more_lines = True
return code
def reset_buffer(self):
"""Resets the line buffer."""
self.buffer.clear()
self.need_more_lines = False
self.mlprompt = None
def settitle(self):
"""Sets terminal title."""
env = builtins.__xonsh_env__
term = env.get('TERM', None)
if term is None or term == 'linux':
return
if 'TITLE' in env:
t = env['TITLE']
else:
return
t = format_prompt(t)
if ON_WINDOWS and 'ANSICON' not in env:
t = escape_windows_title_string(t)
os.system('title {}'.format(t))
else:
sys.stdout.write("\x1b]2;{0}\x07".format(t))
@property
def prompt(self):
"""Obtains the current prompt string."""
if self.need_more_lines:
if self.mlprompt is None:
self.mlprompt = multiline_prompt()
return self.mlprompt
env = builtins.__xonsh_env__
if 'PROMPT' in env:
p = env['PROMPT']
p = format_prompt(p)
else:
p = "set '$PROMPT = ...' $ "
self.settitle()
return p

View file

@ -6,7 +6,6 @@ import re
import sys
import shlex
import signal
import locale
import inspect
import builtins
import subprocess
@ -20,7 +19,7 @@ from collections import Sequence, MutableMapping, Iterable, namedtuple, \
from xonsh.tools import string_types
from xonsh.tools import suggest_commands, XonshError, ON_POSIX, ON_WINDOWS
from xonsh.inspectors import Inspector
from xonsh.environ import default_env
from xonsh.environ import Env, default_env
from xonsh.aliases import DEFAULT_ALIASES, bash_aliases
from xonsh.jobs import add_job, wait_for_active_job
from xonsh.proc import ProcProxy, SimpleProcProxy
@ -28,126 +27,6 @@ from xonsh.proc import ProcProxy, SimpleProcProxy
ENV = None
BUILTINS_LOADED = False
INSPECTOR = Inspector()
LOCALE_CATS = {
'LC_CTYPE': locale.LC_CTYPE,
'LC_COLLATE': locale.LC_COLLATE,
'LC_NUMERIC': locale.LC_NUMERIC,
'LC_MONETARY': locale.LC_MONETARY,
'LC_TIME': locale.LC_TIME
}
try:
LOCALE_CATS['LC_MESSAGES'] = locale.LC_MESSAGES
except AttributeError:
pass
class Env(MutableMapping):
"""A xonsh environment, whose variables have limited typing
(unlike BASH). Most variables are, by default, strings (like BASH).
However, the following rules also apply based on variable-name:
* PATH: any variable whose name ends in PATH is a list of strings.
* XONSH_HISTORY_SIZE: this variable is an int.
* LC_* (locale categories): locale catergory names get/set the Python
locale via locale.getlocale() and locale.setlocale() functions.
An Env instance may be converted to an untyped version suitable for
use in a subprocess.
"""
_arg_regex = re.compile(r'ARG(\d+)')
def __init__(self, *args, **kwargs):
"""If no initial environment is given, os.environ is used."""
self._d = {}
if len(args) == 0 and len(kwargs) == 0:
args = (os.environ, )
for key, val in dict(*args, **kwargs).items():
self[key] = val
self._detyped = None
self._orig_env = None
def detype(self):
if self._detyped is not None:
return self._detyped
ctx = {}
for key, val in self._d.items():
if callable(val) or isinstance(val, MutableMapping):
continue
if not isinstance(key, string_types):
key = str(key)
if 'PATH' in key:
val = os.pathsep.join(val)
elif not isinstance(val, string_types):
val = str(val)
ctx[key] = val
self._detyped = ctx
return ctx
def replace_env(self):
"""Replaces the contents of os.environ with a detyped version
of the xonsh environement.
"""
if self._orig_env is None:
self._orig_env = dict(os.environ)
os.environ.clear()
os.environ.update(self.detype())
def undo_replace_env(self):
"""Replaces the contents of os.environ with a detyped version
of the xonsh environement.
"""
if self._orig_env is not None:
os.environ.clear()
os.environ.update(self._orig_env)
self._orig_env = None
#
# Mutable mapping interface
#
def __getitem__(self, key):
m = self._arg_regex.match(key)
if (m is not None) and (key not in self._d) and ('ARGS' in self._d):
args = self._d['ARGS']
ix = int(m.group(1))
if ix >= len(args):
e = "Not enough arguments given to access ARG{0}."
raise IndexError(e.format(ix))
return self._d['ARGS'][ix]
val = self._d[key]
if isinstance(val, (MutableSet, MutableSequence, MutableMapping)):
self._detyped = None
return self._d[key]
def __setitem__(self, key, val):
if isinstance(key, string_types) and 'PATH' in key:
val = val.split(os.pathsep) if isinstance(val, string_types) \
else val
elif key == 'XONSH_HISTORY_SIZE' and not isinstance(val, int):
val = int(val)
elif key in LOCALE_CATS:
locale.setlocale(LOCALE_CATS[key], val)
val = locale.setlocale(LOCALE_CATS[key])
self._d[key] = val
self._detyped = None
def __delitem__(self, key):
del self._d[key]
self._detyped = None
def __iter__(self):
yield from self._d
def __len__(self):
return len(self._d)
def __str__(self):
return str(self._d)
def __repr__(self):
return '{0}.{1}({2})'.format(self.__class__.__module__,
self.__class__.__name__, self._d)
class Aliases(MutableMapping):
@ -298,7 +177,6 @@ def reglob(path, parts=None, i=None):
return paths
def regexpath(s):
"""Takes a regular expression string and returns a list of file
paths that match the regex.

View file

@ -1,17 +1,21 @@
"""Environment for the xonsh shell.
"""
"""Environment for the xonsh shell."""
import os
import re
import locale
import socket
import string
import locale
import builtins
import subprocess
from warnings import warn
from functools import wraps
from collections import MutableMapping, MutableSequence, MutableSet, \
defaultdict, namedtuple
from xonsh import __version__ as XONSH_VERSION
from xonsh.tools import TERM_COLORS, ON_WINDOWS, ON_MAC
from xonsh.tools import TERM_COLORS, ON_WINDOWS, ON_MAC, string_types, is_int,\
always_true, always_false, ensure_string, is_env_path, str_to_env_path, \
env_path_to_str
from xonsh.dirstack import _get_cwd
try:
@ -19,6 +23,164 @@ try:
except ImportError:
hglib = None
LOCALE_CATS = {
'LC_CTYPE': locale.LC_CTYPE,
'LC_COLLATE': locale.LC_COLLATE,
'LC_NUMERIC': locale.LC_NUMERIC,
'LC_MONETARY': locale.LC_MONETARY,
'LC_TIME': locale.LC_TIME,
}
try:
LOCALE_CATS['LC_MESSAGES'] = locale.LC_MESSAGES
except AttributeError:
pass
def locale_convert(key):
"""Creates a converter for a locale key."""
def lc_converter(val):
locale.setlocale(LOCALE_CATS[key], val)
val = locale.setlocale(LOCALE_CATS[key])
return val
return lc_converter
Ensurer = namedtuple('Ensurer', ['validate', 'convert', 'detype'])
Ensurer.__doc__ = """Named tuples whose elements are functions that
represent environment variable validation, conversion, detyping.
"""
DEFAULT_ENSURERS = {
re.compile('\w*PATH'): (is_env_path, str_to_env_path, env_path_to_str),
'LC_CTYPE': (always_false, locale_convert('LC_CTYPE'), ensure_string),
'LC_MESSAGES': (always_false, locale_convert('LC_MESSAGES'), ensure_string),
'LC_COLLATE': (always_false, locale_convert('LC_COLLATE'), ensure_string),
'LC_NUMERIC': (always_false, locale_convert('LC_NUMERIC'), ensure_string),
'LC_MONETARY': (always_false, locale_convert('LC_MONETARY'), ensure_string),
'LC_TIME': (always_false, locale_convert('LC_TIME'), ensure_string),
'XONSH_HISTORY_SIZE': (is_int, int, str),
}
class Env(MutableMapping):
"""A xonsh environment, whose variables have limited typing
(unlike BASH). Most variables are, by default, strings (like BASH).
However, the following rules also apply based on variable-name:
* PATH: any variable whose name ends in PATH is a list of strings.
* XONSH_HISTORY_SIZE: this variable is an int.
* LC_* (locale categories): locale catergory names get/set the Python
locale via locale.getlocale() and locale.setlocale() functions.
An Env instance may be converted to an untyped version suitable for
use in a subprocess.
"""
_arg_regex = re.compile(r'ARG(\d+)')
def __init__(self, *args, **kwargs):
"""If no initial environment is given, os.environ is used."""
self._d = {}
self.ensurers = {k: Ensurer(*v) for k, v in DEFAULT_ENSURERS.items()}
if len(args) == 0 and len(kwargs) == 0:
args = (os.environ, )
for key, val in dict(*args, **kwargs).items():
self[key] = val
self._detyped = None
self._orig_env = None
def detype(self):
if self._detyped is not None:
return self._detyped
ctx = {}
for key, val in self._d.items():
if callable(val) or isinstance(val, MutableMapping):
continue
if not isinstance(key, string_types):
key = str(key)
ensurer = self.get_ensurer(key)
val = ensurer.detype(val)
ctx[key] = val
self._detyped = ctx
return ctx
def replace_env(self):
"""Replaces the contents of os.environ with a detyped version
of the xonsh environement.
"""
if self._orig_env is None:
self._orig_env = dict(os.environ)
os.environ.clear()
os.environ.update(self.detype())
def undo_replace_env(self):
"""Replaces the contents of os.environ with a detyped version
of the xonsh environement.
"""
if self._orig_env is not None:
os.environ.clear()
os.environ.update(self._orig_env)
self._orig_env = None
def get_ensurer(self, key,
default=Ensurer(always_true, None, ensure_string)):
"""Gets an ensurer for the given key."""
if key in self.ensurers:
return self.ensurers[key]
for k, ensurer in self.ensurers.items():
if isinstance(k, string_types):
continue
m = k.match(key)
if m is not None:
ens = ensurer
break
else:
ens = default
self.ensurers[key] = ens
return ens
#
# Mutable mapping interface
#
def __getitem__(self, key):
m = self._arg_regex.match(key)
if (m is not None) and (key not in self._d) and ('ARGS' in self._d):
args = self._d['ARGS']
ix = int(m.group(1))
if ix >= len(args):
e = "Not enough arguments given to access ARG{0}."
raise IndexError(e.format(ix))
return self._d['ARGS'][ix]
val = self._d[key]
if isinstance(val, (MutableSet, MutableSequence, MutableMapping)):
self._detyped = None
return self._d[key]
def __setitem__(self, key, val):
ensurer = self.get_ensurer(key)
if not ensurer.validate(val):
val = ensurer.convert(val)
self._d[key] = val
self._detyped = None
def __delitem__(self, key):
del self._d[key]
self._detyped = None
def __iter__(self):
yield from self._d
def __len__(self):
return len(self._d)
def __str__(self):
return str(self._d)
def __repr__(self):
return '{0}.{1}({2})'.format(self.__class__.__module__,
self.__class__.__name__, self._d)
def ensure_git(func):
@wraps(func)
def wrapper(*args, **kwargs):
@ -168,10 +330,10 @@ DEFAULT_PROMPT = ('{BOLD_GREEN}{user}@{hostname}{BOLD_BLUE} '
DEFAULT_TITLE = '{user}@{hostname}: {cwd} | xonsh'
def _replace_home(x):
if ON_WINDOWS:
home = builtins.__xonsh_env__['HOMEDRIVE'] + builtins.__xonsh_env__['HOMEPATH'][0]
home = (builtins.__xonsh_env__['HOMEDRIVE'] +
builtins.__xonsh_env__['HOMEPATH'][0])
return x.replace(home, '~')
else:
return x.replace(builtins.__xonsh_env__['HOME'], '~')
@ -243,6 +405,8 @@ BASE_ENV = {
'LC_TIME': locale.setlocale(locale.LC_TIME),
'LC_MONETARY': locale.setlocale(locale.LC_MONETARY),
'LC_NUMERIC': locale.setlocale(locale.LC_NUMERIC),
'PROMPT_TOOLKIT_SHELL': False,
'HIGHLIGHTING_LEXER': None,
}
try:
@ -327,7 +491,6 @@ def default_env(env=None):
ctx['PWD'] = _get_cwd()
if env is not None:
ctx.update(env)
return ctx

89
xonsh/history.py Normal file
View file

@ -0,0 +1,89 @@
"""History object for use with prompt_toolkit."""
import os
from prompt_toolkit.history import History
class LimitedFileHistory(History):
"""History class that keeps entries in file with limit on number of those.
It handles only one-line entries.
"""
def __init__(self):
"""Initializes history object."""
super().__init__()
self.new_entries = []
self.old_history = []
def append(self, entry):
"""Appends new entry to the history.
Entry sould be a one-liner.
"""
super().append(entry)
self.new_entries.append(entry)
def read_history_file(self, filename):
"""Reads history from given file into memory.
It first discards all history entries that were read by this function
before, and then tries to read entries from filename as history of
commands that happend before current session.
Entries that were appendend in current session are left unharmed.
Parameters
----------
filename : str
Path to history file.
"""
self.old_history = []
self._load(self.old_history, filename)
self.strings = self.old_history[:]
self.strings.extend(self.new_entries)
def save_history_to_file(self, filename, limit=-1):
"""Saves history to file.
It first reads existing history file again, so nothing is overrided. If
combined number of entries from history file and current session
exceeds limit old entries are dropped.
Not thread safe.
Parameters
----------
filename : str
Path to file to save history to.
limit : int
Limit on number of entries in history file. Negative values imply
unlimited history.
"""
def write_list(lst, file):
text = ('\n'.join(lst)) + '\n'
file.write(text.encode('utf-8'))
if limit < 0:
with open(filename, 'ab') as hf:
write_list(new_entries, hf)
return
new_history = []
self._load(new_history, filename)
if len(new_history) + len(self.new_entries) <= limit:
with open(filename, 'ab') as hf:
write_list(self.new_entries, hf)
else:
new_history.extend(self.new_entries)
with open(filename, 'wb') as hf:
write_list(new_history[-limit:], hf)
def _load(self, store, filename):
"""Loads content of file filename into list store."""
if os.path.exists(filename):
with open(filename, 'rb') as hf:
for line in hf:
line = line.decode('utf-8')
# Drop trailing newline
store.append(line[:-1])

View file

@ -21,6 +21,13 @@ parser.add_argument('--no-rc',
dest='norc',
action='store_true',
default=False)
parser.add_argument('-D',
dest='defines',
help='define an environment variable, in the form of '
'-DNAME=VAL. May be used many times.',
metavar='ITEM',
nargs='*',
default=None)
parser.add_argument('file',
metavar='script-file',
help='If present, execute the script in script-file'
@ -41,6 +48,8 @@ def main(argv=None):
shell = Shell() if not args.norc else Shell(ctx={})
from xonsh import imphooks
env = builtins.__xonsh_env__
if args.defines is not None:
env.update([x.split('=', 1) for x in args.defines])
env['XONSH_INTERACTIVE'] = False
if args.command is not None:
# run a single command and exit

View file

@ -0,0 +1,31 @@
"""Completer implementation to use with prompt_toolkit."""
from prompt_toolkit.completion import Completer, Completion
class PromptToolkitCompleter(Completer):
"""Simple prompt_toolkit Completer object.
It just redirects requests to normal Xonsh completer.
"""
def __init__(self, completer, ctx):
"""Takes instance of xonsh.completer.Completer and dict with context."""
self.completer = completer
self.ctx = ctx
def get_completions(self, document, complete_event):
"""Returns a generator for list of completions."""
line = document.current_line
endidx = document.cursor_position_col
space_pos = document.find_backwards(' ')
if space_pos is None:
begidx = 0
else:
begidx = space_pos + endidx + 1
prefix = line[begidx:endidx + 1]
completions = self.completer.complete(prefix,
line,
begidx,
endidx,
self.ctx)
for comp in completions:
yield Completion(comp, -len(prefix))

View file

@ -0,0 +1,111 @@
"""The prompt_toolkit based xonsh shell"""
import os
import builtins
from warnings import warn
from prompt_toolkit.shortcuts import create_cli, create_eventloop
from pygments.token import Token
from xonsh.base_shell import BaseShell
from xonsh.history import LimitedFileHistory
from xonsh.pyghooks import XonshLexer
from xonsh.tools import format_prompt_for_prompt_toolkit
from xonsh.prompt_toolkit_completer import PromptToolkitCompleter
def setup_history():
"""Creates history object."""
env = builtins.__xonsh_env__
hfile = env.get('XONSH_HISTORY_FILE',
os.path.expanduser('~/.xonsh_history'))
history = LimitedFileHistory()
try:
history.read_history_file(hfile)
except PermissionError:
warn('do not have read permissions for ' + hfile, RuntimeWarning)
return history
def teardown_history(history):
"""Tears down the history object."""
env = builtins.__xonsh_env__
hsize = env.get('XONSH_HISTORY_SIZE', 8128)
hfile = env.get('XONSH_HISTORY_FILE',
os.path.expanduser('~/.xonsh_history'))
try:
history.save_history_to_file(hfile, hsize)
except PermissionError:
warn('do not have write permissions for ' + hfile, RuntimeWarning)
def get_user_input(get_prompt_tokens,
history=None,
lexer=None,
completer=None):
"""Customized function that mostly mimics promp_toolkit's get_input.
Main difference between this and prompt_toolkit's get_input() is that it
allows to pass get_tokens() function instead of text prompt.
"""
eventloop = create_eventloop()
cli = create_cli(
eventloop,
lexer=lexer,
completer=completer,
history=history,
get_prompt_tokens=get_prompt_tokens)
try:
document = cli.read_input()
if document:
return document.text
finally:
eventloop.close()
class PromptToolkitShell(BaseShell):
"""The xonsh shell."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.history = setup_history()
self.pt_completer = PromptToolkitCompleter(self.completer, self.ctx)
def __del__(self):
if self.history is not None:
teardown_history(self.history)
def cmdloop(self, intro=None):
"""Enters a loop that reads and execute input from user."""
if intro:
print(intro)
while not builtins.__xonsh_exit__:
try:
line = get_user_input(
get_prompt_tokens=self._get_prompt_tokens(),
completer=self.pt_completer,
history=self.history,
lexer=self.lexer)
if not line:
self.emptyline()
else:
line = self.precmd(line)
self.default(line)
except KeyboardInterrupt:
self.reset_buffer()
except EOFError:
break
def _get_prompt_tokens(self):
"""Returns function to pass as prompt to prompt_toolkit."""
def get_tokens(cli):
return [(Token.Prompt,
format_prompt_for_prompt_toolkit(self.prompt))]
return get_tokens
@property
def lexer(self):
"""Obtains the current lexer."""
env = builtins.__xonsh_env__
return env['HIGHLIGHTING_LEXER']

127
xonsh/readline_shell.py Normal file
View file

@ -0,0 +1,127 @@
"""The readline based xonsh shell"""
import os
import builtins
from cmd import Cmd
from warnings import warn
from xonsh.base_shell import BaseShell
RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = None
RL_CAN_RESIZE = False
def setup_readline():
"""Sets up the readline module and completion supression, if available."""
global RL_COMPLETION_SUPPRESS_APPEND, RL_LIB, RL_CAN_RESIZE
if RL_COMPLETION_SUPPRESS_APPEND is not None:
return
try:
import readline
except ImportError:
return
import ctypes
import ctypes.util
readline.set_completer_delims(' \t\n')
if not readline.__file__.endswith('.py'):
RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
try:
RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
lib, 'rl_completion_suppress_append')
except ValueError:
# not all versions of readline have this symbol, ie Macs sometimes
RL_COMPLETION_SUPPRESS_APPEND = None
RL_CAN_RESIZE = hasattr(lib, 'rl_reset_screen_size')
# reads in history
env = builtins.__xonsh_env__
hf = env.get('XONSH_HISTORY_FILE', os.path.expanduser('~/.xonsh_history'))
if os.path.isfile(hf):
try:
readline.read_history_file(hf)
except PermissionError:
warn('do not have read permissions for ' + hf, RuntimeWarning)
hs = env.get('XONSH_HISTORY_SIZE', 8128)
readline.set_history_length(hs)
# sets up IPython-like history matching with up and down
readline.parse_and_bind('"\e[B": history-search-forward')
readline.parse_and_bind('"\e[A": history-search-backward')
# Setup Shift-Tab to indent
readline.parse_and_bind('"\e[Z": "{0}"'.format(env.get('INDENT', '')))
# handle tab completion differences found in libedit readline compatibility
# as discussed at http://stackoverflow.com/a/7116997
if readline.__doc__ and 'libedit' in readline.__doc__:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
def teardown_readline():
"""Tears down up the readline module, if available."""
try:
import readline
except ImportError:
return
env = builtins.__xonsh_env__
hs = env.get('XONSH_HISTORY_SIZE', 8128)
readline.set_history_length(hs)
hf = env.get('XONSH_HISTORY_FILE', os.path.expanduser('~/.xonsh_history'))
try:
readline.write_history_file(hf)
except PermissionError:
warn('do not have write permissions for ' + hf, RuntimeWarning)
def rl_completion_suppress_append(val=1):
"""Sets the rl_completion_suppress_append varaiable, if possible.
A value of 1 (default) means to suppress, a value of 0 means to enable.
"""
if RL_COMPLETION_SUPPRESS_APPEND is None:
return
RL_COMPLETION_SUPPRESS_APPEND.value = val
class ReadlineShell(BaseShell, Cmd):
"""The readline based xonsh shell."""
def __init__(self, completekey='tab', stdin=None, stdout=None, **kwargs):
super().__init__(completekey=completekey,
stdin=stdin,
stdout=stdout,
**kwargs)
setup_readline()
def __del__(self):
teardown_readline()
def parseline(self, line):
"""Overridden to no-op."""
return '', line, line
def completedefault(self, text, line, begidx, endidx):
"""Implements tab-completion for text."""
rl_completion_suppress_append() # this needs to be called each time
return self.completer.complete(text, line,
begidx, endidx,
ctx=self.ctx)
# tab complete on first index too
completenames = completedefault
def cmdloop(self, intro=None):
while not builtins.__xonsh_exit__:
try:
super().cmdloop(intro=intro)
except KeyboardInterrupt:
print() # Gives a newline
self.reset_buffer()
intro = None
@property
def prompt(self):
"""Obtains the current prompt string."""
global RL_LIB, RL_CAN_RESIZE
if RL_CAN_RESIZE:
# This is needed to support some system where line-wrapping doesn't
# work. This is a bug in upstream Python, or possibly readline.
RL_LIB.rl_reset_screen_size()
return super().prompt

View file

@ -1,99 +1,42 @@
"""The xonsh shell"""
import os
import sys
import builtins
import traceback
from cmd import Cmd
from warnings import warn
from argparse import Namespace
from xonsh.execer import Execer
from xonsh.completer import Completer
from xonsh.tools import XonshError, escape_windows_title_string
from xonsh.tools import ON_WINDOWS
from xonsh.environ import xonshrc_context, multiline_prompt, format_prompt
from xonsh.environ import xonshrc_context
RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = None
RL_CAN_RESIZE = False
def setup_readline():
"""Sets up the readline module and completion supression, if available."""
global RL_COMPLETION_SUPPRESS_APPEND, RL_LIB, RL_CAN_RESIZE
if RL_COMPLETION_SUPPRESS_APPEND is not None:
return
def is_prompt_toolkit_available():
"""Checks if prompt_toolkit is available to import."""
try:
import readline
import prompt_toolkit
return True
except ImportError:
return
import ctypes
import ctypes.util
readline.set_completer_delims(' \t\n')
if not readline.__file__.endswith('.py'):
RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
try:
RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
lib, 'rl_completion_suppress_append')
except ValueError:
# not all versions of readline have this symbol, ie Macs sometimes
RL_COMPLETION_SUPPRESS_APPEND = None
RL_CAN_RESIZE = hasattr(lib, 'rl_reset_screen_size')
# reads in history
env = builtins.__xonsh_env__
hf = env.get('XONSH_HISTORY_FILE', os.path.expanduser('~/.xonsh_history'))
if os.path.isfile(hf):
try:
readline.read_history_file(hf)
except PermissionError:
warn('do not have read permissions for ' + hf, RuntimeWarning)
hs = env.get('XONSH_HISTORY_SIZE', 8128)
readline.set_history_length(hs)
# sets up IPython-like history matching with up and down
readline.parse_and_bind('"\e[B": history-search-forward')
readline.parse_and_bind('"\e[A": history-search-backward')
# Setup Shift-Tab to indent
readline.parse_and_bind('"\e[Z": "{0}"'.format(env.get('INDENT', '')))
# handle tab completion differences found in libedit readline compatibility
# as discussed at http://stackoverflow.com/a/7116997
if readline.__doc__ and 'libedit' in readline.__doc__:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
return False
def teardown_readline():
"""Tears down up the readline module, if available."""
try:
import readline
except ImportError:
return
env = builtins.__xonsh_env__
hs = env.get('XONSH_HISTORY_SIZE', 8128)
readline.set_history_length(hs)
hf = env.get('XONSH_HISTORY_FILE', os.path.expanduser('~/.xonsh_history'))
try:
readline.write_history_file(hf)
except PermissionError:
warn('do not have write permissions for ' + hf, RuntimeWarning)
class Shell(object):
"""Main xonsh shell.
def rl_completion_suppress_append(val=1):
"""Sets the rl_completion_suppress_append varaiable, if possible.
A value of 1 (default) means to suppress, a value of 0 means to enable.
Initializes execution environment and decides if prompt_toolkit or
readline version of shell should be used.
"""
if RL_COMPLETION_SUPPRESS_APPEND is None:
return
RL_COMPLETION_SUPPRESS_APPEND.value = val
def __init__(self, ctx=None, **kwargs):
self._init_environ(ctx)
env = builtins.__xonsh_env__
if is_prompt_toolkit_available() and env['PROMPT_TOOLKIT_SHELL']:
from xonsh.prompt_toolkit_shell import PromptToolkitShell
self.shell = PromptToolkitShell(execer=self.execer,
ctx=self.ctx, **kwargs)
else:
from xonsh.readline_shell import ReadlineShell
self.shell = ReadlineShell(execer=self.execer,
ctx=self.ctx, **kwargs)
class Shell(Cmd):
"""The xonsh shell."""
def __getattr__(self, attr):
"""Delegates calls to appropriate shell instance."""
return getattr(self.shell, attr)
def __init__(self, completekey='tab', stdin=None, stdout=None, ctx=None):
super(Shell, self).__init__(completekey=completekey,
stdin=stdin,
stdout=stdout)
def _init_environ(self, ctx):
self.execer = Execer()
env = builtins.__xonsh_env__
if ctx is not None:
@ -103,123 +46,3 @@ class Shell(Cmd):
self.ctx = xonshrc_context(rcfile=rc, execer=self.execer)
builtins.__xonsh_ctx__ = self.ctx
self.ctx['__name__'] = '__main__'
self.completer = Completer()
self.buffer = []
self.need_more_lines = False
self.mlprompt = None
setup_readline()
def __del__(self):
teardown_readline()
def emptyline(self):
"""Called when an empty line has been entered."""
self.need_more_lines = False
self.default('')
def parseline(self, line):
"""Overridden to no-op."""
return '', line, line
def precmd(self, line):
return line if self.need_more_lines else line.lstrip()
def default(self, line):
"""Implements code execution."""
line = line if line.endswith('\n') else line + '\n'
code = self.push(line)
if code is None:
return
try:
self.execer.exec(code, mode='single', glbs=self.ctx) # no locals
except XonshError as e:
print(e.args[0], file=sys.stderr, end='')
except:
traceback.print_exc()
if builtins.__xonsh_exit__:
return True
def push(self, line):
"""Pushes a line onto the buffer and compiles the code in a way that
enables multiline input.
"""
code = None
self.buffer.append(line)
if self.need_more_lines:
return code
src = ''.join(self.buffer)
try:
code = self.execer.compile(src,
mode='single',
glbs=None,
locs=self.ctx)
self.reset_buffer()
except SyntaxError:
if line == '\n':
self.reset_buffer()
traceback.print_exc()
return None
self.need_more_lines = True
return code
def reset_buffer(self):
"""Resets the line buffer."""
self.buffer.clear()
self.need_more_lines = False
self.mlprompt = None
def completedefault(self, text, line, begidx, endidx):
"""Implements tab-completion for text."""
rl_completion_suppress_append() # this needs to be called each time
return self.completer.complete(text, line,
begidx, endidx,
ctx=self.ctx)
# tab complete on first index too
completenames = completedefault
def cmdloop(self, intro=None):
while not builtins.__xonsh_exit__:
try:
super(Shell, self).cmdloop(intro=intro)
except KeyboardInterrupt:
print() # Gives a newline
self.reset_buffer()
intro = None
def settitle(self):
env = builtins.__xonsh_env__
term = env.get('TERM', None)
if term is None or term == 'linux':
return
if 'TITLE' in env:
t = env['TITLE']
else:
return
t = format_prompt(t)
if ON_WINDOWS and 'ANSICON' not in env:
t = escape_windows_title_string(t)
os.system('title {}'.format(t))
else:
sys.stdout.write("\x1b]2;{0}\x07".format(t))
@property
def prompt(self):
"""Obtains the current prompt string."""
global RL_LIB, RL_CAN_RESIZE
if RL_CAN_RESIZE:
# This is needed to support some system where line-wrapping doesn't
# work. This is a bug in upstream Python, or possibly readline.
RL_LIB.rl_reset_screen_size()
if self.need_more_lines:
if self.mlprompt is None:
self.mlprompt = multiline_prompt()
return self.mlprompt
env = builtins.__xonsh_env__
if 'PROMPT' in env:
p = env['PROMPT']
p = format_prompt(p)
else:
p = "set '$PROMPT = ...' $ "
self.settitle()
return p

View file

@ -21,7 +21,7 @@ import re
import sys
import builtins
import platform
from collections import OrderedDict
from collections import OrderedDict, Sequence
if sys.version_info[0] >= 3:
string_types = (str, bytes)
@ -374,7 +374,7 @@ def suggestion_sort_helper(x, y):
def escape_windows_title_string(s):
"""Returns a string that is usable by the Windows cmd.exe title
"""Returns a string that is usable by the Windows cmd.exe title
builtin. The escaping is based on details here and emperical testing:
http://www.robvanderwoude.com/escapechars.php
"""
@ -383,3 +383,124 @@ def escape_windows_title_string(s):
s = s.replace('/?', '/.')
return s
#
# Validators and contervers
#
def is_int(x):
"""Tests if something is an integer"""
return isinstance(x, int)
def always_true(x):
"""Returns True"""
return True
def always_false(x):
"""Returns False"""
return False
def ensure_string(x):
"""Returns a string if x is not a string, and x if it alread is."""
if isinstance(x, string_types):
return x
else:
return str(x)
def is_env_path(x):
"""This tests if something is an environment path, ie a list of strings."""
if isinstance(x, string_types):
return False
else:
return isinstance(x, Sequence) and \
all([isinstance(a, string_types) for a in x])
def str_to_env_path(x):
"""Converts a string to an environment path, ie a list of strings,
splitting on the OS separator.
"""
return x.split(os.pathsep)
def env_path_to_str(x):
"""Converts an environment path to a string by joining on the OS separator.
"""
return os.pathsep.join(x)
#
# prompt toolkit tools
#
class FakeChar(str):
"""Class that holds a single char and escape sequences that surround it.
It is used as a workaround for the fact that prompt_toolkit doesn't display
colorful prompts correctly.
It behaves like normal string created with prefix + char + suffix, but has
two differences:
* len() always returns 2
* iterating over instance of this class is the same as iterating over
the single char - prefix and suffix are ommited.
"""
def __new__(cls, char, prefix='', suffix=''):
return str.__new__(cls, prefix + char + suffix)
def __init__(self, char, prefix='', suffix=''):
self.char = char
self.prefix = prefix
self.suffix = suffix
self.length = 2
self.iterated = False
def __len__(self):
return self.length
def __iter__(self):
return iter(self.char)
RE_HIDDEN_MAX = re.compile('(\001.*?\002)+')
def format_prompt_for_prompt_toolkit(prompt):
"""Uses workaround for passing a string with color sequences.
Returns list of characters of the prompt, where some characters can be not
normal characters but FakeChars - objects that consists of one printable
character and escape sequences surrounding it.
Returned list can be later passed as a prompt to prompt_toolkit.
If prompt contains no printable characters returns equivalent of empty
string.
"""
def append_escape_seq(lst, suffix):
last = lst.pop()
if isinstance(last, FakeChar):
lst.append(FakeChar(last.char, prefix=last.prefix, suffix=suffix))
else:
lst.append(FakeChar(last, suffix=suffix))
pos = 0
match = RE_HIDDEN_MAX.search(prompt, pos)
if match and match.group(0) == prompt:
return ['']
formatted_prompt = []
while match:
formatted_prompt.extend(list(prompt[pos:match.start()]))
pos = match.end()
if not formatted_prompt:
formatted_prompt.append(FakeChar(prompt[pos],
prefix=match.group(0)))
pos += 1
else:
append_escape_seq(formatted_prompt, match.group(0))
match = RE_HIDDEN_MAX.search(prompt, pos)
formatted_prompt.extend(list(prompt[pos:]))
return formatted_prompt