xonsh/tests/test_integrations.py

191 lines
4.6 KiB
Python
Raw Normal View History

2016-10-13 02:03:30 -04:00
import os
2016-10-13 02:41:53 -04:00
import sys
2016-10-13 02:45:37 -04:00
import shutil
2016-10-16 22:20:36 +03:00
import subprocess as sp
2016-10-13 02:03:30 -04:00
import pytest
2016-10-13 02:41:53 -04:00
import xonsh
from xonsh.platform import ON_WINDOWS
from tools import skip_if_on_windows
2016-10-16 21:08:37 +03:00
2016-10-13 02:41:53 -04:00
# Locate the xonsh installation prefix by climbing up from the imported
# package file: an installed copy (under site-packages) is climbed five
# levels, a source checkout only two.
num_up = 5 if 'site-packages' in xonsh.__file__ else 2
XONSH_PREFIX = xonsh.__file__
for _ in range(num_up):
    XONSH_PREFIX = os.path.dirname(XONSH_PREFIX)

# Search path handed to the xonsh subprocess: the tests' own bin directory
# first, then the candidate script directories under the prefix, then the
# directory of the running interpreter, and finally the inherited PATH.
PATH = os.pathsep.join([
    os.path.join(os.path.dirname(__file__), 'bin'),
    os.path.join(XONSH_PREFIX, 'bin'),
    os.path.join(XONSH_PREFIX, 'Scripts'),
    os.path.join(XONSH_PREFIX, 'scripts'),
    os.path.dirname(sys.executable),
    os.environ['PATH'],
])
2016-10-16 21:08:37 +03:00
2016-10-16 22:20:36 +03:00
def run_xonsh(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.STDOUT):
    """Run ``cmd`` through a fresh xonsh subprocess and collect the result.

    Parameters
    ----------
    cmd : str
        Text sent to the subprocess on its standard input.
    stdin, stdout, stderr
        Passed unchanged to ``subprocess.Popen``. By default stdin and
        stdout are pipes and stderr is merged into stdout.

    Returns
    -------
    tuple
        ``(out, err, returncode)`` as produced by ``Popen.communicate``
        and ``Popen.returncode``.

    Raises
    ------
    FileNotFoundError
        If the xonsh launcher script cannot be found on ``PATH``.
    subprocess.TimeoutExpired
        If the subprocess does not finish within 10 seconds. The child is
        killed and reaped before the exception propagates.
    """
    env = dict(os.environ)
    env['PATH'] = PATH
    env['XONSH_DEBUG'] = '1'
    env['XONSH_SHOW_TRACEBACK'] = '1'
    env['RAISE_SUBPROC_ERROR'] = '1'
    env['PROMPT'] = ''
    # Use distinct local names so the imported ``xonsh`` module is not shadowed.
    launcher_name = 'xonsh.bat' if ON_WINDOWS else 'xon.sh'
    launcher = shutil.which(launcher_name, path=PATH)
    if launcher is None:
        # Without this check Popen would fail with an opaque TypeError.
        raise FileNotFoundError(
            'could not find {!r} on the test PATH'.format(launcher_name))
    # The context manager closes the pipe objects and waits for the child on
    # exit, so nothing leaks even when the timeout path is taken.
    with sp.Popen([launcher, '--no-rc'],
                  env=env,
                  stdin=stdin,
                  stdout=stdout,
                  stderr=stderr,
                  universal_newlines=True,
                  ) as proc:
        try:
            out, err = proc.communicate(input=cmd, timeout=10)
        except sp.TimeoutExpired:
            proc.kill()
            # Reap the killed child. wait() is used instead of a second
            # communicate() so that a grandchild still holding the pipes
            # open cannot make the test hang.
            proc.wait()
            raise
    return out, err, proc.returncode
