Rebased with upstream/master

This commit is contained in:
Travis Shirk 2015-11-21 14:14:49 -07:00
commit 63971cd691
58 changed files with 658 additions and 312 deletions

50
Makefile Normal file
View file

@ -0,0 +1,50 @@
SHELL = /bin/sh
which = edited # which tests to run
all:
@echo "You must specifiy a make target."
@echo "Targets are:"
@echo " clean"
@echo " lint"
@echo " lint-all"
@echo " test"
@echo " test-all"
@echo " build-tables"
clean:
rm -f xonsh/lexer_table.py xonsh/parser_table.py
rm -f xonsh/lexer_test_table.py xonsh/parser_test_table.py
rm -f xonsh/*.pyc tests/*.pyc
rm -f xonsh/*.rej tests/*.rej
rm -fr build
# Line just the changed python files. It doesn't matter if "git add" has
# already been done but obviously if you've already done "git commit" then
# they're no longer consider changed. This should be run (along with "make
# test") before commiting a set of changes.
lint:
pylint $$(git status -s | awk '/\.py$$/ { print $$2 }' | sort)
# Lint all the python files.
lint-all:
make clean
pylint $$(find tests xonsh -name \*.py | sort)
# Test just the changed python files. It doesn't matter if "git add" has
# already been done but obviously if you've already done "git commit" then
# they're no longer consider changed. This should be run (along with "make
# lint") before commiting a set of changes. You can also pass a list of test
# names via a "which=name1 name2..." argument.
test:
scripts/run_tests.xsh $(which)
# Test all the python files.
test-all:
scripts/run_tests.xsh all
# Build the parser_table.py module. This is normally done by setup.py at
# install time. This just makes it easy to create the parser module on the fly
# to facilitate development.
build-tables:
python3 -c 'import setup; setup.build_tables()'

View file

@ -61,22 +61,78 @@ is open to interpretation.
unittest directly or write tests in an object-oriented style.
* Test generators make more dots and the dots must flow!
You can easily check for style issues, including some outright bugs such
as mispelled variable names, using pylint. If you're using Anaconda you'll
need to run "conda install pylint" once. You can easily run pylint on
the edited files in your uncommited git change:
$ make lint
If you want to lint the entire code base run:
$ make lint-all
How to Test
================
First, install nose: http://nose.readthedocs.org/en/latest/
First, install nose: http://nose.readthedocs.org/en/latest/. Second, ensure
your cwd is the root directory of the project (i.e., the one containing the
.git directory).
To perform all unit tests::
To perform all unit tests:
$ cd tests/
$ nosetests
$ make test-all
This will recursively look through the currently directory, open up every file
named test_* and run every function (or method) named test_*.
Or, if if make isn't available to you (e.g., you're on MS-Windows) invoke
the script directly:
Nosetests can also take file(s) as an argument. For example, to run just the
lexer and parser module tests::
$ scripts/run-tests.xsh
$ nosetests test_lexer.py test_parser.py
Or, you can do it the hard way:
$ python3 -c 'import setup; setup.build_tables()'
$ env XONSHRC='' nosetests
If you want to test the xonsh code that is installed on your system first
cd into the ``tests`` directory then run the tests:
$ cd tests
$ env XONSHRC='' nosetests
If you're working on a change and haven't yet committed it you can run the
tests associated with the change. This does not require that the change
include the unit test module. This will execute any unit tests that are
part of the change as well as the unit tests for xonsh source modules in
the change:
$ make test
If you want to run specific tests you can specify the test names to
execute. For example to run test_aliases:
$ make test which=aliases
Or, if make isn't available run the test script directly:
$ scripts/run_tests.xsh aliases
The test name can be the bare test name (e.g., ``aliases``), include
the ``test_`` prefix and ``.py`` suffix without the directory
(e.g., ``test_aliases.py``), or the complete relative path (e.g.,
``tests/test_aliases.py``). For example:
Note that you can pass multiple test names in the above examples:
$ make test which='aliases environ'
Or:
$ scripts/run_tests.xsh aliases environ
As before, if you want to test the xonsh code that is installed on your
system first cd into the `tests` directory then run the tests:
$ cd tests
$ env XONSHRC='' nosetests test_aliases.py test_environ.py
Happy testing!
@ -247,7 +303,10 @@ When releasing xonsh, make sure to do the following items in order:
--------------------
Maintenance Tasks
--------------------
None currently.
You can cleanup your local repository of transient files such as \*.pyc files
created by unit testing by running:
$ make clean
-----------------------
Performing the Release

View file

@ -116,7 +116,7 @@ applicable.
- ``False``
- Whether or not to supress directory stack manipulation output.
* - SHELL_TYPE
- ``'readline'``
- ``'prompt_toolkit'`` if on Windows, otherwise ``'readline'``
- Which shell is used. Currently two shell types are supported: ``'readline'`` that
is backed by Python's readline module, and ``'prompt_toolkit'`` that uses
external library of the same name. For using prompt_toolkit shell you need

View file

@ -230,7 +230,8 @@ Xonsh currently has the following external dependencies,
*Documentation:*
#. Sphinx
#. `Sphinx <http://sphinx-doc.org/>` (which uses
`reStructuredText <http://sphinx-doc.org/rest.html>`)
#. Numpydoc
#. Cloud Sphinx Theme

37
logo.txt Normal file
View file

@ -0,0 +1,37 @@
╓██▄
╙██▀██╕
▐██4Φ█▀█▌
²██▄███▀██^██
-███╩▀ " ╒▄█████▀█
║██▀▀W╤▄▀ ▐║█╘ ╝█
▄m▀%Φ▀▀ ╝*" ,α█████▓▄,▄▀Γ"▀╕
"▀██¼" ▄═╦█╟║█▀ ╓ `^` ,▄ ╢╕
,▀╫M█▐j╓╟▀ ╔▓▄█▀ '║ ╔ ╣║▌ ▀▄
▄m▀▀███╬█╝▀ █▀^ "ÜM j▐╟╫╨▒ ╙▀≡═╤═m▀╗
█æsæ╓ ╕, ,▄Ä ▐'╕H LU ║║╠╫Å^2=⌐ █
▄æ%Å███╠█ª╙▄█▀ $1╙ ║║╟╫╩*T▄ ▌
╙╗%▄,╦██▌█▌█╢M ╕ M║║║║█═⌐ⁿ"^ ╫
╙╣▀████@█░█ ▌╕╕ ` ▌║▐▐║█D═≈⌐¬ⁿ s ║⌐
╙╬███▓║█` ▌╚ ╕ ╕▌║▐▐╣▌⌐*▒▒Dù` ▐▌
╙╬██╨U█ ╟ $ ▌ ▌▌▐▐▐M█▄═≤⌐% ╓⌐ ▌
║║█▄▌║ ╟ ▌ ▌M▐▐▐M█▀▒▒▒22, ▐▌
███╙^▌ ║ ▌ ⌐M▐▐▐M█≤⌐⌐¬── ▐M
║██ ▌╙ ╓ H║ ▌╒ M║▐▐M█"^^^^^"ⁿ ║
██╕╙@▓ ╕ ▌║ H' ║▐▐▐█══=.,,, █
╙█▓╔╚╚█ ╠ ▌└╒ ▌▐ ╚║║║▀****ⁿ - ╓▌
╙█▌¼V╚▌ ▌ ╕ ▌ ║╒ ║ ▌▒╠█▀≤≤≤≤≤⌐ █
╙█▌╔█╚▌ ┘ M ▌║ ╫ UUM██J^^" ▐▌
╙██╙█╙▌ ╕$j ▐⌐▌ ▌║╝╟█Å%%%≈═ █
╙╣█╣█^▌ ╠║▐ ║ ▌▐.DU██^[""ⁿ -╒▌
▀█▄█`▌ ░M▀ ▌▐ Å£╝╝█╜%≈═╓""w ⁿ⌐ █
`▀▄▀`▌ ▌█▐⌐║▐UW╖██%≤═░*─ =z ▄Γ
╙██╙▄▌█ ▌Å╛╣██╨%╤ƒⁿ= -` ▄┘
█▌╢▓▌▌ W £6█╤,"ⁿ ` ▄≡▀▀▀
█"█▌▌╟Å╓█╓█▀%` ▄▀
╙▌██`▒U▓U█%╗* █
▌╫║ ▌ÅÅ║▀╛¬` `"█
▌╫ ╫╟ █▄ ~╦%▒╥4^
▌▌ "M█ `▀╕ X╕"╗▄▀^
█▌ ╓M ╙▀e▀▀^
╙██▄▄▀
^^

View file

@ -15,10 +15,12 @@ requirements:
build:
- python
- ply
- setuptools
- jupyter
run:
- python
- ply
- prompt_toolkit # [win]
about:
home: http://xonsh.org/

70
scripts/run_tests.xsh Executable file
View file

@ -0,0 +1,70 @@
#!/usr/bin/env xonsh --no-rc
#
# Run all the nosetests or just the ones relevant for the edited files in the
# current uncommited git change or just the ones named on the command line.
# Your cwd must be the top of the project tree.
#
# Usage:
# run_tests.xsh [edited | all | list of test names]
#
# You can omit the "all" argument if you want all tests run.
#
import os.path
import sys
if not os.path.isdir('.git'):
print('No .git directory. The cwd must be the top of the project tree.',
file=sys.stderr)
sys.exit(1)
if len($ARGS) == 1:
# Run all tests.
$[make build-tables] # ensure lexer/parser table module is up to date
$[env XONSHRC='' nosetests]
elif len($ARGS) == 2 and $ARG1 == 'all':
# Run all tests.
$[make build-tables] # ensure lexer/parser table module is up to date
$[env XONSHRC='' nosetests]
elif len($ARGS) == 2 and $ARG1 == 'edited':
# Run just the tests for the files edited in the uncommited change.
tests = set()
for edited_fname in $(git status -s).split():
if not edited_fname.endswith('.py'):
continue
if edited_fname.startswith('xonsh/'):
test_fname = 'tests/test_' + edited_fname[len('xonsh/')]
if os.path.exists(test_fname):
tests.add(test_fname)
elif edited_fname.startswith('tests/'):
tests.add(test_fname)
else:
print('Ignoring file because I cannot find a test for: {!r}.'.
format(edited_fname), file=sys.stderr)
if tests:
$[make build-tables] # ensure lexer/parser table module is up to date
$[env XONSHRC='' nosetests -v @(sorted(tests))]
else:
print('Cannot find any tests in the pending changes.', file=sys.stderr)
else:
# Run the named tests.
tests = set()
for test_fname in $ARGS[1:]:
if not test_fname.startswith('tests/'):
if not test_fname.startswith('test_'):
test_fname = 'tests/test_' + test_fname
if not test_fname.endswith('.py'):
test_fname += '.py'
if os.path.exists(test_fname):
tests.add(test_fname)
else:
print('Cannot find test module {!r}; ignoring the argument.'.
format(test_fname), file=sys.stderr)
if tests:
$[make build-tables] # ensure lexer/parser table module is up to date
$[env XONSHRC='' nosetests -v @(sorted(tests))]
else:
print('Cannot find any tests matching {}.'.format($ARGS[1:]),
file=sys.stderr)
sys.exit(1)

View file

@ -1,6 +1,8 @@
#!/usr/bin/env python
# coding=utf-8
# -*- coding: ascii -*-
"""The xonsh installer."""
# Note: Do not embed any non-ASCII characters in this file until pip has been
# fixed. See https://github.com/scopatz/xonsh/issues/487.
from __future__ import print_function, unicode_literals
import os
import sys
@ -33,7 +35,9 @@ CONDA = ("--conda" in sys.argv)
if CONDA:
sys.argv.remove("--conda")
def clean_tables():
"""Remove the lexer/parser modules that are dynamically created."""
for f in TABLES:
if os.path.isfile(f):
os.remove(f)
@ -41,6 +45,7 @@ def clean_tables():
def build_tables():
"""Build the lexer/parser modules."""
print('Building lexer and parser tables.')
sys.path.insert(0, os.path.dirname(__file__))
from xonsh.parser import Parser
@ -50,15 +55,17 @@ def build_tables():
def install_jupyter_hook(root=None):
"""Make xonsh available as a Jupyter kernel."""
if not HAVE_JUPYTER:
print('Could not install Jupyter kernel spec, please install Jupyter/IPython.')
print('Could not install Jupyter kernel spec, please install '
'Jupyter/IPython.')
return
spec = {"argv": [sys.executable, "-m", "xonsh.jupyter_kernel",
"-f", "{connection_file}"],
"display_name":"Xonsh",
"language":"xonsh",
"codemirror_mode":"shell",
}
"-f", "{connection_file}"],
"display_name": "Xonsh",
"language": "xonsh",
"codemirror_mode": "shell",
}
if CONDA:
d = os.path.join(sys.prefix + '/share/jupyter/kernels/xonsh/')
os.makedirs(d, exist_ok=True)
@ -73,9 +80,13 @@ def install_jupyter_hook(root=None):
with open(os.path.join(d, 'kernel.json'), 'w') as f:
json.dump(spec, f, sort_keys=True)
print('Installing Jupyter kernel spec...')
KernelSpecManager().install_kernel_spec(d, 'xonsh', user=('--user' in sys.argv), replace=True, prefix=root)
KernelSpecManager().install_kernel_spec(
d, 'xonsh', user=('--user' in sys.argv), replace=True,
prefix=root)
class xinstall(install):
"""Xonsh specialization of setuptools install class."""
def run(self):
clean_tables()
build_tables()
@ -84,6 +95,7 @@ class xinstall(install):
class xsdist(sdist):
"""Xonsh specialization of setuptools sdist class."""
def make_release_tree(self, basedir, files):
clean_tables()
build_tables()
@ -92,6 +104,7 @@ class xsdist(sdist):
if HAVE_SETUPTOOLS:
class xdevelop(develop):
"""Xonsh specialization of setuptools develop class."""
def run(self):
clean_tables()
build_tables()
@ -99,10 +112,14 @@ if HAVE_SETUPTOOLS:
def main():
"""The main entry point."""
if sys.version_info[0] < 3:
sys.exit('xonsh currently requires Python 3.4+')
try:
if '--name' not in sys.argv:
logo_fname = os.path.join(os.path.dirname(__file__), 'logo.txt')
with open(logo_fname, 'rb') as f:
logo = f.read().decode('utf-8')
print(logo)
except UnicodeEncodeError:
pass
@ -129,53 +146,15 @@ def main():
skw['entry_points'] = {
'pygments.lexers': ['xonsh = xonsh.pyghooks:XonshLexer',
'xonshcon = xonsh.pyghooks:XonshConsoleLexer',
],
],
'console_scripts': ['xonsh = xonsh.main:main'],
}
skw['cmdclass']['develop'] = xdevelop
else:
skw['scripts'] = ['scripts/xonsh', 'scripts/xonsh.bat']
skw['scripts'] = ['scripts/xonsh'] if 'win' not in sys.platform else ['scripts/xonsh.bat'],
setup(**skw)
logo = """
4Φ
²^
- " ╒▄█████▀█
W
m%Φ *" ,α█████▓▄,▄▀Γ"
"▀██¼" `^` ,
,Mj '║ ╔ ╣║▌ ▀▄
m ^ "ÜM j▐╟╫╨▒ ╙▀≡═╤═m▀╗
æsæ , ,Ä '╕H LU ║║╠╫Å^2=⌐ █
æ%Ū $1 *T
%,M M"^ ╫
@ ` D¬ s
` *`
U $ M%
MM22,
^ MM¬ M
H MM"^^^^^"
@ H' ║▐▐▐█══=.,,, █
**** -
¼V
M UUMJ^^" ▐▌
$j Å%%%
^ .DU^["" -
` M ţ%""w
`` UW%* =z Γ
Å%ƒⁿ= -`
W £6,"ⁿ ` ▄≡▀▀▀
"█▌▌╟Å╓█╓█▀%` ▄▀
`UU%*
ÅŬ` `"
~%4^
"M█ `▀╕ X╕"^
M e^
^^
"""
if __name__ == '__main__':
main()

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Testing built_ins.Aliases"""
from __future__ import unicode_literals, print_function

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh builtins."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Testing dirstack"""
from __future__ import unicode_literals, print_function

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh environment."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests foreign shells."""
from __future__ import unicode_literals, print_function
import os
@ -43,7 +44,7 @@ def test_foreign_bash_data():
rcfile = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
try:
obsenv, obsaliases = foreign_shell_data('bash', currenv=(),
extra_args=('--rcfile', rcfile),
extra_args=('--rcfile', rcfile),
safe=False)
except (subprocess.CalledProcessError, FileNotFoundError):
raise SkipTest

View file

@ -1,23 +1,29 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh history."""
# pylint: disable=protected-access
# TODO: Remove the following pylint directive when it correctly handles calls
# to nose assert_xxx functions.
# pylint: disable=no-value-for-parameter
from __future__ import unicode_literals, print_function
import io
import os
import sys
import nose
from nose.tools import assert_equal, assert_true
from nose.tools import assert_equal, assert_is_none, assert_is_not_none
from xonsh.lazyjson import LazyJSON
from xonsh.history import History, CommandField
from xonsh.history import History
from xonsh import history
HIST_TEST_KWARGS = dict(sessionid='SESSIONID', gc=False)
def test_hist_init():
"""Test initialization of the shell history."""
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.init'
hist = History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
with LazyJSON(FNAME) as lj:
obs = lj['here']
assert_equal('yup', obs)
@ -25,24 +31,26 @@ def test_hist_init():
def test_hist_append():
"""Verify appending to the history works."""
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.append'
hist = History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
hf = hist.append({'joco': 'still alive'})
yield assert_true, hf is None
yield assert_is_none, hf
yield assert_equal, 'still alive', hist.buffer[0]['joco']
os.remove(FNAME)
def test_hist_flush():
"""Verify explicit flushing of the history works."""
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.flush'
hist = History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
hf = hist.flush()
yield assert_true, hf is None
yield assert_is_none, hf
hist.append({'joco': 'still alive'})
hf = hist.flush()
yield assert_true, hf is not None
yield assert_is_not_none, hf
while hf.is_alive():
pass
with LazyJSON(FNAME) as lj:
@ -52,12 +60,13 @@ def test_hist_flush():
def test_cmd_field():
"""Test basic history behavior."""
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.cmdfield'
hist = History(filename=FNAME, here='yup', **HIST_TEST_KWARGS)
# in-memory
hf = hist.append({'rtn': 1})
yield assert_true, hf is None
yield assert_is_none, hf
yield assert_equal, 1, hist.rtns[0]
yield assert_equal, 1, hist.rtns[-1]
yield assert_equal, None, hist.outs[-1]
@ -65,24 +74,28 @@ def test_cmd_field():
yield assert_equal, [1], hist.rtns[:]
# on disk
hf = hist.flush()
yield assert_true, hf is not None
yield assert_is_not_none, hf
yield assert_equal, 1, hist.rtns[0]
yield assert_equal, 1, hist.rtns[-1]
yield assert_equal, None, hist.outs[-1]
os.remove(FNAME)
def test_show_cmd():
"""Verify that CLI history commands work."""
FNAME = 'xonsh-SESSIONID.json'
FNAME += '.show_cmd'
cmds = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
def format_hist_line(idx, cmd):
"""Construct a history output line."""
return ' {:d} {:s}\n'.format(idx, cmd)
def run_show_cmd(hist_args, commands, base_idx=0, step=1):
"""Run and evaluate the output of the given show command."""
stdout.seek(0, io.SEEK_SET)
stdout.truncate()
history._main(hist, hist_args) # pylint: disable=protected-access
history._main(hist, hist_args)
stdout.seek(0, io.SEEK_SET)
hist_lines = stdout.readlines()
yield assert_equal, len(commands), len(hist_lines)
@ -95,7 +108,7 @@ def test_show_cmd():
saved_stdout = sys.stdout
sys.stdout = stdout
for cmd in cmds: # populate the shell history
for cmd in cmds: # populate the shell history
hist.append({'inp': cmd, 'rtn': 0})
# Verify an implicit "show" emits the entire history.
@ -109,7 +122,7 @@ def test_show_cmd():
# Verify an explicit "show" with a reversed qualifier emits the entire
# history in reverse order.
for x in run_show_cmd(['show', '-r'], list(reversed(cmds)),
len(cmds) - 1, -1):
len(cmds) - 1, -1):
yield x
# Verify that showing a specific history entry relative to the start of the

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Testing xonsh import hooks"""
from __future__ import unicode_literals, print_function

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests lazy json functionality."""
from __future__ import unicode_literals, print_function
from io import StringIO

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
import os
import nose

View file

@ -1,4 +1,4 @@
# coding=utf-8
# -*- coding: utf-8 -*-
"""Tests the xonsh parser."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests for the LimitedFileHistory class."""
import os
from tempfile import NamedTemporaryFile

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests some tools function for prompt_toolkit integration."""
from __future__ import unicode_literals, print_function

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh replay functionality."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tests the xonsh lexer."""
from __future__ import unicode_literals, print_function
import sys

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Aliases for the xonsh shell."""
import os
import shlex
@ -33,7 +34,7 @@ def _ensure_source_foreign_parser():
desc = "Sources a file written in a foreign shell language."
parser = ArgumentParser('source-foreign', description=desc)
parser.add_argument('shell', help='Name or path to the foreign shell')
parser.add_argument('files_or_code', nargs='+',
parser.add_argument('files_or_code', nargs='+',
help='file paths to source or code in the target '
'language.')
parser.add_argument('-i', '--interactive', type=to_bool, default=True,
@ -42,23 +43,23 @@ def _ensure_source_foreign_parser():
parser.add_argument('-l', '--login', type=to_bool, default=False,
help='whether the sourced shell should be login',
dest='login')
parser.add_argument('--envcmd', default='env', dest='envcmd',
parser.add_argument('--envcmd', default='env', dest='envcmd',
help='command to print environment')
parser.add_argument('--aliascmd', default='alias', dest='aliascmd',
parser.add_argument('--aliascmd', default='alias', dest='aliascmd',
help='command to print aliases')
parser.add_argument('--extra-args', default=(), dest='extra_args',
type=(lambda s: tuple(s.split())),
type=(lambda s: tuple(s.split())),
help='extra arguments needed to run the shell')
parser.add_argument('-s', '--safe', type=to_bool, default=True,
parser.add_argument('-s', '--safe', type=to_bool, default=True,
help='whether the source shell should be run safely, '
'and not raise any errors, even if they occur.',
dest='safe')
parser.add_argument('-p', '--prevcmd', default=None, dest='prevcmd',
parser.add_argument('-p', '--prevcmd', default=None, dest='prevcmd',
help='command(s) to run before any other commands, '
'replaces traditional source.')
parser.add_argument('--postcmd', default='', dest='postcmd',
parser.add_argument('--postcmd', default='', dest='postcmd',
help='command(s) to run after all other commands')
parser.add_argument('--funcscmd', default=None, dest='funcscmd',
parser.add_argument('--funcscmd', default=None, dest='funcscmd',
help='code to find locations of all native functions '
'in the shell language.')
parser.add_argument('--sourcer', default=None, dest='sourcer',
@ -66,7 +67,7 @@ def _ensure_source_foreign_parser():
'default: source.')
_SOURCE_FOREIGN_PARSER = parser
return parser
def source_foreign(args, stdin=None):
"""Sources a file written in a foreign shell language."""
@ -78,10 +79,10 @@ def source_foreign(args, stdin=None):
ns.prevcmd = '{0} {1}'.format(ns.sourcer, ' '.join(ns.files_or_code))
foreign_shell_data.cache_clear() # make sure that we don't get prev src
fsenv, fsaliases = foreign_shell_data(shell=ns.shell, login=ns.login,
interactive=ns.interactive, envcmd=ns.envcmd,
aliascmd=ns.aliascmd, extra_args=ns.extra_args,
safe=ns.safe, prevcmd=ns.prevcmd,
postcmd=ns.postcmd, funcscmd=ns.funcscmd,
interactive=ns.interactive, envcmd=ns.envcmd,
aliascmd=ns.aliascmd, extra_args=ns.extra_args,
safe=ns.safe, prevcmd=ns.prevcmd,
postcmd=ns.postcmd, funcscmd=ns.funcscmd,
sourcer=ns.sourcer)
# apply results
env = builtins.__xonsh_env__

View file

@ -1,20 +1,26 @@
# -*- coding: utf-8 -*-
"""The xonsh abstract syntax tree node."""
# These are imported into our module namespace for the benefit of parser.py.
# pylint: disable=unused-import
from ast import Module, Num, Expr, Str, Bytes, UnaryOp, UAdd, USub, Invert, \
BinOp, Add, Sub, Mult, Div, FloorDiv, Mod, Pow, Compare, Lt, Gt, \
LtE, GtE, Eq, NotEq, In, NotIn, Is, IsNot, Not, BoolOp, Or, And, Subscript, \
Load, Slice, List, Tuple, Set, Dict, AST, NameConstant, \
LtE, GtE, Eq, NotEq, In, NotIn, Is, IsNot, Not, BoolOp, Or, And, \
Subscript, Load, Slice, List, Tuple, Set, Dict, AST, NameConstant, \
Name, GeneratorExp, Store, comprehension, ListComp, SetComp, DictComp, \
Assign, AugAssign, BitXor, BitAnd, BitOr, LShift, RShift, Assert, Delete, \
Del, Pass, Raise, Import, alias, ImportFrom, Continue, Break, Yield, \
YieldFrom, Return, IfExp, Lambda, arguments, arg, Call, keyword, \
Attribute, Global, Nonlocal, If, While, For, withitem, With, Try, \
ExceptHandler, FunctionDef, ClassDef, Starred, NodeTransformer, \
Interactive, Expression, dump
from ast import Ellipsis, Index # pylint:disable=unused-import,redefined-builtin
Interactive, Expression, Index, dump
from ast import Ellipsis # pylint: disable=redefined-builtin
# pylint: enable=unused-import
from xonsh.tools import subproc_toks, VER_3_5, VER_MAJOR_MINOR
if VER_3_5 <= VER_MAJOR_MINOR:
# pylint: disable=unused-import
# pylint: disable=no-name-in-module
from ast import MatMult, AsyncFunctionDef, AsyncWith, AsyncFor, Await
else:
MatMult = AsyncFunctionDef = AsyncWith = AsyncFor = Await = None
@ -140,6 +146,7 @@ class CtxAwareTransformer(NodeTransformer):
return inscope
def visit_Expression(self, node):
"""Handle visiting an expression body."""
body = node.body
inscope = self.is_in_scope(body)
if not inscope:
@ -147,6 +154,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_Expr(self, node):
"""Handle visiting an expression."""
if self.is_in_scope(node):
return node
else:
@ -158,10 +166,11 @@ class CtxAwareTransformer(NodeTransformer):
return newnode
def visit_Assign(self, node):
"""Handle visiting an assignment statement."""
ups = set()
for targ in node.targets:
if isinstance(targ, (Tuple, List)):
ups.update(map(leftmostname, targ.elts))
ups.update(leftmostname(elt) for elt in targ.elts)
elif isinstance(targ, BinOp):
newnode = self.try_subproc_toks(node)
if newnode is node:
@ -174,6 +183,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_Import(self, node):
"""Handle visiting a import statement."""
for name in node.names:
if name.asname is None:
self.ctxadd(name.name)
@ -182,6 +192,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_ImportFrom(self, node):
"""Handle visiting a "from ... import ..." statement."""
for name in node.names:
if name.asname is None:
self.ctxadd(name.name)
@ -190,6 +201,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_With(self, node):
"""Handle visiting a with statement."""
for item in node.items:
if item.optional_vars is not None:
self.ctxadd(leftmostname(item.optional_vars))
@ -197,15 +209,17 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_For(self, node):
"""Handle visiting a for statement."""
targ = node.target
if isinstance(targ, (Tuple, List)):
self.ctxupdate(map(leftmostname, targ.elts))
self.ctxupdate(leftmostname(elt) for elt in targ.elts)
else:
self.ctxadd(leftmostname(targ))
self.generic_visit(node)
return node
def visit_FunctionDef(self, node):
"""Handle visiting a function definition."""
self.ctxadd(node.name)
self.contexts.append(set())
self.generic_visit(node)
@ -213,6 +227,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_ClassDef(self, node):
"""Handle visiting a class definition."""
self.ctxadd(node.name)
self.contexts.append(set())
self.generic_visit(node)
@ -220,6 +235,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_Delete(self, node):
"""Handle visiting a del statement."""
for targ in node.targets:
if isinstance(targ, Name):
self.ctxremove(targ.id)
@ -227,6 +243,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_Try(self, node):
"""Handle visiting a try statement."""
for handler in node.handlers:
if handler.name is not None:
self.ctxadd(handler.name)
@ -234,6 +251,7 @@ class CtxAwareTransformer(NodeTransformer):
return node
def visit_Global(self, node):
"""Handle visiting a global statement."""
self.contexts[1].update(node.names) # contexts[1] is the global ctx
self.generic_visit(node)
return node

View file

@ -1,26 +1,24 @@
# -*- coding: utf-8 -*-
"""The base class for xonsh shell"""
import io
import os
import sys
import time
import builtins
import traceback
from xonsh.execer import Execer
from xonsh.tools import XonshError, escape_windows_title_string, ON_WINDOWS, \
print_exception
from xonsh.completer import Completer
from xonsh.environ import multiline_prompt, format_prompt
class TeeOut(object):
"""Tees stdout into the original sys.stdout and another buffer instance that is
provided.
"""
class _TeeOut(object):
"""Tees stdout into the original sys.stdout and another buffer."""
def __init__(self, buf, *args, **kwargs):
def __init__(self, buf):
self.buffer = buf
self.stdout = sys.stdout
self.encoding = self.stdout.encoding
sys.stdout = self
def __del__(self):
@ -40,15 +38,19 @@ class TeeOut(object):
self.stdout.flush()
self.buffer.flush()
def fileno(self):
"""Tunnel fileno() calls."""
_ = self
return sys.stdout.fileno()
class TeeErr(object):
"""Tees stderr into the original sys.stdout and another buffer instance that is
provided.
"""
def __init__(self, buf, *args, **kwargs):
class _TeeErr(object):
"""Tees stderr into the original sys.stdout and another buffer."""
def __init__(self, buf):
self.buffer = buf
self.stderr = sys.stderr
self.encoding = self.stderr.encoding
sys.stderr = self
def __del__(self):
@ -68,16 +70,24 @@ class TeeErr(object):
self.stderr.flush()
self.buffer.flush()
def fileno(self):
"""Tunnel fileno() calls."""
_ = self
return sys.stderr.fileno()
class Tee(io.StringIO):
"""Class that merges tee'd stdout and stderr into a single buffer, namely itself.
"""Class that merges tee'd stdout and stderr into a single buffer.
This represents what a user would actually see on the command line.
"""
# pylint is a stupid about counting public methods when using inheritance.
# pylint: disable=too-few-public-methods
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stdout = TeeOut(self)
self.stderr = TeeErr(self)
self.stdout = _TeeOut(self)
self.stderr = _TeeErr(self)
def __del__(self):
del self.stdout, self.stderr
@ -117,10 +127,10 @@ class BaseShell(object):
src, code = self.push(line)
if code is None:
return
hist = builtins.__xonsh_history__
hist = builtins.__xonsh_history__ # pylint: disable=no-member
ts1 = None
tee = Tee() if builtins.__xonsh_env__.get('XONSH_STORE_STDOUT') \
else io.StringIO()
store_stdout = builtins.__xonsh_env__.get('XONSH_STORE_STDOUT') # pylint: disable=no-member
tee = Tee() if store_stdout else io.StringIO()
try:
ts0 = time.time()
self.execer.exec(code, mode='single', glbs=self.ctx) # no locals
@ -131,7 +141,7 @@ class BaseShell(object):
print(e.args[0], file=sys.stderr)
if hist.last_cmd_rtn is None:
hist.last_cmd_rtn = 1 # return code for failure
except Exception:
except Exception: # pylint: disable=broad-except
print_exception()
if hist.last_cmd_rtn is None:
hist.last_cmd_rtn = 1 # return code for failure
@ -139,7 +149,7 @@ class BaseShell(object):
ts1 = ts1 or time.time()
self._append_history(inp=src, ts=[ts0, ts1], tee_out=tee.getvalue())
tee.close()
if builtins.__xonsh_exit__:
if builtins.__xonsh_exit__: # pylint: disable=no-member
return True
def push(self, line):
@ -163,7 +173,7 @@ class BaseShell(object):
print_exception()
return src, None
self.need_more_lines = True
except Exception:
except Exception: # pylint: disable=broad-except
self.reset_buffer()
print_exception()
return src, None
@ -177,7 +187,8 @@ class BaseShell(object):
def settitle(self):
"""Sets terminal title."""
env = builtins.__xonsh_env__
_ = self
env = builtins.__xonsh_env__ # pylint: disable=no-member
term = env.get('TERM', None)
# Shells running in emacs sets TERM to "dumb" or "eterm-color".
# Do not set title for these to avoid garbled prompt.
@ -200,21 +211,23 @@ class BaseShell(object):
if self.mlprompt is None:
try:
self.mlprompt = multiline_prompt()
except Exception:
except Exception: # pylint: disable=broad-except
print_exception()
self.mlprompt = '<multiline prompt error> '
return self.mlprompt
env = builtins.__xonsh_env__
env = builtins.__xonsh_env__ # pylint: disable=no-member
p = env.get('PROMPT')
try:
p = format_prompt(p)
except Exception:
except Exception: # pylint: disable=broad-except
print_exception()
self.settitle()
return p
def _append_history(self, tee_out=None, **info):
hist = builtins.__xonsh_history__
"""Append information about the command to the history."""
_ = self
hist = builtins.__xonsh_history__ # pylint: disable=no-member
info['rtn'] = hist.last_cmd_rtn
tee_out = tee_out or None
last_out = hist.last_cmd_out or None
@ -228,5 +241,3 @@ class BaseShell(object):
info['out'] = tee_out + '\n' + last_out
hist.append(info)
hist.last_cmd_rtn = hist.last_cmd_out = None

View file

@ -1,5 +1,8 @@
"""The xonsh built-ins. Note that this module is named 'built_ins' so as
not to be confused with the special Python builtins module.
# -*- coding: utf-8 -*-
"""The xonsh built-ins.
Note that this module is named 'built_ins' so as not to be confused with the
special Python builtins module.
"""
import os
import re
@ -31,14 +34,14 @@ from xonsh.foreign_shells import load_foreign_aliases
ENV = None
BUILTINS_LOADED = False
INSPECTOR = Inspector()
AT_EXIT_SIGNALS = (signal.SIGABRT, signal.SIGFPE, signal.SIGILL, signal.SIGSEGV,
AT_EXIT_SIGNALS = (signal.SIGABRT, signal.SIGFPE, signal.SIGILL, signal.SIGSEGV,
signal.SIGTERM)
if ON_POSIX:
AT_EXIT_SIGNALS += (signal.SIGTSTP, signal.SIGQUIT, signal.SIGHUP)
def resetting_signal_handle(sig, f):
"""Sets a new signal handle that will automaticallly restore the old value
"""Sets a new signal handle that will automaticallly restore the old value
once the new handle is finished.
"""
oldh = signal.getsignal(sig)
@ -319,9 +322,9 @@ def get_script_subproc_command(fname, args):
raise PermissionError
if ON_POSIX and not os.access(fname, os.R_OK):
# on some systems, some importnat programs (e.g. sudo) will have execute
# permissions but not read/write permisions. This enables things with the SUID
# set to be run. Needs to come before _is_binary() is called, because that
# on some systems, some importnat programs (e.g. sudo) will have execute
# permissions but not read/write permisions. This enables things with the SUID
# set to be run. Needs to come before _is_binary() is called, because that
# function tries to read the file.
return [fname] + args
elif _is_binary(fname):
@ -612,7 +615,7 @@ def run_subproc(cmds, captured=True):
if prev_proc.stdout not in (None, sys.stdout):
output = prev_proc.stdout.read()
if captured:
# to get proper encoding from Popen, we have to
# to get proper encoding from Popen, we have to
# use a byte stream and then implement universal_newlines here
output = output.decode(encoding=ENV.get('XONSH_ENCODING'),
errors=ENV.get('XONSH_ENCODING_ERRORS'))
@ -681,7 +684,7 @@ def load_builtins(execer=None):
builtins.aliases.update(load_foreign_aliases(issue_warning=False))
# history needs to be started after env and aliases
# would be nice to actually include non-detyped versions.
builtins.__xonsh_history__ = History(env=ENV.detype(), #aliases=builtins.aliases,
builtins.__xonsh_history__ = History(env=ENV.detype(), #aliases=builtins.aliases,
ts=[time.time(), None], locked=True)
lastflush = lambda s=None, f=None: builtins.__xonsh_history__.flush(at_exit=True)
atexit.register(lastflush)

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""A (tab-)completer for xonsh."""
import os
import re
@ -27,6 +28,8 @@ XONSH_TOKENS = {
'...'
}
COMPLETION_SKIP_TOKENS = {'sudo', 'time'}
BASH_COMPLETE_SCRIPT = """source {filename}
COMP_WORDS=({line})
COMP_LINE={comp_line}
@ -56,13 +59,13 @@ def startswithnorm(x, start, startlow=None):
def _normpath(p):
""" Wraps os.normpath() to avoid removing './' at the beginning
""" Wraps os.normpath() to avoid removing './' at the beginning
and '/' at the end. On windows it does the same with backslases
"""
"""
initial_dotslash = p.startswith(os.curdir + os.sep)
initial_dotslash |= (ON_WINDOWS and p.startswith(os.curdir + os.altsep))
p = p.rstrip()
trailing_slash = p.endswith(os.sep)
trailing_slash = p.endswith(os.sep)
trailing_slash |= (ON_WINDOWS and p.endswith(os.altsep))
p = os.path.normpath(p)
if initial_dotslash and p != '.':
@ -122,6 +125,11 @@ class Completer(object):
ctx = ctx or {}
prefixlow = prefix.lower()
cmd = line.split(' ', 1)[0]
if cmd in COMPLETION_SKIP_TOKENS:
begidx -= len(cmd)+1
endidx -= len(cmd)+1
cmd = line.split(' ', 2)[1]
line = line.split(' ', 1)[1]
csc = builtins.__xonsh_env__.get('CASE_SENSITIVE_COMPLETIONS')
startswither = startswithnorm if csc else startswithlow
if begidx == 0:
@ -315,7 +323,7 @@ class Completer(object):
for f in builtins.__xonsh_env__.get('BASH_COMPLETIONS'):
if os.path.isfile(f):
# We need to "Unixify" Windows paths for Bash to understand
if ON_WINDOWS:
if ON_WINDOWS:
f = RE_WIN_DRIVE.sub(lambda m: '/{0}/'.format(m.group(1).lower()), f).replace('\\', '/')
srcs.append('source ' + f)
return srcs

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tools for diff'ing two xonsh history files in a meaningful fashion."""
from datetime import datetime
from itertools import zip_longest
@ -117,7 +118,7 @@ class HistoryDiffer(object):
s += ' started: ' + ts0.isoformat(' ')
if ts[1] is not None:
ts1 = datetime.fromtimestamp(ts[1])
s += ' stopped: ' + ts1.isoformat(' ') + ' runtime: ' + str(ts1 - ts0)
s += ' stopped: ' + ts1.isoformat(' ') + ' runtime: ' + str(ts1 - ts0)
return s
def header(self):
@ -195,7 +196,7 @@ class HistoryDiffer(object):
pass
elif bout is None:
aid = self.a['sessionid']
s += 'Note: only {red}{aid}{no_color} output stored\n'.format(red=RED,
s += 'Note: only {red}{aid}{no_color} output stored\n'.format(red=RED,
aid=aid, no_color=NO_COLOR)
elif aout is None:
bid = self.b['sessionid']
@ -234,7 +235,7 @@ class HistoryDiffer(object):
s = ''
for tag, i1, i2, j1, j2 in sm.get_opcodes():
if tag == REPLACE:
for i, ainp, j, binp in zip_longest(range(i1, i2), ainps[i1:i2],
for i, ainp, j, binp in zip_longest(range(i1, i2), ainps[i1:i2],
range(j1, j2), binps[j1:j2]):
if j is None:
s += self._cmd_in_one_diff(ainp, i, self.a, aid, RED)
@ -254,7 +255,7 @@ class HistoryDiffer(object):
if len(odiff) > 0:
h = ('cmd #{i} in {red}{aid}{no_color} input is the same as \n'
'cmd #{j} in {green}{bid}{no_color}, but output differs:\n')
s += h.format(i=i, aid=aid, j=j, bid=bid, red=RED, green=GREEN,
s += h.format(i=i, aid=aid, j=j, bid=bid, red=RED, green=GREEN,
no_color=NO_COLOR)
s += odiff + '\n'
else:
@ -299,7 +300,7 @@ def _create_parser(p=None):
def _main_action(ns, hist=None):
hd = HistoryDiffer(ns.a, ns.b, reopen=ns.reopen, verbose=ns.verbose)
print(hd.format())
def main(args=None, stdin=None):
"""Main entry point for history diff'ing"""

View file

@ -1,5 +1,5 @@
"""Directory stack and associated utilities for the xonsh shell.
"""
# -*- coding: utf-8 -*-
"""Directory stack and associated utilities for the xonsh shell."""
import os
import builtins
from glob import iglob

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Environment for the xonsh shell."""
import os
import re
@ -126,7 +127,7 @@ DEFAULT_VALUES = {
'/opt/local/etc/profile.d/bash_completion.sh')
if ON_MAC else
('/usr/share/bash-completion/bash_completion',
'/usr/share/bash-completion/completions/git')
'/usr/share/bash-completion/completions/git')
if ON_ARCH else
('/etc/bash_completion',
'/usr/share/bash-completion/completions/git')),
@ -150,7 +151,7 @@ DEFAULT_VALUES = {
'PROMPT_TOOLKIT_STYLES': None,
'PUSHD_MINUS': False,
'PUSHD_SILENT': False,
'SHELL_TYPE': 'readline',
'SHELL_TYPE': 'prompt_toolkit' if ON_WINDOWS else 'readline',
'SUGGEST_COMMANDS': True,
'SUGGEST_MAX_NUM': 5,
'SUGGEST_THRESHOLD': 3,
@ -163,7 +164,7 @@ DEFAULT_VALUES = {
'XONSHRC': ((os.path.join(os.environ['ALLUSERSPROFILE'],
'xonsh', 'xonshrc'),
os.path.expanduser('~/.xonshrc')) if ON_WINDOWS
else ('/etc/xonshrc', os.path.expanduser('~/.xonshrc'))),
else ('/etc/xonshrc', os.path.expanduser('~/.xonshrc'))),
'XONSH_CONFIG_DIR': xonsh_config_dir,
'XONSH_DATA_DIR': xonsh_data_dir,
'XONSH_ENCODING': DEFAULT_ENCODING,

View file

@ -1,4 +1,5 @@
"""Implements the xonsh executer"""
# -*- coding: utf-8 -*-
"""Implements the xonsh executer."""
import re
import os
import types

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tools to help interface with foreign shells, such as Bash."""
import os
import re
@ -67,11 +68,11 @@ DEFAULT_SOURCERS = {
}
@lru_cache()
def foreign_shell_data(shell, interactive=True, login=False, envcmd='env',
aliascmd='alias', extra_args=(), currenv=None,
def foreign_shell_data(shell, interactive=True, login=False, envcmd='env',
aliascmd='alias', extra_args=(), currenv=None,
safe=True, prevcmd='', postcmd='', funcscmd=None,
sourcer=None):
"""Extracts data from a foreign (non-xonsh) shells. Currently this gets
"""Extracts data from a foreign (non-xonsh) shells. Currently this gets
the environment, aliases, and functions but may be extended in the future.
Parameters
@ -93,7 +94,7 @@ def foreign_shell_data(shell, interactive=True, login=False, envcmd='env',
safe : bool, optional
Flag for whether or not to safely handle exceptions and other errors.
prevcmd : str, optional
A command to run in the shell before anything else, useful for
A command to run in the shell before anything else, useful for
sourcing and other commands that may require environment recovery.
postcmd : str, optional
A command to run after everything else, useful for cleaning up any
@ -103,12 +104,12 @@ def foreign_shell_data(shell, interactive=True, login=False, envcmd='env',
and locations of any functions that are native to the foreign shell.
This command should print *only* a whitespace separated sequence
of pairs function name & filenames where the functions are defined.
If this is None, then a default script will attempted to be looked
If this is None, then a default script will attempted to be looked
up based on the shell name. Callable wrappers for these functions
will be returned in the aliases dictionary.
sourcer : str or None, optional
How to source a foreign shell file for purposes of calling functions
in that shell. If this is None, a default value will attempt to be
in that shell. If this is None, a default value will attempt to be
looked up based on the shell name.
Returns
@ -116,7 +117,7 @@ def foreign_shell_data(shell, interactive=True, login=False, envcmd='env',
env : dict
Dictionary of shell's environment
aliases : dict
Dictionary of shell's alaiases, this includes foreign function
Dictionary of shell's alaiases, this includes foreign function
wrappers.
"""
cmd = [shell]
@ -161,7 +162,7 @@ def parse_env(s):
return env
ALIAS_RE = re.compile('__XONSH_ALIAS_BEG__\n(.*)__XONSH_ALIAS_END__',
ALIAS_RE = re.compile('__XONSH_ALIAS_BEG__\n(.*)__XONSH_ALIAS_END__',
flags=re.DOTALL)
def parse_aliases(s):
@ -211,7 +212,7 @@ def parse_funcs(s, shell, sourcer=None):
for funcname, filename in zip(flatpairs[::2], flatpairs[1::2]):
if funcname.startswith('_'):
continue # skip private functions
wrapper = ForeignShellFunctionAlias(name=funcname, shell=shell,
wrapper = ForeignShellFunctionAlias(name=funcname, shell=shell,
sourcer=sourcer, filename=filename)
funcs[funcname] = wrapper
return funcs
@ -223,7 +224,7 @@ class ForeignShellFunctionAlias(object):
"""
INPUT = ('{sourcer} {filename}\n'
'{funcname} {args}\n')
'{funcname} {args}\n')
def __init__(self, name, shell, filename, sourcer=None):
"""
@ -278,8 +279,8 @@ class ForeignShellFunctionAlias(object):
return args, True
VALID_SHELL_PARAMS = frozenset(['shell', 'interactive', 'login', 'envcmd',
'aliascmd', 'extra_args', 'currenv', 'safe',
VALID_SHELL_PARAMS = frozenset(['shell', 'interactive', 'login', 'envcmd',
'aliascmd', 'extra_args', 'currenv', 'safe',
'prevcmd', 'postcmd', 'funcscmd', 'sourcer'])
def ensure_shell(shell):
@ -296,9 +297,9 @@ def ensure_shell(shell):
if 'login' in shell_keys:
shell['login'] = to_bool(shell['login'])
if 'envcmd' in shell_keys:
shell['envcmd'] = eunsure_string(shell['envcmd'])
shell['envcmd'] = ensure_string(shell['envcmd'])
if 'aliascmd' in shell_keys:
shell['aliascmd'] = eunsure_string(shell['aliascmd'])
shell['aliascmd'] = ensure_string(shell['aliascmd'])
if 'extra_args' in shell_keys and not isinstance(shell['extra_args'], tuple):
shell['extra_args'] = tuple(map(ensure_string, shell['extra_args']))
if 'currenv' in shell_keys and not isinstance(shell['currenv'], tuple):
@ -313,15 +314,15 @@ def ensure_shell(shell):
if 'safe' in shell_keys:
shell['safe'] = to_bool(shell['safe'])
if 'prevcmd' in shell_keys:
shell['prevcmd'] = eunsure_string(shell['prevcmd'])
shell['prevcmd'] = ensure_string(shell['prevcmd'])
if 'postcmd' in shell_keys:
shell['postcmd'] = eunsure_string(shell['postcmd'])
shell['postcmd'] = ensure_string(shell['postcmd'])
if 'funcscmd' in shell_keys:
shell['funcscmd'] = None if shell['funcscmd'] is None \
else eunsure_string(shell['funcscmd'])
else ensure_string(shell['funcscmd'])
if 'sourcer' in shell_keys:
shell['sourcer'] = None if shell['sourcer'] is None \
else eunsure_string(shell['sourcer'])
else ensure_string(shell['sourcer'])
return shell
@ -357,7 +358,7 @@ def load_foreign_envs(shells=None, config=None, issue_warning=True):
keyword arguments. Not compatible with config not being None.
config : str of None, optional
Path to the static config file. Not compatible with shell not being None.
If both shell and config is None, then it will be read from the
If both shell and config is None, then it will be read from the
$XONSHCONFIG environment variable.
issue_warning : bool, optional
Issues warnings if config file cannot be found.
@ -386,7 +387,7 @@ def load_foreign_aliases(shells=None, config=None, issue_warning=True):
keyword arguments. Not compatible with config not being None.
config : str of None, optional
Path to the static config file. Not compatible with shell not being None.
If both shell and config is None, then it will be read from the
If both shell and config is None, then it will be read from the
$XONSHCONFIG environment variable.
issue_warning : bool, optional
Issues warnings if config file cannot be found.

View file

@ -1,4 +1,7 @@
"""Implements the xonsh history object"""
# -*- coding: utf-8 -*-
"""Implements the xonsh history object."""
import argparse
import functools
import os
import uuid
import time
@ -12,74 +15,100 @@ from xonsh.tools import ensure_int_or_slice, to_history_tuple
from xonsh import diff_history
def _gc_commands_to_rmfiles(hsize, files):
"""Return the history files to remove to get under the command limit."""
rmfiles = []
n = 0
ncmds = 0
for ts, fcmds, f in files[::-1]:
if fcmds == 0:
# we need to make sure that 'empty' history files don't hang around
rmfiles.append((ts, fcmds, f))
if ncmds + fcmds > hsize:
break
ncmds += fcmds
n += 1
rmfiles += files[:-n]
return rmfiles
def _gc_files_to_rmfiles(hsize, files):
"""Return the history files to remove to get under the file limit."""
rmfiles = files[:-hsize] if len(files) > hsize else []
return rmfiles
def _gc_seconds_to_rmfiles(hsize, files):
"""Return the history files to remove to get under the age limit."""
rmfiles = []
now = time.time()
for ts, _, f in files:
if (now - ts) < hsize:
break
rmfiles.append((None, None, f))
return rmfiles
def _gc_bytes_to_rmfiles(hsize, files):
"""Return the history files to remove to get under the byte limit."""
rmfiles = []
n = 0
nbytes = 0
for _, _, f in files[::-1]:
fsize = os.stat(f).st_size
if nbytes + fsize > hsize:
break
nbytes += fsize
n += 1
rmfiles = files[:-n]
return rmfiles
class HistoryGC(Thread):
"""Shell history garbage collection."""
def __init__(self, wait_for_shell=True, size=None, *args, **kwargs):
"""Thread responsible for garbage collecting old history.
May wait for shell (and thus xonshrc to have been loaded) to start work.
"""
super(HistoryGC, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.daemon = True
self.size = size
self.wait_for_shell = wait_for_shell
self.start()
self.gc_units_to_rmfiles = {'commands': _gc_commands_to_rmfiles,
'files': _gc_files_to_rmfiles,
's': _gc_seconds_to_rmfiles,
'b': _gc_bytes_to_rmfiles}
def run(self):
while self.wait_for_shell:
time.sleep(0.01)
env = builtins.__xonsh_env__
env = builtins.__xonsh_env__ # pylint: disable=no-member
if self.size is None:
hsize, units = env.get('XONSH_HISTORY_SIZE')
else:
hsize, units = to_history_tuple(self.size)
files = self.unlocked_files()
# flag files for removal
if units == 'commands':
n = 0
ncmds = 0
rmfiles = []
for ts, fcmds, f in files[::-1]:
if fcmds == 0:
# we need to make sure that 'empty' history files don't hang around
fmfiles.append((ts, fcmds, f))
if ncmds + fcmds > hsize:
break
ncmds += fcmds
n += 1
rmfiles += files[:-n]
elif units == 'files':
rmfiles = files[:-hsize] if len(files) > hsize else []
elif units == 's':
now = time.time()
rmfiles = []
for ts, _, f in files:
if (now - ts) < hsize:
break
rmfiles.append((None, None, f))
elif units == 'b':
n = 0
nbytes = 0
for _, _, f in files[::-1]:
fsize = os.stat(f).st_size
if nbytes + fsize > hsize:
break
nbytes += fsize
n += 1
rmfiles = files[:-n]
else:
raise ValueError('Units of {0!r} not understood'.format(unit))
# finally, clean up files
for _, _, f in rmfiles:
rmfiles_fn = self.gc_units_to_rmfiles.get(units)
if rmfiles_fn is None:
raise ValueError('Units type {0!r} not understood'.format(units))
for _, _, f in rmfiles_fn(hsize, files):
try:
os.remove(f)
except OSError:
pass
def unlocked_files(self):
"""Finds the history files and returns the ones that are unlocked, this is
sorted by the last closed time. Returns a list of (timestamp, file) tuples.
"""Find and return the history files that are unlocked.
This is sorted by the last closed time. Returns a list of (timestamp,
file) tuples.
"""
_ = self # this could be a function but is intimate to this class
# pylint: disable=no-member
xdd = os.path.abspath(builtins.__xonsh_env__.get('XONSH_DATA_DIR'))
fs = [f for f in iglob(os.path.join(xdd, 'xonsh-*.json'))]
files = []
@ -98,8 +127,10 @@ class HistoryGC(Thread):
class HistoryFlusher(Thread):
"""Flush shell history to disk periodically."""
def __init__(self, filename, buffer, queue, cond, at_exit=False, *args, **kwargs):
def __init__(self, filename, buffer, queue, cond, at_exit=False, *args,
**kwargs):
"""Thread for flushing history."""
super(HistoryFlusher, self).__init__(*args, **kwargs)
self.filename = filename
@ -125,6 +156,7 @@ class HistoryFlusher(Thread):
return self is self.queue[0]
def dump(self):
"""Write the cached history to external storage."""
with open(self.filename, 'r', newline='\n') as f:
hist = lazyjson.LazyJSON(f).load()
hist['cmds'].extend(self.buffer)
@ -136,11 +168,13 @@ class HistoryFlusher(Thread):
class CommandField(Sequence):
"""A field in the 'cmds' portion of history."""
def __init__(self, field, hist, default=None):
"""Represents a field in the 'cmds' portion of history. Will query the buffer
for the relevant data, if possible. Otherwise it will lazily acquire data from
the file.
"""Represents a field in the 'cmds' portion of history.
Will query the buffer for the relevant data, if possible. Otherwise it
will lazily acquire data from the file.
Parameters
----------
@ -163,14 +197,16 @@ class CommandField(Sequence):
if isinstance(key, slice):
return [self[i] for i in range(*key.indices(size))]
elif not isinstance(key, int):
raise IndexError('CommandField may only be indexed by int or slice.')
raise IndexError(
'CommandField may only be indexed by int or slice.')
elif size == 0:
raise IndexError('CommandField is empty.')
# now we know we have an int
key = size + key if key < 0 else key # ensure key is non-negative
bufsize = len(self.hist.buffer)
if size - bufsize <= key: # key is in buffer
return self.hist.buffer[key + bufsize - size].get(self.field, self.default)
return self.hist.buffer[key + bufsize - size].get(
self.field, self.default)
# now we know we have to go into the file
queue = self.hist._queue
queue.append(self)
@ -190,8 +226,10 @@ class CommandField(Sequence):
class History(object):
"""Xonsh session history."""
def __init__(self, filename=None, sessionid=None, buffersize=100, gc=True, **meta):
def __init__(self, filename=None, sessionid=None, buffersize=100, gc=True,
**meta):
"""Represents a xonsh session's history as an in-memory buffer that is
periodically flushed to disk.
@ -201,19 +239,21 @@ class History(object):
Location of history file, defaults to
``$XONSH_DATA_DIR/xonsh-{sessionid}.json``.
sessionid : int, uuid, str, optional
Current session identifier, will generate a new sessionid if not set.
Current session identifier, will generate a new sessionid if not
set.
buffersize : int, optional
Maximum buffersize in memory.
meta : optional
Top-level metadata to store along with the history. The kwargs 'cmds' and
'sessionid' are not allowed and will be overwritten.
Top-level metadata to store along with the history. The kwargs
'cmds' and 'sessionid' are not allowed and will be overwritten.
gc : bool, optional
Run garbage collector flag.
"""
self.sessionid = sid = uuid.uuid4() if sessionid is None else sessionid
if filename is None:
self.filename = os.path.join(builtins.__xonsh_env__.get('XONSH_DATA_DIR'),
'xonsh-{0}.json'.format(sid))
# pylint: disable=no-member
data_dir = builtins.__xonsh_env__.get('XONSH_DATA_DIR')
self.filename = os.path.join(data_dir, 'xonsh-{0}.json'.format(sid))
else:
self.filename = filename
self.buffer = []
@ -274,38 +314,36 @@ class History(object):
"""
if len(self.buffer) == 0:
return
hf = HistoryFlusher(self.filename, tuple(self.buffer), self._queue, self._cond,
at_exit=at_exit)
hf = HistoryFlusher(self.filename, tuple(self.buffer), self._queue,
self._cond, at_exit=at_exit)
self.buffer.clear()
return hf
#
# Interface to History
#
_HIST_PARSER = None
@functools.lru_cache()
def _create_parser():
global _HIST_PARSER
if _HIST_PARSER is not None:
return _HIST_PARSER
from argparse import ArgumentParser
p = ArgumentParser(prog='history',
description='Tools for dealing with history')
"""Create a parser for the "history" command."""
p = argparse.ArgumentParser(prog='history',
description='Tools for dealing with history')
subp = p.add_subparsers(title='action', dest='action')
# show action
show = subp.add_parser('show', help='displays current history, default action')
show = subp.add_parser('show',
help='displays current history, default action')
show.add_argument('-r', dest='reverse', default=False, action='store_true',
help='reverses the direction')
show.add_argument('n', nargs='?', default=None,
help='display n\'th history entry if n is a simple int, '
'or range of entries if it is Python slice notation')
# id
idp = subp.add_parser('id', help='displays the current session id')
# file
fp = subp.add_parser('file', help='displays the current history filename')
# info
info = subp.add_parser('info', help='displays information about the current history')
# 'id' subcommand
subp.add_parser('id', help='displays the current session id')
# 'file' subcommand
subp.add_parser('file', help='displays the current history filename')
# 'info' subcommand
info = subp.add_parser('info', help=('displays information about the '
'current history'))
info.add_argument('--json', dest='json', default=False, action='store_true',
help='print in JSON format')
# diff
@ -319,19 +357,20 @@ def _create_parser():
# gc
gcp = subp.add_parser('gc', help='launches a new history garbage collector')
gcp.add_argument('--size', nargs=2, dest='size', default=None,
help='next two arguments represent the history size and units, '
'eg "--size 8128 commands"')
help=('next two arguments represent the history size and '
'units; e.g. "--size 8128 commands"'))
bgcp = gcp.add_mutually_exclusive_group()
bgcp.add_argument('--blocking', dest='blocking', default=True, action='store_true',
help='ensures that the gc blocks the main thread, default True')
bgcp.add_argument('--blocking', dest='blocking', default=True,
action='store_true',
help=('ensures that the gc blocks the main thread, '
'default True'))
bgcp.add_argument('--non-blocking', dest='blocking', action='store_false',
help='makes the gc non-blocking, and thus return sooner')
# set and return
_HIST_PARSER = p
return p
def _show(ns, hist):
"""Show the requested portion of the shell history."""
idx = ensure_int_or_slice(ns.n)
if len(hist) == 0:
return
@ -354,6 +393,7 @@ def _show(ns, hist):
def _info(ns, hist):
"""Display information about the shell history."""
data = OrderedDict()
data['sessionid'] = str(hist.sessionid)
data['filename'] = hist.filename
@ -370,6 +410,7 @@ def _info(ns, hist):
def _gc(ns, hist):
"""Start and monitor garbage collection of the shell history."""
hist.gc = gc = HistoryGC(wait_for_shell=False, size=ns.size)
if ns.blocking:
while gc.is_alive():
@ -385,6 +426,7 @@ _MAIN_ACTIONS = {
'gc': _gc,
}
def _main(hist, args):
"""This implements the history CLI."""
if not args or args[0] not in _MAIN_ACTIONS:
@ -392,13 +434,13 @@ def _main(hist, args):
if (args[0] == 'show' and len(args) > 1 and args[-1].startswith('-') and
args[-1][1].isdigit()):
args.insert(-1, '--') # ensure parsing stops before a negative int
parser = _create_parser()
ns = parser.parse_args(args)
ns = _create_parser().parse_args(args)
if ns.action is None: # apply default action
ns = parser.parse_args(['show'] + args)
ns = _create_parser().parse_args(['show'] + args)
_MAIN_ACTIONS[ns.action](ns, hist)
def main(args=None, stdin=None):
"""This is the history command entry point."""
_ = stdin
_main(builtins.__xonsh_history__, args)
_main(builtins.__xonsh_history__, args) # pylint: disable=no-member

View file

@ -1,5 +1,7 @@
"""Import hooks for importing xonsh source files. This module registers
the hooks it defines when it is imported.
# -*- coding: utf-8 -*-
"""Import hooks for importing xonsh source files.
This module registers the hooks it defines when it is imported.
"""
import os
import sys

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tools for inspecting Python objects.
This file was forked from the IPython project:

View file

@ -1,6 +1,5 @@
"""
Job control for the xonsh shell.
"""
# -*- coding: utf-8 -*-
"""Job control for the xonsh shell."""
import os
import sys
import time
@ -101,10 +100,9 @@ else:
return
job = builtins.__xonsh_all_jobs__[act]
obj = job['obj']
if job['bg']:
if job['bg'] and job['status'] == 'running':
return
pgrp = job['pgrp']
obj.done = False
# give the terminal over to the fg process
_give_terminal_to(pgrp)
@ -112,10 +110,17 @@ else:
# (this hook was added because vim, emacs, etc, seem to need to have
# the terminal when they receive SIGCONT from the "fg" command)
if signal_to_send is not None:
if signal_to_send == signal.SIGCONT:
job['status'] = 'running'
os.kill(obj.pid, signal_to_send)
if job['bg']:
_give_terminal_to(_shell_pgrp)
return
_, wcode = os.waitpid(obj.pid, os.WUNTRACED)
if os.WIFSTOPPED(wcode):
obj.done = True
if os.WIFSTOPPED(s):
job['bg'] = True
job['status'] = 'stopped'
print() # get a newline because ^Z will have been printed
@ -275,6 +280,6 @@ def bg(args, stdin=None):
builtins.__xonsh_active_job__ = act
job = builtins.__xonsh_all_jobs__[act]
job['bg'] = True
job['status'] = 'running'
# When the SIGCONT is sent job['status'] is set to running.
print_one_job(act)
wait_for_active_job(_continue(job['obj']))

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Hooks for Jupyter Xonsh Kernel."""
import io
import builtins

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Implements a lazy JSON file class that wraps around json data."""
import io
import weakref

View file

@ -1,5 +1,7 @@
"""
Lexer for xonsh code, written using a hybrid of ``tokenize`` and PLY
# -*- coding: utf-8 -*-
"""Lexer for xonsh code.
Written using a hybrid of ``tokenize`` and PLY.
"""
import re
import sys

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""The main xonsh script."""
import os
import sys

View file

@ -1,6 +1,7 @@
"""
Tools to open ``*.py`` files as Unicode, using the encoding specified within the
file, as per PEP 263.
# -*- coding: utf-8 -*-
"""Tools to open ``*.py`` files as Unicode.
Uses the encoding specified within the file, as per PEP 263.
Much of the code is taken from the tokenize module in Python 3.2.

View file

@ -1,4 +1,5 @@
"""Implements the xonsh parser"""
# -*- coding: utf-8 -*-
"""Implements the xonsh parser."""
import os
import sys
from collections import Iterable, Sequence, Mapping
@ -213,8 +214,8 @@ class Parser(object):
'or_and_test', 'and_not_test', 'comp_op_expr', 'pipe_xor_expr',
'xor_and_expr', 'ampersand_shift_expr', 'shift_arith_expr',
'pm_term', 'op_factor', 'trailer', 'comma_subscript',
'comma_expr_or_star_expr', 'comma_test', 'comma_argument', 'comma_item',
'attr_period_name', 'test_comma', 'equals_yield_expr_or_testlist',
'comma_expr_or_star_expr', 'comma_test', 'comma_argument', 'comma_item',
'attr_period_name', 'test_comma', 'equals_yield_expr_or_testlist',
'test_or_star_expr']
if VER_MAJOR_MINOR <= VER_3_4:
list_rules += ['argument_comma',]
@ -807,7 +808,7 @@ class Parser(object):
elif lenp == 4:
op = self._augassign_op[p2]
if op is None:
self._parse_error('operation {0!r} not supported'.format(p2),
self._parse_error('operation {0!r} not supported'.format(p2),
self.currloc(lineno=p.lineno, column=p.lexpos))
p0 = ast.AugAssign(target=p1[0], op=op(), value=p[3],
lineno=self.lineno, col_offset=self.col)
@ -1282,7 +1283,7 @@ class Parser(object):
def p_async_stmt(self, p):
"""async_stmt : async_funcdef
| async_with_stmt
| async_with_stmt
| async_for_stmt
"""
p[0] = p[1]
@ -1585,7 +1586,7 @@ class Parser(object):
"""
op = self._term_binops[p[1]]
if op is None:
self._parse_error('operation {0!r} not supported'.format(p[1]),
self._parse_error('operation {0!r} not supported'.format(p[1]),
self.currloc(lineno=p.lineno, column=p.lexpos))
p[0] = [op(), p[2]]
@ -1686,7 +1687,7 @@ class Parser(object):
assert False
leader = p0
if lenp == 4:
p0 = ast.Await(value=p0, ctx=ast.Load(), lineno=self.lineno,
p0 = ast.Await(value=p0, ctx=ast.Load(), lineno=self.lineno,
col_offset=self.col)
p[0] = p0
@ -1961,7 +1962,7 @@ class Parser(object):
)
def p_item(self, p):
lenp = len(p)
if lenp == 4:
if lenp == 4:
p0 = [p[1], p[3]]
elif lenp == 3:
p0 = [None, p[2]]
@ -2144,7 +2145,7 @@ class Parser(object):
"""argument : test
| test comp_for
| test EQUALS test
""",
""",
v35 = \
"""argument : test_or_star_expr
| test comp_for

View file

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
"""
Python advanced pretty printer. This pretty printer is intended to
replace the old `pprint` python module which does not allow developers
to provide their own pretty print callbacks.
"""Python advanced pretty printer.
This pretty printer is intended to replace the old `pprint` python module which
does not allow developers to provide their own pretty print callbacks.
This module is based on ruby's `prettyprint.rb` library by `Tanaka Akira`.

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Interface for running Python functions as subprocess-mode commands.
Code for several helper methods in the `ProcProxy` class have been reproduced
@ -358,8 +359,8 @@ class TeePTYProc(object):
def __init__(self, args, stdin=None, stdout=None, stderr=None, preexec_fn=None,
env=None, universal_newlines=False):
"""Popen replacement for running commands in teed psuedo-terminal. This
allows the capturing AND streaming of stdout and stderr. Availability
"""Popen replacement for running commands in teed psuedo-terminal. This
allows the capturing AND streaming of stdout and stderr. Availability
is Linux-only.
"""
self.stdin = stdin
@ -368,7 +369,7 @@ class TeePTYProc(object):
self.args = args
self.universal_newlines = universal_newlines
xenv = builtins.__xonsh_env__ if hasattr(builtins, '__xonsh_env__') \
else {'XONSH_ENCODING': 'utf-8',
else {'XONSH_ENCODING': 'utf-8',
'XONSH_ENCODING_ERRORS': 'strict'}
if not os.access(args[0], os.F_OK):

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Completer implementation to use with prompt_toolkit."""
from prompt_toolkit.completion import Completer, Completion

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""History object for use with prompt_toolkit."""
import os

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Key bindings for prompt_toolkit xonsh shell."""
import builtins

View file

@ -1,4 +1,5 @@
"""The prompt_toolkit based xonsh shell"""
# -*- coding: utf-8 -*-
"""The prompt_toolkit based xonsh shell."""
import os
import builtins
from warnings import warn
@ -50,7 +51,7 @@ class PromptToolkitShell(BaseShell):
self.pt_completer = PromptToolkitCompleter(self.completer, self.ctx)
self.key_bindings_manager = KeyBindingManager(
enable_auto_suggest_bindings=True,
enable_search=True,
enable_search=True,
enable_abort_and_exit_bindings=True,
enable_vi_mode=Condition(lambda cli: builtins.__xonsh_env__.get('VI_MODE')),
enable_open_in_editor=True)

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Hooks for pygments syntax highlighting."""
from pygments.lexer import inherit, bygroups, using, this
from pygments.token import Name, Generic, Keyword, Text, String

View file

@ -1,4 +1,5 @@
"""The readline based xonsh shell"""
# -*- coding: utf-8 -*-
"""The readline based xonsh shell."""
import os
import sys
import time
@ -11,7 +12,7 @@ from collections import deque
from xonsh import lazyjson
from xonsh.base_shell import BaseShell
from xonsh.tools import ON_WINDOWS
from xonsh.tools import ON_WINDOWS, print_color
RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = None
RL_CAN_RESIZE = False
@ -204,9 +205,7 @@ class ReadlineShell(BaseShell, Cmd):
if inserter is not None:
readline.set_pre_input_hook(None)
else:
self.stdout.write(self.prompt.replace('\001', '')
.replace('\002', ''))
self.stdout.flush()
print_color(self.prompt, file=self.stdout)
if line is not None:
os.write(self.stdin.fileno(), line.encode())
if not exec_now:
@ -255,7 +254,7 @@ class ReadlineShell(BaseShell, Cmd):
class ReadlineHistoryAdder(Thread):
def __init__(self, wait_for_gc=True, *args, **kwargs):
"""Thread responsible for adding inputs from history to the current readline
"""Thread responsible for adding inputs from history to the current readline
instance. May wait for the history garbage collector to finish.
"""
super(ReadlineHistoryAdder, self).__init__(*args, **kwargs)

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Tools to replay xonsh history files."""
import time
import builtins
@ -31,7 +32,7 @@ class Replayer(object):
self._lj.close()
def replay(self, merge_envs=DEFAULT_MERGE_ENVS, target=None):
"""Replays the history specified, returns the history object where the code
"""Replays the history specified, returns the history object where the code
was executed.
Parameters
@ -40,7 +41,7 @@ class Replayer(object):
Describes how to merge the environments, in order of increasing precednce.
Available strings are 'replay' and 'native'. The 'replay' env comes from the
history file that we are replaying. The 'native' env comes from what this
instance of xonsh was started up with. Instead of a string, a dict or other
instance of xonsh was started up with. Instead of a string, a dict or other
mapping may be passed in as well. Defaults to ('replay', 'native').
target : str, optional
Path to new history file.
@ -85,8 +86,8 @@ def _create_parser(p=None):
if p_was_none:
from argparse import ArgumentParser
p = ArgumentParser('replay', description='replays a xonsh history file')
p.add_argument('--merge-envs', dest='merge_envs', default=DEFAULT_MERGE_ENVS,
nargs='+',
p.add_argument('--merge-envs', dest='merge_envs', default=DEFAULT_MERGE_ENVS,
nargs='+',
help="Describes how to merge the environments, in order of "
"increasing precedence. Available strings are 'replay' and "
"'native'. The 'replay' env comes from the history file that we "
@ -95,7 +96,7 @@ def _create_parser(p=None):
"be passed in. Defaults to '--merge-envs replay native'.")
p.add_argument('--json', dest='json', default=False, action='store_true',
help='print history info in JSON format')
p.add_argument('-o', '--target', dest='target', default=None,
p.add_argument('-o', '--target', dest='target', default=None,
help='path to new history file')
p.add_argument('path', help='path to replay history file')
if p_was_none:

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""The xonsh shell"""
import builtins
from warnings import warn

View file

@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
"""This implements a psuedo-TTY that tees its output into a Python buffer.
This file was forked from a version distibuted under an MIT license and
Copyright (c) 2011 Joshua D. Bartlett.
See http://sqizit.bartletts.id.au/2011/02/14/pseudo-terminals-in-python/ for
Copyright (c) 2011 Joshua D. Bartlett.
See http://sqizit.bartletts.id.au/2011/02/14/pseudo-terminals-in-python/ for
more information.
"""
import io
@ -45,7 +46,7 @@ def _findfirst(s, substrs):
def _on_main_thread():
"""Checks if we are on the main thread or not. Duplicated from xonsh.tools
"""Checks if we are on the main thread or not. Duplicated from xonsh.tools
here so that this module only relies on the Python standrd library.
"""
return threading.current_thread() is threading.main_thread()
@ -67,7 +68,7 @@ def _find_error_code(e):
class TeePTY(object):
"""This class is a pseudo terminal that tees the stdout and stderr into a buffer."""
def __init__(self, bufsize=1024, remove_color=True, encoding='utf-8',
def __init__(self, bufsize=1024, remove_color=True, encoding='utf-8',
errors='strict'):
"""
Parameters
@ -92,7 +93,7 @@ class TeePTY(object):
self._temp_stdin = None
def __str__(self):
return self.buffer.getvalue().decode(encoding=self.encoding,
return self.buffer.getvalue().decode(encoding=self.encoding,
errors=self.errors)
def __del__(self):
@ -179,7 +180,7 @@ class TeePTY(object):
self._set_pty_size()
def _set_pty_size(self):
"""Sets the window size of the child pty based on the window size of
"""Sets the window size of the child pty based on the window size of
our own controlling terminal.
"""
assert self.master_fd is not None
@ -199,7 +200,7 @@ class TeePTY(object):
try:
rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], [])
except OSError as e:
if e.errno == 4: # Interrupted system call.
if e.errno == 4: # Interrupted system call.
continue # This happens at terminal resize.
if master_fd in rfds:
data = os.read(master_fd, bufsize)
@ -215,15 +216,15 @@ class TeePTY(object):
elif flag is not None:
if flag in START_ALTERNATE_MODE:
# This code is executed when the child process switches the terminal into
# alternate mode. The line below assumes that the user has opened vim,
# alternate mode. The line below assumes that the user has opened vim,
# less, or similar, and writes writes to stdin.
d0 = data[:i]
d0 = data[:i]
self._in_alt_mode = True
d1 = self._sanatize_data(data[i+len(flag):])
data = d0 + d1
elif flag in END_ALTERNATE_MODE:
# This code is executed when the child process switches the terminal back
# out of alternate mode. The line below assumes that the user has
# out of alternate mode. The line below assumes that the user has
# returned to the command prompt.
self._in_alt_mode = False
data = self._sanatize_data(data[i+len(flag):])
@ -249,7 +250,7 @@ class TeePTY(object):
data = data[n:]
def _stdin_filename(self, stdin):
if stdin is None:
if stdin is None:
rtn = None
elif isinstance(stdin, io.FileIO) and os.path.isfile(stdin.name):
rtn = stdin.name
@ -292,15 +293,15 @@ class TeePTY(object):
raise ValueError('stdin not understood {0!r}'.format(stdin))
def _delay_for_pipe(self, env=None, delay=None):
# This delay is sometimes needed because the temporary stdin file that
# This delay is sometimes needed because the temporary stdin file that
# is being written (the pipe) may not have even hits its first flush()
# call by the time the spawned process starts up and determines there
# call by the time the spawned process starts up and determines there
# is nothing in the file. The spawn can thus exit, without doing any
# real work. Consider the case of piping something into grep:
#
# $ ps aux | grep root
#
# grep will exit on EOF and so there is a race between the buffersize
# grep will exit on EOF and so there is a race between the buffersize
# and flushing the temporary file and grep. However, this race is not
# always meaningful. Pagers, for example, update when the file is written
# to. So what is important is that we start the spawned process ASAP:
@ -311,7 +312,7 @@ class TeePTY(object):
# not blocking and letting the spawned process have enough to work with
# such that it doesn't exit prematurely. Unfortunately, there is no
# way to know a priori how big the file is, how long the spawned process
# will run for, etc. Thus as user-definable delay let's the user
# will run for, etc. Thus as user-definable delay let's the user
# find something that works for them.
if delay is None:
delay = (env or os.environ).get('TEEPTY_PIPE_DELAY', -1.0)

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Timing related functionality for the xonsh shell.
The following time_it alias and Timer was forked from the IPython project:

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
"""Misc. xonsh tools.
The following implementations were forked from the IPython project:
@ -450,7 +451,7 @@ def on_main_thread():
@contextmanager
def swap(namespace, name, value, default=NotImplemented):
"""Swaps a current variable name in a namespace for another value, and then
"""Swaps a current variable name in a namespace for another value, and then
replaces it when the context is exited.
"""
old = getattr(namespace, name, default)
@ -591,11 +592,11 @@ CANON_HISTORY_UNITS = frozenset(['commands', 'files', 's', 'b'])
HISTORY_UNITS = {
'': ('commands', int),
'c': ('commands', int),
'cmd': ('commands', int),
'cmds': ('commands', int),
'command': ('commands', int),
'commands': ('commands', int),
'c': ('commands', int),
'cmd': ('commands', int),
'cmds': ('commands', int),
'command': ('commands', int),
'commands': ('commands', int),
'f': ('files', int),
'files': ('files', int),
's': ('s', float),
@ -757,3 +758,11 @@ def format_prompt_for_prompt_toolkit(prompt):
token_names = [get_xonsh_color_names(c) for c in parts[::2]]
cstyles = [_make_style(c) for c in token_names]
return token_names, cstyles, strings
def print_color(string, file=sys.stdout):
"""Print strings that contain xonsh.tools.TERM_COLORS values. By default
`sys.stdout` is used as the output stream but an alternate can be specified
by the `file` keyword argument."""
print(string.format(**TERM_COLORS).replace('\001', '').replace('\002', ''),
file=file)