mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 08:24:40 +01:00
Merge pull request #1669 from blahgeek/xontrib-gitstatus
Informative git status on prompt
This commit is contained in:
commit
93d659f3fb
21 changed files with 702 additions and 362 deletions
|
@ -32,6 +32,7 @@ For those of you who want the gritty details.
|
|||
history
|
||||
completer
|
||||
completers/index
|
||||
prompt/index
|
||||
shell
|
||||
base_shell
|
||||
readline_shell
|
||||
|
|
11
docs/api/prompt/base.rst
Normal file
11
docs/api/prompt/base.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_base:
|
||||
|
||||
***********************************************
|
||||
Base prompt formatter (``xonsh.prompt.base``)
|
||||
***********************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.base
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
11
docs/api/prompt/cwd.rst
Normal file
11
docs/api/prompt/cwd.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_cwd:
|
||||
|
||||
*****************************************************
|
||||
CWD related prompt formatter (``xonsh.prompt.cwd``)
|
||||
*****************************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.cwd
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
11
docs/api/prompt/env.rst
Normal file
11
docs/api/prompt/env.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_env:
|
||||
|
||||
****************************************************
|
||||
Virtualenv prompt formatter (``xonsh.prompt.env``)
|
||||
****************************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.env
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
11
docs/api/prompt/gitstatus.rst
Normal file
11
docs/api/prompt/gitstatus.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_gitstatus:
|
||||
|
||||
***********************************************************************
|
||||
Informative git status prompt formatter (``xonsh.prompt.gitstatus``)
|
||||
***********************************************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.gitstatus
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
16
docs/api/prompt/index.rst
Normal file
16
docs/api/prompt/index.rst
Normal file
|
@ -0,0 +1,16 @@
|
|||
.. _api_prompt:
|
||||
|
||||
======================
|
||||
Prompt formatter API
|
||||
======================
|
||||
Modules that provides ``FORMATTER_DICT``.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
base
|
||||
cwd
|
||||
env
|
||||
gitstatus
|
||||
jobs
|
||||
vc_branch
|
11
docs/api/prompt/jobs.rst
Normal file
11
docs/api/prompt/jobs.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_jobs:
|
||||
|
||||
***********************************************
|
||||
Jobs prompt formatter (``xonsh.prompt.jobs``)
|
||||
***********************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.jobs
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
11
docs/api/prompt/vc_branch.rst
Normal file
11
docs/api/prompt/vc_branch.rst
Normal file
|
@ -0,0 +1,11 @@
|
|||
.. _xonsh_prompt_vc_branch:
|
||||
|
||||
***************************************************************************
|
||||
Version control branch info prompt formatter (``xonsh.prompt.vc_branch``)
|
||||
***************************************************************************
|
||||
|
||||
.. automodule:: xonsh.prompt.vc_branch
|
||||
:members:
|
||||
:undoc-members:
|
||||
:inherited-members:
|
||||
|
|
@ -269,7 +269,7 @@ that ``ls -l`` is meant to be run in subprocess-mode?
|
|||
For any given line that only contains an expression statement (expr-stmt,
|
||||
see the Python AST docs for more information), if all the names cannot
|
||||
be found as current variables xonsh will try to parse the line as a
|
||||
subprocess command instead. In the above, if ``ls`` and ``l`` are not
|
||||
subprocess command instead. In the above, if ``ls`` and ``l`` are not
|
||||
variables, then subprocess mode will be attempted. If parsing in subprocess
|
||||
mode fails, then the line is left in Python-mode.
|
||||
|
||||
|
@ -1235,6 +1235,8 @@ By default, the following variables are available for use:
|
|||
foreground, if any.
|
||||
* ``vte_new_tab_cwd``: Issues VTE escape sequence for opening new tabs in the
|
||||
current working directory on some linux terminals. This is not usually needed.
|
||||
* ``gitstatus``: Informative git status, like ``[master|MERGING|+1…2]``, you
|
||||
may use `$XONSH_GITSTATUS_* <envvars.html>`_ to customize the styling.
|
||||
|
||||
You can also color your prompt easily by inserting keywords such as ``{GREEN}``
|
||||
or ``{BOLD_BLUE}``. Colors have the form shown below:
|
||||
|
|
15
news/gitstatus.rst
Normal file
15
news/gitstatus.rst
Normal file
|
@ -0,0 +1,15 @@
|
|||
**Added:**
|
||||
|
||||
* New ``FORMATTER_DICT`` entry ``gitstatus`` to provides informative git status
|
||||
|
||||
**Changed:**
|
||||
|
||||
* All prompt formatter functions moved to ``xonsh.prompt`` subpackage
|
||||
|
||||
**Deprecated:** None
|
||||
|
||||
**Removed:** None
|
||||
|
||||
**Fixed:** None
|
||||
|
||||
**Security:** None
|
9
setup.py
9
setup.py
|
@ -36,7 +36,9 @@ except ImportError:
|
|||
|
||||
|
||||
TABLES = ['xonsh/lexer_table.py', 'xonsh/parser_table.py',
|
||||
'xonsh/__amalgam__.py', 'xonsh/completers/__amalgam__.py']
|
||||
'xonsh/__amalgam__.py',
|
||||
'xonsh/completers/__amalgam__.py',
|
||||
'xonsh/prompt/__amalgam__.py']
|
||||
|
||||
|
||||
def clean_tables():
|
||||
|
@ -60,7 +62,7 @@ def amalgamate_source():
|
|||
print('Could not import amalgamate, skipping.', file=sys.stderr)
|
||||
return
|
||||
amalgamate.main(['amalgamate', '--debug=XONSH_DEBUG', 'xonsh',
|
||||
'xonsh.completers'])
|
||||
'xonsh.completers', 'xonsh.prompt'])
|
||||
sys.path.pop(0)
|
||||
|
||||
|
||||
|
@ -290,7 +292,8 @@ def main():
|
|||
platforms='Cross Platform',
|
||||
classifiers=['Programming Language :: Python :: 3'],
|
||||
packages=['xonsh', 'xonsh.ply', 'xonsh.ptk', 'xonsh.parsers',
|
||||
'xonsh.xoreutils', 'xontrib', 'xonsh.completers'],
|
||||
'xonsh.xoreutils', 'xontrib',
|
||||
'xonsh.completers', 'xonsh.prompt'],
|
||||
package_dir={'xonsh': 'xonsh', 'xontrib': 'xontrib'},
|
||||
package_data={'xonsh': ['*.json', '*.githash'], 'xontrib': ['*.xsh']},
|
||||
cmdclass=cmdclass,
|
||||
|
|
|
@ -65,12 +65,12 @@ else:
|
|||
_sys.modules['xonsh.inspectors'] = __amalgam__
|
||||
environ = __amalgam__
|
||||
_sys.modules['xonsh.environ'] = __amalgam__
|
||||
tracer = __amalgam__
|
||||
_sys.modules['xonsh.tracer'] = __amalgam__
|
||||
base_shell = __amalgam__
|
||||
_sys.modules['xonsh.base_shell'] = __amalgam__
|
||||
replay = __amalgam__
|
||||
_sys.modules['xonsh.replay'] = __amalgam__
|
||||
tracer = __amalgam__
|
||||
_sys.modules['xonsh.tracer'] = __amalgam__
|
||||
xonfig = __amalgam__
|
||||
_sys.modules['xonsh.xonfig'] = __amalgam__
|
||||
aliases = __amalgam__
|
||||
|
|
378
xonsh/environ.py
378
xonsh/environ.py
|
@ -3,10 +3,7 @@
|
|||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import socket
|
||||
import shutil
|
||||
import string
|
||||
import pprint
|
||||
import locale
|
||||
|
@ -15,27 +12,25 @@ import warnings
|
|||
import traceback
|
||||
import itertools
|
||||
import contextlib
|
||||
import subprocess
|
||||
import collections
|
||||
import collections.abc as cabc
|
||||
|
||||
from xonsh import __version__ as XONSH_VERSION
|
||||
from xonsh.jobs import get_next_task
|
||||
from xonsh.lazyasd import LazyObject, lazyobject
|
||||
from xonsh.codecache import run_script_with_cache
|
||||
from xonsh.dirstack import _get_cwd
|
||||
from xonsh.foreign_shells import load_foreign_envs
|
||||
from xonsh.platform import (
|
||||
BASH_COMPLETIONS_DEFAULT, DEFAULT_ENCODING, PATH_DEFAULT,
|
||||
ON_WINDOWS, ON_ANACONDA, ON_LINUX, ON_CYGWIN,
|
||||
ON_WINDOWS, ON_LINUX, ON_CYGWIN,
|
||||
)
|
||||
from xonsh.tools import (
|
||||
is_superuser, always_true, always_false, ensure_string, is_env_path,
|
||||
always_true, always_false, ensure_string, is_env_path,
|
||||
str_to_env_path, env_path_to_str, is_bool, to_bool, bool_to_str,
|
||||
is_history_tuple, to_history_tuple, history_tuple_to_str, is_float,
|
||||
is_string, is_string_or_callable,
|
||||
is_completions_display_value, to_completions_display_value,
|
||||
is_string_set, csv_to_set, set_to_csv, get_sep, is_int, is_bool_seq,
|
||||
is_string_set, csv_to_set, set_to_csv, is_int, is_bool_seq,
|
||||
to_bool_or_int, bool_or_int_to_str,
|
||||
csv_to_bool_seq, bool_seq_to_csv, DefaultNotGiven, print_exception,
|
||||
setup_win_unicode_console, intensify_colors_on_win_setter, format_color,
|
||||
|
@ -44,6 +39,7 @@ from xonsh.tools import (
|
|||
is_nonstring_seq_of_strings, pathsep_to_upper_seq,
|
||||
seq_to_upper_pathsep,
|
||||
)
|
||||
import xonsh.prompt.base as prompt
|
||||
|
||||
|
||||
@lazyobject
|
||||
|
@ -254,7 +250,7 @@ def DEFAULT_VALUES():
|
|||
'DYNAMIC_CWD_WIDTH': (float('inf'), 'c'),
|
||||
'EXPAND_ENV_VARS': True,
|
||||
'FORCE_POSIX_PATHS': False,
|
||||
'FORMATTER_DICT': dict(FORMATTER_DICT),
|
||||
'FORMATTER_DICT': dict(prompt.FORMATTER_DICT),
|
||||
'FUZZY_PATH_COMPLETION': True,
|
||||
'GLOB_SORTED': True,
|
||||
'HISTCONTROL': set(),
|
||||
|
@ -425,7 +421,7 @@ def DEFAULT_DOCS():
|
|||
'Dictionary containing variables to be used when formatting $PROMPT '
|
||||
"and $TITLE. See 'Customizing the Prompt' "
|
||||
'http://xon.sh/tutorial.html#customizing-the-prompt',
|
||||
configurable=False, default='xonsh.environ.FORMATTER_DICT'),
|
||||
configurable=False, default='xonsh.prompt.FORMATTER_DICT'),
|
||||
'FUZZY_PATH_COMPLETION': VarDocs(
|
||||
"Toggles 'fuzzy' matching of paths for tab completion, which is only "
|
||||
"used as a fallback if no other completions succeed but can be used "
|
||||
|
@ -618,6 +614,20 @@ def DEFAULT_DOCS():
|
|||
"(https://docs.python.org/3/library/codecs.html#error-handlers) "
|
||||
'for more information and available options.',
|
||||
default="'surrogateescape'"),
|
||||
'XONSH_GITSTATUS_*': VarDocs(
|
||||
'Symbols for gitstatus prompt. Default values are: \n\n'
|
||||
'* XONSH_GITSTATUS_HASH: `:`\n'
|
||||
'* XONSH_GITSTATUS_BRANCH: `{CYAN}`\n'
|
||||
'* XONSH_GITSTATUS_OPERATION: `{CYAN}`\n'
|
||||
'* XONSH_GITSTATUS_STAGED: `{RED}●`\n'
|
||||
'* XONSH_GITSTATUS_CONFLICTS: `{RED}×`\n'
|
||||
'* XONSH_GITSTATUS_CHANGED: `{BLUE}+`\n'
|
||||
'* XONSH_GITSTATUS_UNTRACKED: `…`\n'
|
||||
'* XONSH_GITSTATUS_STASHED: `⚑`\n'
|
||||
'* XONSH_GITSTATUS_CLEAN: `{BOLD_GREEN}✓`\n'
|
||||
'* XONSH_GITSTATUS_AHEAD: `↑·`\n'
|
||||
'* XONSH_GITSTATUS_BEHIND: `↓·`\n'
|
||||
),
|
||||
'XONSH_HISTORY_FILE': VarDocs(
|
||||
'Location of history file (deprecated).',
|
||||
configurable=False, default="'~/.xonsh_history'"),
|
||||
|
@ -903,348 +913,6 @@ def locate_binary(name):
|
|||
return builtins.__xonsh_commands_cache__.locate_binary(name)
|
||||
|
||||
|
||||
def get_git_branch():
|
||||
"""Attempts to find the current git branch. If no branch is found, then
|
||||
an empty string is returned. If a timeout occured, the timeout exception
|
||||
(subprocess.TimeoutExpired) is returned.
|
||||
"""
|
||||
branch = None
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
if not ON_WINDOWS:
|
||||
prompt_scripts = ['/usr/lib/git-core/git-sh-prompt',
|
||||
'/usr/local/etc/bash_completion.d/git-prompt.sh']
|
||||
for script in prompt_scripts:
|
||||
# note that this is about 10x faster than bash -i "__git_ps1"
|
||||
inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script)
|
||||
try:
|
||||
branch = subprocess.check_output(['bash'], cwd=cwd, input=inp,
|
||||
stderr=subprocess.PIPE, timeout=vcbt, env=denv,
|
||||
universal_newlines=True)
|
||||
break
|
||||
except subprocess.TimeoutExpired as e:
|
||||
branch = e
|
||||
break
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
continue
|
||||
# fall back to using the git binary if the above failed
|
||||
if branch is None:
|
||||
cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
|
||||
try:
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv,
|
||||
stderr=subprocess.PIPE, universal_newlines=True)
|
||||
if ON_WINDOWS and len(s) == 0:
|
||||
# Workaround for a bug in ConEMU/cmder, retry without redirection
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
|
||||
env=denv, universal_newlines=True)
|
||||
branch = s.strip()
|
||||
except subprocess.TimeoutExpired as e:
|
||||
branch = e
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
branch = None
|
||||
return branch
|
||||
|
||||
|
||||
def _get_parent_dir_for(path, dir_name, timeout):
|
||||
# walk up the directory tree to see if we are inside an hg repo
|
||||
# the timeout makes sure that we don't thrash the file system
|
||||
previous_path = ''
|
||||
t0 = time.time()
|
||||
while path != previous_path and ((time.time() - t0) < timeout):
|
||||
if os.path.isdir(os.path.join(path, dir_name)):
|
||||
return path
|
||||
previous_path = path
|
||||
path, _ = os.path.split(path)
|
||||
return (path == previous_path)
|
||||
|
||||
|
||||
def get_hg_branch(cwd=None, root=None):
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT'])
|
||||
if not isinstance(root, str):
|
||||
# Bail if we are not in a repo or we timed out
|
||||
if root:
|
||||
return None
|
||||
else:
|
||||
return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT'])
|
||||
# get branch name
|
||||
branch_path = os.path.sep.join([root, '.hg', 'branch'])
|
||||
if os.path.exists(branch_path):
|
||||
with open(branch_path, 'r') as branch_file:
|
||||
branch = branch_file.read()
|
||||
else:
|
||||
branch = 'default'
|
||||
# add bookmark, if we can
|
||||
bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current'])
|
||||
if os.path.exists(bookmark_path):
|
||||
with open(bookmark_path, 'r') as bookmark_file:
|
||||
active_bookmark = bookmark_file.read()
|
||||
branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in
|
||||
(branch, active_bookmark)))
|
||||
else:
|
||||
branch = branch.strip(os.linesep)
|
||||
return branch
|
||||
|
||||
|
||||
_FIRST_BRANCH_TIMEOUT = True
|
||||
|
||||
|
||||
def _first_branch_timeout_message():
|
||||
global _FIRST_BRANCH_TIMEOUT
|
||||
sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE']
|
||||
if not _FIRST_BRANCH_TIMEOUT or sbtm:
|
||||
return
|
||||
_FIRST_BRANCH_TIMEOUT = False
|
||||
print('xonsh: branch timeout: computing the branch name, color, or both '
|
||||
'timed out while formatting the prompt. You may avoid this by '
|
||||
'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch '
|
||||
'fields, like {curr_branch}, from your $PROMPT. See the FAQ '
|
||||
'for more details. This message will be suppressed for the remainder '
|
||||
'of this session. To suppress this message permanently, set '
|
||||
'$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.',
|
||||
file=sys.stderr)
|
||||
|
||||
|
||||
def current_branch(pad=NotImplemented):
|
||||
"""Gets the branch for a current working directory. Returns an empty string
|
||||
if the cwd is not a repository. This currently only works for git and hg
|
||||
and should be extended in the future. If a timeout occurred, the string
|
||||
'<branch-timeout>' is returned.
|
||||
"""
|
||||
if pad is not NotImplemented:
|
||||
warnings.warn("The pad argument of current_branch has no effect now "
|
||||
"and will be removed in the future")
|
||||
branch = None
|
||||
cmds = builtins.__xonsh_commands_cache__
|
||||
if cmds.lazy_locate_binary('git') or cmds.is_empty():
|
||||
branch = get_git_branch()
|
||||
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch:
|
||||
branch = get_hg_branch()
|
||||
if isinstance(branch, subprocess.TimeoutExpired):
|
||||
branch = '<branch-timeout>'
|
||||
_first_branch_timeout_message()
|
||||
return branch or None
|
||||
|
||||
|
||||
def git_dirty_working_directory(cwd=None, include_untracked=False):
|
||||
"""Returns whether or not the git directory is dirty. If this could not
|
||||
be determined (timeout, file not sound, etc.) then this returns None.
|
||||
"""
|
||||
cmd = ['git', 'status', '--porcelain']
|
||||
if include_untracked:
|
||||
cmd.append('--untracked-files=normal')
|
||||
else:
|
||||
cmd.append('--untracked-files=no')
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
try:
|
||||
s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd,
|
||||
timeout=vcbt, universal_newlines=True,
|
||||
env=denv)
|
||||
if ON_WINDOWS and len(s) == 0:
|
||||
# Workaround for a bug in ConEMU/cmder, retry without redirection
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
|
||||
env=denv, universal_newlines=True)
|
||||
return bool(s)
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
|
||||
FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def hg_dirty_working_directory():
|
||||
"""Computes whether or not the mercurial working directory is dirty or not.
|
||||
If this cannot be deterimined, None is returned.
|
||||
"""
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
# Override user configurations settings and aliases
|
||||
denv['HGRCPATH'] = ''
|
||||
try:
|
||||
s = subprocess.check_output(['hg', 'identify', '--id'],
|
||||
stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt,
|
||||
universal_newlines=True, env=denv)
|
||||
return s.strip(os.linesep).endswith('+')
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
|
||||
FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def dirty_working_directory(cwd=None):
|
||||
"""Returns a boolean as to whether there are uncommitted files in version
|
||||
control repository we are inside. If this cannot be determined, returns
|
||||
None. Currently supports git and hg.
|
||||
"""
|
||||
dwd = None
|
||||
cmds = builtins.__xonsh_commands_cache__
|
||||
if cmds.lazy_locate_binary('git') or cmds.is_empty():
|
||||
dwd = git_dirty_working_directory()
|
||||
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None):
|
||||
dwd = hg_dirty_working_directory()
|
||||
return dwd
|
||||
|
||||
|
||||
def branch_color():
|
||||
"""Return red if the current branch is dirty, yellow if the dirtiness can
|
||||
not be determined, and green if it clean. These are bold, intense colors
|
||||
for the foreground.
|
||||
"""
|
||||
dwd = dirty_working_directory()
|
||||
if dwd is None:
|
||||
color = '{BOLD_INTENSE_YELLOW}'
|
||||
elif dwd:
|
||||
color = '{BOLD_INTENSE_RED}'
|
||||
else:
|
||||
color = '{BOLD_INTENSE_GREEN}'
|
||||
return color
|
||||
|
||||
|
||||
def branch_bg_color():
|
||||
"""Return red if the current branch is dirty, yellow if the dirtiness can
|
||||
not be determined, and green if it clean. These are bacground colors.
|
||||
"""
|
||||
dwd = dirty_working_directory()
|
||||
if dwd is None:
|
||||
color = '{BACKGROUND_YELLOW}'
|
||||
elif dwd:
|
||||
color = '{BACKGROUND_RED}'
|
||||
else:
|
||||
color = '{BACKGROUND_GREEN}'
|
||||
return color
|
||||
|
||||
|
||||
def _replace_home(x):
|
||||
if ON_WINDOWS:
|
||||
home = (builtins.__xonsh_env__['HOMEDRIVE'] +
|
||||
builtins.__xonsh_env__['HOMEPATH'][0])
|
||||
if x.startswith(home):
|
||||
x = x.replace(home, '~', 1)
|
||||
|
||||
if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'):
|
||||
x = x.replace(os.sep, os.altsep)
|
||||
|
||||
return x
|
||||
else:
|
||||
home = builtins.__xonsh_env__['HOME']
|
||||
if x.startswith(home):
|
||||
x = x.replace(home, '~', 1)
|
||||
return x
|
||||
|
||||
|
||||
def _replace_home_cwd():
|
||||
return _replace_home(builtins.__xonsh_env__['PWD'])
|
||||
|
||||
|
||||
def _collapsed_pwd():
|
||||
sep = get_sep()
|
||||
pwd = _replace_home_cwd().split(sep)
|
||||
l = len(pwd)
|
||||
leader = sep if l > 0 and len(pwd[0]) == 0 else ''
|
||||
base = [i[0] if ix != l - 1 else i for ix, i in enumerate(pwd) if len(i) > 0]
|
||||
return leader + sep.join(base)
|
||||
|
||||
|
||||
def _dynamically_collapsed_pwd():
|
||||
"""Return the compact current working directory. It respects the
|
||||
environment variable DYNAMIC_CWD_WIDTH.
|
||||
"""
|
||||
originial_path = _replace_home_cwd()
|
||||
target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH']
|
||||
if target_width == float('inf'):
|
||||
return originial_path
|
||||
if (units == '%'):
|
||||
cols, _ = shutil.get_terminal_size()
|
||||
target_width = (cols * target_width) // 100
|
||||
sep = get_sep()
|
||||
pwd = originial_path.split(sep)
|
||||
last = pwd.pop()
|
||||
remaining_space = target_width - len(last)
|
||||
# Reserve space for separators
|
||||
remaining_space_for_text = remaining_space - len(pwd)
|
||||
parts = []
|
||||
for i in range(len(pwd)):
|
||||
part = pwd[i]
|
||||
part_len = int(min(len(part), max(1, remaining_space_for_text // (len(pwd) - i))))
|
||||
remaining_space_for_text -= part_len
|
||||
reduced_part = part[0:part_len]
|
||||
parts.append(reduced_part)
|
||||
parts.append(last)
|
||||
full = sep.join(parts)
|
||||
# If even if displaying one letter per dir we are too long
|
||||
if (len(full) > target_width):
|
||||
# We truncate the left most part
|
||||
full = "..." + full[int(-target_width) + 3:]
|
||||
# if there is not even a single separator we still
|
||||
# want to display at least the beginning of the directory
|
||||
if full.find(sep) == -1:
|
||||
full = ("..." + sep + last)[0:int(target_width)]
|
||||
return full
|
||||
|
||||
|
||||
def _current_job():
|
||||
j = get_next_task()
|
||||
if j is not None:
|
||||
if not j['bg']:
|
||||
cmd = j['cmds'][-1]
|
||||
s = cmd[0]
|
||||
if s == 'sudo' and len(cmd) > 1:
|
||||
s = cmd[1]
|
||||
return s
|
||||
|
||||
|
||||
def env_name(pre_chars='(', post_chars=')'):
|
||||
"""Extract the current environment name from $VIRTUAL_ENV or
|
||||
$CONDA_DEFAULT_ENV if that is set
|
||||
"""
|
||||
env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '')
|
||||
if len(env_path) == 0 and ON_ANACONDA:
|
||||
env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '')
|
||||
env_name = os.path.basename(env_path)
|
||||
if env_name:
|
||||
return pre_chars + env_name + post_chars
|
||||
|
||||
|
||||
if ON_WINDOWS:
|
||||
USER = 'USERNAME'
|
||||
else:
|
||||
USER = 'USER'
|
||||
|
||||
|
||||
def vte_new_tab_cwd():
|
||||
"""This prints an escape squence that tells VTE terminals the hostname
|
||||
and pwd. This should not be needed in most cases, but sometimes is for
|
||||
certain Linux terminals that do not read the PWD from the environment
|
||||
on startup. Note that this does not return a string, it simply prints
|
||||
and flushes the escape sequence to stdout directly.
|
||||
"""
|
||||
env = builtins.__xonsh_env__
|
||||
t = '\033]7;file://{}{}\007'
|
||||
s = t.format(env.get('HOSTNAME'), env.get('PWD'))
|
||||
print(s, end='', flush=True)
|
||||
|
||||
|
||||
FORMATTER_DICT = LazyObject(lambda: dict(
|
||||
user=os.environ.get(USER, '<user>'),
|
||||
prompt_end='#' if is_superuser() else '$',
|
||||
hostname=socket.gethostname().split('.', 1)[0],
|
||||
cwd=_dynamically_collapsed_pwd,
|
||||
cwd_dir=lambda: os.path.dirname(_replace_home_cwd()),
|
||||
cwd_base=lambda: os.path.basename(_replace_home_cwd()),
|
||||
short_cwd=_collapsed_pwd,
|
||||
curr_branch=current_branch,
|
||||
branch_color=branch_color,
|
||||
branch_bg_color=branch_bg_color,
|
||||
current_job=_current_job,
|
||||
env_name=env_name,
|
||||
vte_new_tab_cwd=vte_new_tab_cwd,
|
||||
), globals(), 'FORMATTER_DICT')
|
||||
|
||||
_FORMATTER = LazyObject(string.Formatter, globals(), '_FORMATTER')
|
||||
|
||||
|
||||
|
@ -1257,7 +925,8 @@ def is_template_string(template, formatter_dict=None):
|
|||
return False
|
||||
included_names.discard(None)
|
||||
if formatter_dict is None:
|
||||
fmtter = builtins.__xonsh_env__.get('FORMATTER_DICT', FORMATTER_DICT)
|
||||
fmtter = builtins.__xonsh_env__.get('FORMATTER_DICT',
|
||||
prompt.FORMATTER_DICT)
|
||||
else:
|
||||
fmtter = formatter_dict
|
||||
known_names = set(fmtter.keys())
|
||||
|
@ -1266,7 +935,8 @@ def is_template_string(template, formatter_dict=None):
|
|||
|
||||
def _get_fmtter(formatter_dict=None):
|
||||
if formatter_dict is None:
|
||||
fmtter = builtins.__xonsh_env__.get('FORMATTER_DICT', FORMATTER_DICT)
|
||||
fmtter = builtins.__xonsh_env__.get('FORMATTER_DICT',
|
||||
prompt.FORMATTER_DICT)
|
||||
else:
|
||||
fmtter = formatter_dict
|
||||
return fmtter
|
||||
|
|
26
xonsh/prompt/__init__.py
Normal file
26
xonsh/prompt/__init__.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
# amalgamate exclude
|
||||
import os as _os
|
||||
if _os.getenv('XONSH_DEBUG', ''):
|
||||
pass
|
||||
else:
|
||||
import sys as _sys
|
||||
try:
|
||||
from xonsh.prompt import __amalgam__
|
||||
cwd = __amalgam__
|
||||
_sys.modules['xonsh.prompt.cwd'] = __amalgam__
|
||||
env = __amalgam__
|
||||
_sys.modules['xonsh.prompt.env'] = __amalgam__
|
||||
gitstatus = __amalgam__
|
||||
_sys.modules['xonsh.prompt.gitstatus'] = __amalgam__
|
||||
job = __amalgam__
|
||||
_sys.modules['xonsh.prompt.job'] = __amalgam__
|
||||
vc_branch = __amalgam__
|
||||
_sys.modules['xonsh.prompt.vc_branch'] = __amalgam__
|
||||
base = __amalgam__
|
||||
_sys.modules['xonsh.prompt.base'] = __amalgam__
|
||||
del __amalgam__
|
||||
except ImportError:
|
||||
pass
|
||||
del _sys
|
||||
del _os
|
||||
# amalgamate end
|
38
xonsh/prompt/base.py
Normal file
38
xonsh/prompt/base.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""Base prompt, provides FORMATTER_DICT"""
|
||||
|
||||
import os
|
||||
import socket
|
||||
|
||||
import xonsh.lazyasd as xl
|
||||
import xonsh.tools as xt
|
||||
import xonsh.platform as xp
|
||||
|
||||
|
||||
from xonsh.prompt.cwd import (
|
||||
_collapsed_pwd, _replace_home_cwd, _dynamically_collapsed_pwd
|
||||
)
|
||||
from xonsh.prompt.job import _current_job
|
||||
from xonsh.prompt.env import (env_name, vte_new_tab_cwd)
|
||||
from xonsh.prompt.vc_branch import (
|
||||
current_branch, branch_color, branch_bg_color
|
||||
)
|
||||
from xonsh.prompt.gitstatus import gitstatus_prompt
|
||||
|
||||
|
||||
FORMATTER_DICT = xl.LazyObject(lambda: dict(
|
||||
user=os.environ.get('USERNAME' if xp.ON_WINDOWS else 'USER', '<user>'),
|
||||
prompt_end='#' if xt.is_superuser() else '$',
|
||||
hostname=socket.gethostname().split('.', 1)[0],
|
||||
cwd=_dynamically_collapsed_pwd,
|
||||
cwd_dir=lambda: os.path.dirname(_replace_home_cwd()),
|
||||
cwd_base=lambda: os.path.basename(_replace_home_cwd()),
|
||||
short_cwd=_collapsed_pwd,
|
||||
curr_branch=current_branch,
|
||||
branch_color=branch_color,
|
||||
branch_bg_color=branch_bg_color,
|
||||
current_job=_current_job,
|
||||
env_name=env_name,
|
||||
vte_new_tab_cwd=vte_new_tab_cwd,
|
||||
gitstatus=gitstatus_prompt,
|
||||
), globals(), 'FORMATTER_DICT')
|
79
xonsh/prompt/cwd.py
Normal file
79
xonsh/prompt/cwd.py
Normal file
|
@ -0,0 +1,79 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""CWD related prompt formatter"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import builtins
|
||||
|
||||
import xonsh.tools as xt
|
||||
import xonsh.platform as xp
|
||||
|
||||
|
||||
def _replace_home(x):
|
||||
if xp.ON_WINDOWS:
|
||||
home = (builtins.__xonsh_env__['HOMEDRIVE'] +
|
||||
builtins.__xonsh_env__['HOMEPATH'][0])
|
||||
if x.startswith(home):
|
||||
x = x.replace(home, '~', 1)
|
||||
|
||||
if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'):
|
||||
x = x.replace(os.sep, os.altsep)
|
||||
|
||||
return x
|
||||
else:
|
||||
home = builtins.__xonsh_env__['HOME']
|
||||
if x.startswith(home):
|
||||
x = x.replace(home, '~', 1)
|
||||
return x
|
||||
|
||||
|
||||
def _replace_home_cwd():
|
||||
return _replace_home(builtins.__xonsh_env__['PWD'])
|
||||
|
||||
|
||||
def _collapsed_pwd():
|
||||
sep = xt.get_sep()
|
||||
pwd = _replace_home_cwd().split(sep)
|
||||
l = len(pwd)
|
||||
leader = sep if l > 0 and len(pwd[0]) == 0 else ''
|
||||
base = [i[0] if ix != l - 1 else i
|
||||
for ix, i in enumerate(pwd) if len(i) > 0]
|
||||
return leader + sep.join(base)
|
||||
|
||||
|
||||
def _dynamically_collapsed_pwd():
|
||||
"""Return the compact current working directory. It respects the
|
||||
environment variable DYNAMIC_CWD_WIDTH.
|
||||
"""
|
||||
originial_path = _replace_home_cwd()
|
||||
target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH']
|
||||
if target_width == float('inf'):
|
||||
return originial_path
|
||||
if (units == '%'):
|
||||
cols, _ = shutil.get_terminal_size()
|
||||
target_width = (cols * target_width) // 100
|
||||
sep = xt.get_sep()
|
||||
pwd = originial_path.split(sep)
|
||||
last = pwd.pop()
|
||||
remaining_space = target_width - len(last)
|
||||
# Reserve space for separators
|
||||
remaining_space_for_text = remaining_space - len(pwd)
|
||||
parts = []
|
||||
for i in range(len(pwd)):
|
||||
part = pwd[i]
|
||||
part_len = int(min(len(part),
|
||||
max(1, remaining_space_for_text // (len(pwd) - i))))
|
||||
remaining_space_for_text -= part_len
|
||||
reduced_part = part[0:part_len]
|
||||
parts.append(reduced_part)
|
||||
parts.append(last)
|
||||
full = sep.join(parts)
|
||||
# If even if displaying one letter per dir we are too long
|
||||
if (len(full) > target_width):
|
||||
# We truncate the left most part
|
||||
full = "..." + full[int(-target_width) + 3:]
|
||||
# if there is not even a single separator we still
|
||||
# want to display at least the beginning of the directory
|
||||
if full.find(sep) == -1:
|
||||
full = ("..." + sep + last)[0:int(target_width)]
|
||||
return full
|
32
xonsh/prompt/env.py
Normal file
32
xonsh/prompt/env.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""Prompt formatter for virtualenv and others"""
|
||||
|
||||
import os
|
||||
import builtins
|
||||
|
||||
import xonsh.platform as xp
|
||||
|
||||
|
||||
def env_name(pre_chars='(', post_chars=')'):
|
||||
"""Extract the current environment name from $VIRTUAL_ENV or
|
||||
$CONDA_DEFAULT_ENV if that is set
|
||||
"""
|
||||
env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '')
|
||||
if len(env_path) == 0 and xp.ON_ANACONDA:
|
||||
env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '')
|
||||
env_name = os.path.basename(env_path)
|
||||
if env_name:
|
||||
return pre_chars + env_name + post_chars
|
||||
|
||||
|
||||
def vte_new_tab_cwd():
|
||||
"""This prints an escape squence that tells VTE terminals the hostname
|
||||
and pwd. This should not be needed in most cases, but sometimes is for
|
||||
certain Linux terminals that do not read the PWD from the environment
|
||||
on startup. Note that this does not return a string, it simply prints
|
||||
and flushes the escape sequence to stdout directly.
|
||||
"""
|
||||
env = builtins.__xonsh_env__
|
||||
t = '\033]7;file://{}{}\007'
|
||||
s = t.format(env.get('HOSTNAME'), env.get('PWD'))
|
||||
print(s, end='', flush=True)
|
149
xonsh/prompt/gitstatus.py
Normal file
149
xonsh/prompt/gitstatus.py
Normal file
|
@ -0,0 +1,149 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""Informative git status prompt formatter"""
|
||||
|
||||
import os
|
||||
import builtins
|
||||
import subprocess
|
||||
|
||||
import xonsh.lazyasd as xl
|
||||
|
||||
|
||||
def _check_output(*args, **kwargs):
|
||||
kwargs.update(dict(env=builtins.__xonsh_env__.detype(),
|
||||
stderr=subprocess.DEVNULL,
|
||||
timeout=builtins.__xonsh_env__['VC_BRANCH_TIMEOUT'],
|
||||
universal_newlines=True
|
||||
))
|
||||
return subprocess.check_output(*args, **kwargs)
|
||||
|
||||
|
||||
@xl.lazyobject
|
||||
def _DEFS():
|
||||
DEFS = {
|
||||
'HASH': ':',
|
||||
'BRANCH': '{CYAN}',
|
||||
'OPERATION': '{CYAN}',
|
||||
'STAGED': '{RED}●',
|
||||
'CONFLICTS': '{RED}×',
|
||||
'CHANGED': '{BLUE}+',
|
||||
'UNTRACKED': '…',
|
||||
'STASHED': '⚑',
|
||||
'CLEAN': '{BOLD_GREEN}✓',
|
||||
'AHEAD': '↑·',
|
||||
'BEHIND': '↓·',
|
||||
}
|
||||
return DEFS
|
||||
|
||||
|
||||
def _get_def(key):
|
||||
return builtins.__xonsh_env__.get('XONSH_GITSTATUS_' + key) or _DEFS[key]
|
||||
|
||||
|
||||
def _get_tag_or_hash():
|
||||
tag = _check_output(['git', 'describe', '--exact-match']).strip()
|
||||
if tag:
|
||||
return tag
|
||||
hash_ = _check_output(['git', 'rev-parse', '--short', 'HEAD']).strip()
|
||||
return _get_def('HASH') + hash_
|
||||
|
||||
|
||||
def _get_stash(gitdir):
|
||||
try:
|
||||
with open(os.path.join(gitdir, 'logs/refs/stash')) as f:
|
||||
return sum(1 for _ in f)
|
||||
except IOError:
|
||||
return 0
|
||||
|
||||
|
||||
def _gitoperation(gitdir):
|
||||
files = (
|
||||
('rebase-merge', 'REBASE'),
|
||||
('rebase-apply', 'AM/REBASE'),
|
||||
('MERGE_HEAD', 'MERGING'),
|
||||
('CHERRY_PICK_HEAD', 'CHERRY-PICKING'),
|
||||
('REVERT_HEAD', 'REVERTING'),
|
||||
('BISECT_LOG', 'BISECTING'),
|
||||
)
|
||||
return [f[1] for f in files
|
||||
if os.path.exists(os.path.join(gitdir, f[0]))]
|
||||
|
||||
|
||||
def gitstatus():
|
||||
"""Return (branch name, number of ahead commit, number of behind commit,
|
||||
untracked number, changed number, conflicts number,
|
||||
staged number, stashed number, operation)"""
|
||||
status = _check_output(['git', 'status', '--porcelain', '--branch'])
|
||||
branch = ''
|
||||
num_ahead, num_behind = 0, 0
|
||||
untracked, changed, conflicts, staged = 0, 0, 0, 0
|
||||
for line in status.splitlines():
|
||||
if line.startswith('##'):
|
||||
line = line[2:].strip()
|
||||
if 'Initial commit on' in line:
|
||||
branch = line.split()[-1]
|
||||
elif 'no branch' in line:
|
||||
branch = _get_tag_or_hash()
|
||||
elif '...' not in line:
|
||||
branch = line
|
||||
else:
|
||||
branch, rest = line.split('...')
|
||||
if ' ' in rest:
|
||||
divergence = rest.split(' ', 1)[-1]
|
||||
divergence = divergence.strip('[]')
|
||||
for div in divergence.split(', '):
|
||||
if 'ahead' in div:
|
||||
num_ahead = int(div[len('ahead '):].strip())
|
||||
elif 'behind' in div:
|
||||
num_behind = int(div[len('behind '):].strip())
|
||||
elif line.startswith('??'):
|
||||
untracked += 1
|
||||
else:
|
||||
if len(line) > 1 and line[1] == 'M':
|
||||
changed += 1
|
||||
|
||||
if len(line) > 0 and line[0] == 'U':
|
||||
conflicts += 1
|
||||
elif len(line) > 0 and line[0] != ' ':
|
||||
staged += 1
|
||||
|
||||
gitdir = _check_output(['git', 'rev-parse', '--git-dir']).strip()
|
||||
stashed = _get_stash(gitdir)
|
||||
operations = _gitoperation(gitdir)
|
||||
|
||||
return (branch, num_ahead, num_behind,
|
||||
untracked, changed, conflicts, staged, stashed,
|
||||
operations)
|
||||
|
||||
|
||||
def gitstatus_prompt():
|
||||
"""Return str `BRANCH|OPERATOR|numbers`"""
|
||||
try:
|
||||
(branch, num_ahead, num_behind,
|
||||
untracked, changed, conflicts, staged, stashed,
|
||||
operations) = gitstatus()
|
||||
except subprocess.SubprocessError:
|
||||
return None
|
||||
|
||||
ret = _get_def('BRANCH') + branch
|
||||
if num_ahead > 0:
|
||||
ret += _get_def('AHEAD') + str(num_ahead)
|
||||
if num_behind > 0:
|
||||
ret += _get_def('BEHIND') + str(num_behind)
|
||||
if operations:
|
||||
ret += _get_def('OPERATION') + '|' + '|'.join(operations)
|
||||
ret += '|'
|
||||
if staged > 0:
|
||||
ret += _get_def('STAGED') + str(staged) + '{NO_COLOR}'
|
||||
if conflicts > 0:
|
||||
ret += _get_def('CONFLICTS') + str(conflicts) + '{NO_COLOR}'
|
||||
if changed > 0:
|
||||
ret += _get_def('CHANGED') + str(changed) + '{NO_COLOR}'
|
||||
if untracked > 0:
|
||||
ret += _get_def('UNTRACKED') + str(untracked) + '{NO_COLOR}'
|
||||
if stashed > 0:
|
||||
ret += _get_def('STASHED') + str(stashed) + '{NO_COLOR}'
|
||||
if staged + conflicts + changed + untracked + stashed == 0:
|
||||
ret += _get_def('CLEAN') + '{NO_COLOR}'
|
||||
ret += '{NO_COLOR}'
|
||||
|
||||
return ret
|
15
xonsh/prompt/job.py
Normal file
15
xonsh/prompt/job.py
Normal file
|
@ -0,0 +1,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""Prompt formatter for current jobs"""
|
||||
|
||||
import xonsh.jobs as xj
|
||||
|
||||
|
||||
def _current_job():
|
||||
j = xj.get_next_task()
|
||||
if j is not None:
|
||||
if not j['bg']:
|
||||
cmd = j['cmds'][-1]
|
||||
s = cmd[0]
|
||||
if s == 'sudo' and len(cmd) > 1:
|
||||
s = cmd[1]
|
||||
return s
|
227
xonsh/prompt/vc_branch.py
Normal file
227
xonsh/prompt/vc_branch.py
Normal file
|
@ -0,0 +1,227 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""Prompt formatter for simple version control branchs"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import builtins
|
||||
import warnings
|
||||
import subprocess
|
||||
|
||||
import xonsh.platform as xp
|
||||
|
||||
|
||||
def get_git_branch():
|
||||
"""Attempts to find the current git branch. If no branch is found, then
|
||||
an empty string is returned. If a timeout occured, the timeout exception
|
||||
(subprocess.TimeoutExpired) is returned.
|
||||
"""
|
||||
branch = None
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
if not xp.ON_WINDOWS:
|
||||
prompt_scripts = ['/usr/lib/git-core/git-sh-prompt',
|
||||
'/usr/local/etc/bash_completion.d/git-prompt.sh']
|
||||
for script in prompt_scripts:
|
||||
# note that this is about 10x faster than bash -i "__git_ps1"
|
||||
inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script)
|
||||
try:
|
||||
branch = subprocess.check_output(['bash'], cwd=cwd, input=inp,
|
||||
stderr=subprocess.PIPE, timeout=vcbt, env=denv,
|
||||
universal_newlines=True)
|
||||
break
|
||||
except subprocess.TimeoutExpired as e:
|
||||
branch = e
|
||||
break
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
continue
|
||||
# fall back to using the git binary if the above failed
|
||||
if branch is None:
|
||||
cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
|
||||
try:
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv,
|
||||
stderr=subprocess.PIPE, universal_newlines=True)
|
||||
if xp.ON_WINDOWS and len(s) == 0:
|
||||
# Workaround for a bug in ConEMU/cmder, retry without redirection
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
|
||||
env=denv, universal_newlines=True)
|
||||
branch = s.strip()
|
||||
except subprocess.TimeoutExpired as e:
|
||||
branch = e
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
branch = None
|
||||
return branch
|
||||
|
||||
|
||||
def _get_parent_dir_for(path, dir_name, timeout):
|
||||
# walk up the directory tree to see if we are inside an hg repo
|
||||
# the timeout makes sure that we don't thrash the file system
|
||||
previous_path = ''
|
||||
t0 = time.time()
|
||||
while path != previous_path and ((time.time() - t0) < timeout):
|
||||
if os.path.isdir(os.path.join(path, dir_name)):
|
||||
return path
|
||||
previous_path = path
|
||||
path, _ = os.path.split(path)
|
||||
return (path == previous_path)
|
||||
|
||||
|
||||
def get_hg_branch(cwd=None, root=None):
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT'])
|
||||
if not isinstance(root, str):
|
||||
# Bail if we are not in a repo or we timed out
|
||||
if root:
|
||||
return None
|
||||
else:
|
||||
return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT'])
|
||||
# get branch name
|
||||
branch_path = os.path.sep.join([root, '.hg', 'branch'])
|
||||
if os.path.exists(branch_path):
|
||||
with open(branch_path, 'r') as branch_file:
|
||||
branch = branch_file.read()
|
||||
else:
|
||||
branch = 'default'
|
||||
# add bookmark, if we can
|
||||
bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current'])
|
||||
if os.path.exists(bookmark_path):
|
||||
with open(bookmark_path, 'r') as bookmark_file:
|
||||
active_bookmark = bookmark_file.read()
|
||||
branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in
|
||||
(branch, active_bookmark)))
|
||||
else:
|
||||
branch = branch.strip(os.linesep)
|
||||
return branch
|
||||
|
||||
|
||||
_FIRST_BRANCH_TIMEOUT = True
|
||||
|
||||
|
||||
def _first_branch_timeout_message():
|
||||
global _FIRST_BRANCH_TIMEOUT
|
||||
sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE']
|
||||
if not _FIRST_BRANCH_TIMEOUT or sbtm:
|
||||
return
|
||||
_FIRST_BRANCH_TIMEOUT = False
|
||||
print('xonsh: branch timeout: computing the branch name, color, or both '
|
||||
'timed out while formatting the prompt. You may avoid this by '
|
||||
'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch '
|
||||
'fields, like {curr_branch}, from your $PROMPT. See the FAQ '
|
||||
'for more details. This message will be suppressed for the remainder '
|
||||
'of this session. To suppress this message permanently, set '
|
||||
'$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.',
|
||||
file=sys.stderr)
|
||||
|
||||
|
||||
def current_branch(pad=NotImplemented):
|
||||
"""Gets the branch for a current working directory. Returns an empty string
|
||||
if the cwd is not a repository. This currently only works for git and hg
|
||||
and should be extended in the future. If a timeout occurred, the string
|
||||
'<branch-timeout>' is returned.
|
||||
"""
|
||||
if pad is not NotImplemented:
|
||||
warnings.warn("The pad argument of current_branch has no effect now "
|
||||
"and will be removed in the future")
|
||||
branch = None
|
||||
cmds = builtins.__xonsh_commands_cache__
|
||||
if cmds.lazy_locate_binary('git') or cmds.is_empty():
|
||||
branch = get_git_branch()
|
||||
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch:
|
||||
branch = get_hg_branch()
|
||||
if isinstance(branch, subprocess.TimeoutExpired):
|
||||
branch = '<branch-timeout>'
|
||||
_first_branch_timeout_message()
|
||||
return branch or None
|
||||
|
||||
|
||||
def git_dirty_working_directory(cwd=None, include_untracked=False):
|
||||
"""Returns whether or not the git directory is dirty. If this could not
|
||||
be determined (timeout, file not sound, etc.) then this returns None.
|
||||
"""
|
||||
cmd = ['git', 'status', '--porcelain']
|
||||
if include_untracked:
|
||||
cmd.append('--untracked-files=normal')
|
||||
else:
|
||||
cmd.append('--untracked-files=no')
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
try:
|
||||
s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd,
|
||||
timeout=vcbt, universal_newlines=True,
|
||||
env=denv)
|
||||
if xp.ON_WINDOWS and len(s) == 0:
|
||||
# Workaround for a bug in ConEMU/cmder, retry without redirection
|
||||
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
|
||||
env=denv, universal_newlines=True)
|
||||
return bool(s)
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
|
||||
FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def hg_dirty_working_directory():
|
||||
"""Computes whether or not the mercurial working directory is dirty or not.
|
||||
If this cannot be deterimined, None is returned.
|
||||
"""
|
||||
env = builtins.__xonsh_env__
|
||||
cwd = env['PWD']
|
||||
denv = env.detype()
|
||||
vcbt = env['VC_BRANCH_TIMEOUT']
|
||||
# Override user configurations settings and aliases
|
||||
denv['HGRCPATH'] = ''
|
||||
try:
|
||||
s = subprocess.check_output(['hg', 'identify', '--id'],
|
||||
stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt,
|
||||
universal_newlines=True, env=denv)
|
||||
return s.strip(os.linesep).endswith('+')
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
|
||||
FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def dirty_working_directory(cwd=None):
|
||||
"""Returns a boolean as to whether there are uncommitted files in version
|
||||
control repository we are inside. If this cannot be determined, returns
|
||||
None. Currently supports git and hg.
|
||||
"""
|
||||
dwd = None
|
||||
cmds = builtins.__xonsh_commands_cache__
|
||||
if cmds.lazy_locate_binary('git') or cmds.is_empty():
|
||||
dwd = git_dirty_working_directory()
|
||||
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None):
|
||||
dwd = hg_dirty_working_directory()
|
||||
return dwd
|
||||
|
||||
|
||||
def branch_color():
|
||||
"""Return red if the current branch is dirty, yellow if the dirtiness can
|
||||
not be determined, and green if it clean. These are bold, intense colors
|
||||
for the foreground.
|
||||
"""
|
||||
dwd = dirty_working_directory()
|
||||
if dwd is None:
|
||||
color = '{BOLD_INTENSE_YELLOW}'
|
||||
elif dwd:
|
||||
color = '{BOLD_INTENSE_RED}'
|
||||
else:
|
||||
color = '{BOLD_INTENSE_GREEN}'
|
||||
return color
|
||||
|
||||
|
||||
def branch_bg_color():
|
||||
"""Return red if the current branch is dirty, yellow if the dirtiness can
|
||||
not be determined, and green if it clean. These are bacground colors.
|
||||
"""
|
||||
dwd = dirty_working_directory()
|
||||
if dwd is None:
|
||||
color = '{BACKGROUND_YELLOW}'
|
||||
elif dwd:
|
||||
color = '{BACKGROUND_RED}'
|
||||
else:
|
||||
color = '{BACKGROUND_GREEN}'
|
||||
return color
|
|
@ -12,8 +12,8 @@ from xonsh.lazyasd import LazyObject
|
|||
from xonsh.platform import HAS_PYGMENTS
|
||||
from xonsh.tools import DefaultNotGiven, print_color, normabspath, to_bool
|
||||
from xonsh.inspectors import find_file, getouterframes
|
||||
from xonsh.environ import _replace_home
|
||||
from xonsh.lazyimps import pygments, pyghooks
|
||||
import xonsh.prompt.cwd as prompt
|
||||
|
||||
|
||||
terminal = LazyObject(lambda: importlib.import_module(
|
||||
|
@ -98,7 +98,8 @@ COLOR_LINE = ('{{PURPLE}}{fname}{{BLUE}}:'
|
|||
|
||||
def tracer_format_line(fname, lineno, line, color=True, lexer=None, formatter=None):
|
||||
"""Formats a trace line suitable for printing."""
|
||||
fname = min(fname, _replace_home(fname), os.path.relpath(fname), key=len)
|
||||
fname = min(fname, prompt._replace_home(fname), os.path.relpath(fname),
|
||||
key=len)
|
||||
if not color:
|
||||
return COLORLESS_LINE.format(fname=fname, lineno=lineno, line=line)
|
||||
cline = COLOR_LINE.format(fname=fname, lineno=lineno)
|
||||
|
|
Loading…
Add table
Reference in a new issue