2016-10-13 02:03:30 -04:00
#
# Each entry in the following list is a (stdin, stdout, returncode) tuple
#
ALL_PLATFORMS = [
# test calling a function alias that takes no arguments and prints to stdout
("""
def _f():
print('hello')
aliases['f'] = _f
f
""", "hello\n", 0),
# test redirecting a function alias: its output goes to the file 'tttt',
# which the script then reads back and echoes with a prefix
("""
def _f():
print('Wow Mom!')
aliases['f'] = _f
f > tttt
with open('tttt') as tttt:
s = tttt.read().strip()
print('REDIRECTED OUTPUT: ' + s)
""", "REDIRECTED OUTPUT: Wow Mom!\n", 0),
2016-10-13 03:20:42 -04:00
# test system exit in function alias: the code given to sys.exit() is
# expected as the alias's returncode, while the script itself exits with 0
("""
import sys
def _f():
sys.exit(42)
aliases['f'] = _f
print(![f].returncode)
""", "42\n", 0),
2016-10-24 22:41:01 -04:00
# test uncaptured streaming alias;
# the same text is written to both streams, since the order in which the
# two lines are actually printed is non-deterministic
2016-10-13 22:53:06 -04:00
("""
def _test_stream(args, stdin, stdout, stderr):
2016-10-24 22:41:01 -04:00
print('hallo on stream', file=stderr)
print('hallo on stream', file=stdout)
2016-10-13 22:53:06 -04:00
return 1
aliases['test-stream'] = _test_stream
x = ![test-stream]
print(x.returncode)
2016-10-24 22:41:01 -04:00
""", "hallo on stream\nhallo on stream\n1\n", 0),
2016-10-13 22:53:06 -04:00
# test captured streaming alias: with !(...) only the stderr line is
# expected in the output, followed by the printed return code
("""
def _test_stream(args, stdin, stdout, stderr):
print('hallo on err', file=stderr)
print('hallo on out', file=stdout)
return 1
aliases['test-stream'] = _test_stream
x = !(test-stream)
print(x.returncode)
""", "hallo on err\n1\n", 0),
2016-10-16 12:07:39 -04:00
# test piping aliases: `d` writes 'hey!' and `d2` upper-cases whatever it
# reads from its input
("""
def dummy(args, inn, out, err):
out.write('hey!')
return 0
def dummy2(args, inn, out, err):
s = inn.read()
out.write(s.upper())
return 0
aliases['d'] = dummy
aliases['d2'] = dummy2
d | d2
""", "HEY!", 0),
2016-10-18 21:14:08 -04:00
# test output larger than most pipe buffers
# (1000 lines of 100 'x' characters each)
("""
def _g(args, stdin=None):
for i in range(1000):
print('x' * 100)
aliases['g'] = _g
g
""", (("x"*100) + '\n') * 1000, 0),
2016-10-25 20:55:42 -04:00
# test piping 'real' command; the expected `wc` output differs on Windows
("""
with open('tttt', 'w') as fp:
fp.write("Wow mom!\\n")
2016-10-26 00:40:47 -04:00
![cat tttt | wc]
2016-10-26 00:51:56 -04:00
""", ' 1 2 10\n' if ON_WINDOWS else " 1 2 9 <stdin>\n", 0),
# test double piping 'real' command; the expected `wc` output again
# differs on Windows
("""
with open('tttt', 'w') as fp:
fp.write("Wow mom!\\n")
![cat tttt | wc | wc]
2016-10-26 20:55:40 -04:00
""", ' 1 3 24\n' if ON_WINDOWS else " 1 4 16 <stdin>\n", 0),
2016-10-13 02:03:30 -04:00
]
@pytest.mark.parametrize('case', ALL_PLATFORMS)
def test_script(case):
    """Feed one ``ALL_PLATFORMS`` script to xonsh and check the outcome.

    ``case`` is a ``(stdin, stdout, returncode)`` tuple; the script is run
    with the default stream settings of ``run_xonsh`` and both the captured
    output and the exit status must match exactly.
    """
    source, expected_out, expected_rtn = case
    actual_out, _, actual_rtn = run_xonsh(source)
    assert expected_out == actual_out
    assert expected_rtn == actual_rtn
2016-10-13 02:03:30 -04:00
@skip_if_on_windows
@pytest.mark.parametrize('cmd, fmt, exp', [
    ('pwd', None, lambda: os.getcwd() + '\n'),
    ('echo WORKING', None, 'WORKING\n'),
    # ``list.sort()`` returns None, so comparing two ``.sort()`` results
    # would always pass; ``sorted()`` compares the real listings instead.
    # ``ls -f`` also reports "." and "..", which ``os.listdir()`` omits, and
    # the expected listing is computed lazily so it reflects the directory
    # at the time the test runs.
    ('ls -f',
     lambda out: sorted(name for name in out.splitlines()
                        if name not in ('.', '..')),
     lambda: sorted(os.listdir())),
])
def test_single_command(cmd, fmt, exp):
    """Run a single command through xonsh and compare its output.

    The ``fmt`` parameter is a function that formats the output of cmd,
    can be None. The ``exp`` parameter is either the expected value itself
    or a zero-argument callable producing it at test time.
    """
    out, err, rtn = run_xonsh(cmd, stderr=sp.DEVNULL)
    if callable(fmt):
        out = fmt(out)
    if callable(exp):
        exp = exp()
    assert out == exp
    assert rtn == 0
@skip_if_on_windows
@pytest.mark.parametrize('cmd, exp', [
    ('pwd', lambda: os.getcwd() + '\n'),
])
def test_redirect_out_to_file(cmd, exp, tmpdir):
    """Redirect a command's output to a file and check the file contents.

    ``exp`` is either the expected text or a zero-argument callable that
    produces it; the file lives in a fresh directory under ``tmpdir``.
    """
    target = tmpdir.mkdir('xonsh_test_dir').join('xonsh_test_file')
    run_xonsh('{} > {}\n'.format(cmd, target))
    written = target.read()
    expected = exp() if callable(exp) else exp
    assert written == expected