move prompts from environ.py to prompt/

This commit is contained in:
BlahGeek 2016-09-03 12:55:52 +08:00
parent cbcaace4b3
commit 78bfee18c0
8 changed files with 432 additions and 353 deletions

View file

@ -36,7 +36,9 @@ except ImportError:
TABLES = ['xonsh/lexer_table.py', 'xonsh/parser_table.py',
'xonsh/__amalgam__.py', 'xonsh/completers/__amalgam__.py']
'xonsh/__amalgam__.py',
'xonsh/completers/__amalgam__.py',
'xonsh/prompt/__amalgam__.py']
def clean_tables():
@ -60,7 +62,7 @@ def amalgamate_source():
print('Could not import amalgamate, skipping.', file=sys.stderr)
return
amalgamate.main(['amalgamate', '--debug=XONSH_DEBUG', 'xonsh',
'xonsh.completers'])
'xonsh.completers', 'xonsh.prompt'])
sys.path.pop(0)
@ -290,7 +292,8 @@ def main():
platforms='Cross Platform',
classifiers=['Programming Language :: Python :: 3'],
packages=['xonsh', 'xonsh.ply', 'xonsh.ptk', 'xonsh.parsers',
'xonsh.xoreutils', 'xontrib', 'xonsh.completers'],
'xonsh.xoreutils', 'xontrib',
'xonsh.completers', 'xonsh.prompt'],
package_dir={'xonsh': 'xonsh', 'xontrib': 'xontrib'},
package_data={'xonsh': ['*.json', '*.githash'], 'xontrib': ['*.xsh']},
cmdclass=cmdclass,

View file

@ -3,10 +3,7 @@
import os
import re
import sys
import time
import json
import socket
import shutil
import string
import pprint
import locale
@ -15,27 +12,25 @@ import warnings
import traceback
import itertools
import contextlib
import subprocess
import collections
import collections.abc as cabc
from xonsh import __version__ as XONSH_VERSION
from xonsh.jobs import get_next_task
from xonsh.lazyasd import LazyObject, lazyobject
from xonsh.codecache import run_script_with_cache
from xonsh.dirstack import _get_cwd
from xonsh.foreign_shells import load_foreign_envs
from xonsh.platform import (
BASH_COMPLETIONS_DEFAULT, DEFAULT_ENCODING, PATH_DEFAULT,
ON_WINDOWS, ON_ANACONDA, ON_LINUX, ON_CYGWIN,
ON_WINDOWS, ON_LINUX, ON_CYGWIN,
)
from xonsh.tools import (
is_superuser, always_true, always_false, ensure_string, is_env_path,
always_true, always_false, ensure_string, is_env_path,
str_to_env_path, env_path_to_str, is_bool, to_bool, bool_to_str,
is_history_tuple, to_history_tuple, history_tuple_to_str, is_float,
is_string, is_string_or_callable,
is_completions_display_value, to_completions_display_value,
is_string_set, csv_to_set, set_to_csv, get_sep, is_int, is_bool_seq,
is_string_set, csv_to_set, set_to_csv, is_int, is_bool_seq,
to_bool_or_int, bool_or_int_to_str,
csv_to_bool_seq, bool_seq_to_csv, DefaultNotGiven, print_exception,
setup_win_unicode_console, intensify_colors_on_win_setter, format_color,
@ -45,6 +40,8 @@ from xonsh.tools import (
seq_to_upper_pathsep,
)
from xonsh.prompt.base import FORMATTER_DICT
@lazyobject
def LOCALE_CATS():
@ -903,348 +900,6 @@ def locate_binary(name):
return builtins.__xonsh_commands_cache__.locate_binary(name)
def get_git_branch():
"""Attempts to find the current git branch. If no branch is found, then
an empty string is returned. If a timeout occured, the timeout exception
(subprocess.TimeoutExpired) is returned.
"""
branch = None
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
if not ON_WINDOWS:
prompt_scripts = ['/usr/lib/git-core/git-sh-prompt',
'/usr/local/etc/bash_completion.d/git-prompt.sh']
for script in prompt_scripts:
# note that this is about 10x faster than bash -i "__git_ps1"
inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script)
try:
branch = subprocess.check_output(['bash'], cwd=cwd, input=inp,
stderr=subprocess.PIPE, timeout=vcbt, env=denv,
universal_newlines=True)
break
except subprocess.TimeoutExpired as e:
branch = e
break
except (subprocess.CalledProcessError, FileNotFoundError):
continue
# fall back to using the git binary if the above failed
if branch is None:
cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
try:
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv,
stderr=subprocess.PIPE, universal_newlines=True)
if ON_WINDOWS and len(s) == 0:
# Workaround for a bug in ConEMU/cmder, retry without redirection
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
env=denv, universal_newlines=True)
branch = s.strip()
except subprocess.TimeoutExpired as e:
branch = e
except (subprocess.CalledProcessError, FileNotFoundError):
branch = None
return branch
def _get_parent_dir_for(path, dir_name, timeout):
# walk up the directory tree to see if we are inside an hg repo
# the timeout makes sure that we don't thrash the file system
previous_path = ''
t0 = time.time()
while path != previous_path and ((time.time() - t0) < timeout):
if os.path.isdir(os.path.join(path, dir_name)):
return path
previous_path = path
path, _ = os.path.split(path)
return (path == previous_path)
def get_hg_branch(cwd=None, root=None):
env = builtins.__xonsh_env__
cwd = env['PWD']
root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT'])
if not isinstance(root, str):
# Bail if we are not in a repo or we timed out
if root:
return None
else:
return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT'])
# get branch name
branch_path = os.path.sep.join([root, '.hg', 'branch'])
if os.path.exists(branch_path):
with open(branch_path, 'r') as branch_file:
branch = branch_file.read()
else:
branch = 'default'
# add bookmark, if we can
bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current'])
if os.path.exists(bookmark_path):
with open(bookmark_path, 'r') as bookmark_file:
active_bookmark = bookmark_file.read()
branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in
(branch, active_bookmark)))
else:
branch = branch.strip(os.linesep)
return branch
_FIRST_BRANCH_TIMEOUT = True
def _first_branch_timeout_message():
global _FIRST_BRANCH_TIMEOUT
sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE']
if not _FIRST_BRANCH_TIMEOUT or sbtm:
return
_FIRST_BRANCH_TIMEOUT = False
print('xonsh: branch timeout: computing the branch name, color, or both '
'timed out while formatting the prompt. You may avoid this by '
'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch '
'fields, like {curr_branch}, from your $PROMPT. See the FAQ '
'for more details. This message will be suppressed for the remainder '
'of this session. To suppress this message permanently, set '
'$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.',
file=sys.stderr)
def current_branch(pad=NotImplemented):
"""Gets the branch for a current working directory. Returns an empty string
if the cwd is not a repository. This currently only works for git and hg
and should be extended in the future. If a timeout occurred, the string
'<branch-timeout>' is returned.
"""
if pad is not NotImplemented:
warnings.warn("The pad argument of current_branch has no effect now "
"and will be removed in the future")
branch = None
cmds = builtins.__xonsh_commands_cache__
if cmds.lazy_locate_binary('git') or cmds.is_empty():
branch = get_git_branch()
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch:
branch = get_hg_branch()
if isinstance(branch, subprocess.TimeoutExpired):
branch = '<branch-timeout>'
_first_branch_timeout_message()
return branch or None
def git_dirty_working_directory(cwd=None, include_untracked=False):
"""Returns whether or not the git directory is dirty. If this could not
be determined (timeout, file not sound, etc.) then this returns None.
"""
cmd = ['git', 'status', '--porcelain']
if include_untracked:
cmd.append('--untracked-files=normal')
else:
cmd.append('--untracked-files=no')
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
try:
s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd,
timeout=vcbt, universal_newlines=True,
env=denv)
if ON_WINDOWS and len(s) == 0:
# Workaround for a bug in ConEMU/cmder, retry without redirection
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
env=denv, universal_newlines=True)
return bool(s)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
FileNotFoundError):
return None
def hg_dirty_working_directory():
"""Computes whether or not the mercurial working directory is dirty or not.
If this cannot be deterimined, None is returned.
"""
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
# Override user configurations settings and aliases
denv['HGRCPATH'] = ''
try:
s = subprocess.check_output(['hg', 'identify', '--id'],
stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt,
universal_newlines=True, env=denv)
return s.strip(os.linesep).endswith('+')
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
FileNotFoundError):
return None
def dirty_working_directory(cwd=None):
"""Returns a boolean as to whether there are uncommitted files in version
control repository we are inside. If this cannot be determined, returns
None. Currently supports git and hg.
"""
dwd = None
cmds = builtins.__xonsh_commands_cache__
if cmds.lazy_locate_binary('git') or cmds.is_empty():
dwd = git_dirty_working_directory()
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None):
dwd = hg_dirty_working_directory()
return dwd
def branch_color():
"""Return red if the current branch is dirty, yellow if the dirtiness can
not be determined, and green if it clean. These are bold, intense colors
for the foreground.
"""
dwd = dirty_working_directory()
if dwd is None:
color = '{BOLD_INTENSE_YELLOW}'
elif dwd:
color = '{BOLD_INTENSE_RED}'
else:
color = '{BOLD_INTENSE_GREEN}'
return color
def branch_bg_color():
"""Return red if the current branch is dirty, yellow if the dirtiness can
not be determined, and green if it clean. These are bacground colors.
"""
dwd = dirty_working_directory()
if dwd is None:
color = '{BACKGROUND_YELLOW}'
elif dwd:
color = '{BACKGROUND_RED}'
else:
color = '{BACKGROUND_GREEN}'
return color
def _replace_home(x):
if ON_WINDOWS:
home = (builtins.__xonsh_env__['HOMEDRIVE'] +
builtins.__xonsh_env__['HOMEPATH'][0])
if x.startswith(home):
x = x.replace(home, '~', 1)
if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'):
x = x.replace(os.sep, os.altsep)
return x
else:
home = builtins.__xonsh_env__['HOME']
if x.startswith(home):
x = x.replace(home, '~', 1)
return x
def _replace_home_cwd():
return _replace_home(builtins.__xonsh_env__['PWD'])
def _collapsed_pwd():
sep = get_sep()
pwd = _replace_home_cwd().split(sep)
l = len(pwd)
leader = sep if l > 0 and len(pwd[0]) == 0 else ''
base = [i[0] if ix != l - 1 else i for ix, i in enumerate(pwd) if len(i) > 0]
return leader + sep.join(base)
def _dynamically_collapsed_pwd():
"""Return the compact current working directory. It respects the
environment variable DYNAMIC_CWD_WIDTH.
"""
originial_path = _replace_home_cwd()
target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH']
if target_width == float('inf'):
return originial_path
if (units == '%'):
cols, _ = shutil.get_terminal_size()
target_width = (cols * target_width) // 100
sep = get_sep()
pwd = originial_path.split(sep)
last = pwd.pop()
remaining_space = target_width - len(last)
# Reserve space for separators
remaining_space_for_text = remaining_space - len(pwd)
parts = []
for i in range(len(pwd)):
part = pwd[i]
part_len = int(min(len(part), max(1, remaining_space_for_text // (len(pwd) - i))))
remaining_space_for_text -= part_len
reduced_part = part[0:part_len]
parts.append(reduced_part)
parts.append(last)
full = sep.join(parts)
# If even if displaying one letter per dir we are too long
if (len(full) > target_width):
# We truncate the left most part
full = "..." + full[int(-target_width) + 3:]
# if there is not even a single separator we still
# want to display at least the beginning of the directory
if full.find(sep) == -1:
full = ("..." + sep + last)[0:int(target_width)]
return full
def _current_job():
j = get_next_task()
if j is not None:
if not j['bg']:
cmd = j['cmds'][-1]
s = cmd[0]
if s == 'sudo' and len(cmd) > 1:
s = cmd[1]
return s
def env_name(pre_chars='(', post_chars=')'):
"""Extract the current environment name from $VIRTUAL_ENV or
$CONDA_DEFAULT_ENV if that is set
"""
env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '')
if len(env_path) == 0 and ON_ANACONDA:
env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '')
env_name = os.path.basename(env_path)
if env_name:
return pre_chars + env_name + post_chars
if ON_WINDOWS:
USER = 'USERNAME'
else:
USER = 'USER'
def vte_new_tab_cwd():
"""This prints an escape squence that tells VTE terminals the hostname
and pwd. This should not be needed in most cases, but sometimes is for
certain Linux terminals that do not read the PWD from the environment
on startup. Note that this does not return a string, it simply prints
and flushes the escape sequence to stdout directly.
"""
env = builtins.__xonsh_env__
t = '\033]7;file://{}{}\007'
s = t.format(env.get('HOSTNAME'), env.get('PWD'))
print(s, end='', flush=True)
FORMATTER_DICT = LazyObject(lambda: dict(
user=os.environ.get(USER, '<user>'),
prompt_end='#' if is_superuser() else '$',
hostname=socket.gethostname().split('.', 1)[0],
cwd=_dynamically_collapsed_pwd,
cwd_dir=lambda: os.path.dirname(_replace_home_cwd()),
cwd_base=lambda: os.path.basename(_replace_home_cwd()),
short_cwd=_collapsed_pwd,
curr_branch=current_branch,
branch_color=branch_color,
branch_bg_color=branch_bg_color,
current_job=_current_job,
env_name=env_name,
vte_new_tab_cwd=vte_new_tab_cwd,
), globals(), 'FORMATTER_DICT')
_FORMATTER = LazyObject(string.Formatter, globals(), '_FORMATTER')

24
xonsh/prompt/__init__.py Normal file
View file

@ -0,0 +1,24 @@
# amalgamate exclude
import os as _os
if _os.getenv('XONSH_DEBUG', ''):
pass
else:
import sys as _sys
try:
from xonsh.prompt import __amalgam__
cwd = __amalgam__
_sys.modules['xonsh.prompt.cwd'] = __amalgam__
env = __amalgam__
_sys.modules['xonsh.prompt.env'] = __amalgam__
job = __amalgam__
_sys.modules['xonsh.prompt.job'] = __amalgam__
vc_branch = __amalgam__
_sys.modules['xonsh.prompt.vc_branch'] = __amalgam__
base = __amalgam__
_sys.modules['xonsh.prompt.base'] = __amalgam__
del __amalgam__
except ImportError:
pass
del _sys
del _os
# amalgamate end

42
xonsh/prompt/base.py Normal file
View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import socket
from xonsh.lazyasd import LazyObject
from xonsh.tools import is_superuser
from xonsh.platform import ON_WINDOWS
from xonsh.prompt.cwd import (
_collapsed_pwd, _replace_home_cwd, _dynamically_collapsed_pwd
)
from xonsh.prompt.job import _current_job
from xonsh.prompt.env import (env_name, vte_new_tab_cwd)
from xonsh.prompt.vc_branch import (
current_branch, branch_color, branch_bg_color
)
if ON_WINDOWS:
USER = 'USERNAME'
else:
USER = 'USER'
FORMATTER_DICT = LazyObject(lambda: dict(
user=os.environ.get(USER, '<user>'),
prompt_end='#' if is_superuser() else '$',
hostname=socket.gethostname().split('.', 1)[0],
cwd=_dynamically_collapsed_pwd,
cwd_dir=lambda: os.path.dirname(_replace_home_cwd()),
cwd_base=lambda: os.path.basename(_replace_home_cwd()),
short_cwd=_collapsed_pwd,
curr_branch=current_branch,
branch_color=branch_color,
branch_bg_color=branch_bg_color,
current_job=_current_job,
env_name=env_name,
vte_new_tab_cwd=vte_new_tab_cwd,
), globals(), 'FORMATTER_DICT')

80
xonsh/prompt/cwd.py Normal file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Created by i@BlahGeek.com at 2016-09-03
import os
import shutil
import builtins
from xonsh.tools import get_sep
from xonsh.platform import ON_WINDOWS
def _replace_home(x):
if ON_WINDOWS:
home = (builtins.__xonsh_env__['HOMEDRIVE'] +
builtins.__xonsh_env__['HOMEPATH'][0])
if x.startswith(home):
x = x.replace(home, '~', 1)
if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'):
x = x.replace(os.sep, os.altsep)
return x
else:
home = builtins.__xonsh_env__['HOME']
if x.startswith(home):
x = x.replace(home, '~', 1)
return x
def _replace_home_cwd():
return _replace_home(builtins.__xonsh_env__['PWD'])
def _collapsed_pwd():
sep = get_sep()
pwd = _replace_home_cwd().split(sep)
l = len(pwd)
leader = sep if l > 0 and len(pwd[0]) == 0 else ''
base = [i[0] if ix != l - 1 else i
for ix, i in enumerate(pwd) if len(i) > 0]
return leader + sep.join(base)
def _dynamically_collapsed_pwd():
"""Return the compact current working directory. It respects the
environment variable DYNAMIC_CWD_WIDTH.
"""
originial_path = _replace_home_cwd()
target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH']
if target_width == float('inf'):
return originial_path
if (units == '%'):
cols, _ = shutil.get_terminal_size()
target_width = (cols * target_width) // 100
sep = get_sep()
pwd = originial_path.split(sep)
last = pwd.pop()
remaining_space = target_width - len(last)
# Reserve space for separators
remaining_space_for_text = remaining_space - len(pwd)
parts = []
for i in range(len(pwd)):
part = pwd[i]
part_len = int(min(len(part),
max(1, remaining_space_for_text // (len(pwd) - i))))
remaining_space_for_text -= part_len
reduced_part = part[0:part_len]
parts.append(reduced_part)
parts.append(last)
full = sep.join(parts)
# If even if displaying one letter per dir we are too long
if (len(full) > target_width):
# We truncate the left most part
full = "..." + full[int(-target_width) + 3:]
# if there is not even a single separator we still
# want to display at least the beginning of the directory
if full.find(sep) == -1:
full = ("..." + sep + last)[0:int(target_width)]
return full

32
xonsh/prompt/env.py Normal file
View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import builtins
from xonsh.platform import ON_ANACONDA
def env_name(pre_chars='(', post_chars=')'):
"""Extract the current environment name from $VIRTUAL_ENV or
$CONDA_DEFAULT_ENV if that is set
"""
env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '')
if len(env_path) == 0 and ON_ANACONDA:
env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '')
env_name = os.path.basename(env_path)
if env_name:
return pre_chars + env_name + post_chars
def vte_new_tab_cwd():
"""This prints an escape squence that tells VTE terminals the hostname
and pwd. This should not be needed in most cases, but sometimes is for
certain Linux terminals that do not read the PWD from the environment
on startup. Note that this does not return a string, it simply prints
and flushes the escape sequence to stdout directly.
"""
env = builtins.__xonsh_env__
t = '\033]7;file://{}{}\007'
s = t.format(env.get('HOSTNAME'), env.get('PWD'))
print(s, end='', flush=True)

15
xonsh/prompt/job.py Normal file
View file

@ -0,0 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from xonsh.jobs import get_next_task
def _current_job():
j = get_next_task()
if j is not None:
if not j['bg']:
cmd = j['cmds'][-1]
s = cmd[0]
if s == 'sudo' and len(cmd) > 1:
s = cmd[1]
return s

228
xonsh/prompt/vc_branch.py Normal file
View file

@ -0,0 +1,228 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Created by i@BlahGeek.com at 2016-09-03
import os
import sys
import time
import builtins
import warnings
import subprocess
from xonsh.platform import ON_WINDOWS
def get_git_branch():
"""Attempts to find the current git branch. If no branch is found, then
an empty string is returned. If a timeout occured, the timeout exception
(subprocess.TimeoutExpired) is returned.
"""
branch = None
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
if not ON_WINDOWS:
prompt_scripts = ['/usr/lib/git-core/git-sh-prompt',
'/usr/local/etc/bash_completion.d/git-prompt.sh']
for script in prompt_scripts:
# note that this is about 10x faster than bash -i "__git_ps1"
inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script)
try:
branch = subprocess.check_output(['bash'], cwd=cwd, input=inp,
stderr=subprocess.PIPE, timeout=vcbt, env=denv,
universal_newlines=True)
break
except subprocess.TimeoutExpired as e:
branch = e
break
except (subprocess.CalledProcessError, FileNotFoundError):
continue
# fall back to using the git binary if the above failed
if branch is None:
cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']
try:
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv,
stderr=subprocess.PIPE, universal_newlines=True)
if ON_WINDOWS and len(s) == 0:
# Workaround for a bug in ConEMU/cmder, retry without redirection
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
env=denv, universal_newlines=True)
branch = s.strip()
except subprocess.TimeoutExpired as e:
branch = e
except (subprocess.CalledProcessError, FileNotFoundError):
branch = None
return branch
def _get_parent_dir_for(path, dir_name, timeout):
# walk up the directory tree to see if we are inside an hg repo
# the timeout makes sure that we don't thrash the file system
previous_path = ''
t0 = time.time()
while path != previous_path and ((time.time() - t0) < timeout):
if os.path.isdir(os.path.join(path, dir_name)):
return path
previous_path = path
path, _ = os.path.split(path)
return (path == previous_path)
def get_hg_branch(cwd=None, root=None):
env = builtins.__xonsh_env__
cwd = env['PWD']
root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT'])
if not isinstance(root, str):
# Bail if we are not in a repo or we timed out
if root:
return None
else:
return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT'])
# get branch name
branch_path = os.path.sep.join([root, '.hg', 'branch'])
if os.path.exists(branch_path):
with open(branch_path, 'r') as branch_file:
branch = branch_file.read()
else:
branch = 'default'
# add bookmark, if we can
bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current'])
if os.path.exists(bookmark_path):
with open(bookmark_path, 'r') as bookmark_file:
active_bookmark = bookmark_file.read()
branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in
(branch, active_bookmark)))
else:
branch = branch.strip(os.linesep)
return branch
_FIRST_BRANCH_TIMEOUT = True
def _first_branch_timeout_message():
global _FIRST_BRANCH_TIMEOUT
sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE']
if not _FIRST_BRANCH_TIMEOUT or sbtm:
return
_FIRST_BRANCH_TIMEOUT = False
print('xonsh: branch timeout: computing the branch name, color, or both '
'timed out while formatting the prompt. You may avoid this by '
'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch '
'fields, like {curr_branch}, from your $PROMPT. See the FAQ '
'for more details. This message will be suppressed for the remainder '
'of this session. To suppress this message permanently, set '
'$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.',
file=sys.stderr)
def current_branch(pad=NotImplemented):
"""Gets the branch for a current working directory. Returns an empty string
if the cwd is not a repository. This currently only works for git and hg
and should be extended in the future. If a timeout occurred, the string
'<branch-timeout>' is returned.
"""
if pad is not NotImplemented:
warnings.warn("The pad argument of current_branch has no effect now "
"and will be removed in the future")
branch = None
cmds = builtins.__xonsh_commands_cache__
if cmds.lazy_locate_binary('git') or cmds.is_empty():
branch = get_git_branch()
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch:
branch = get_hg_branch()
if isinstance(branch, subprocess.TimeoutExpired):
branch = '<branch-timeout>'
_first_branch_timeout_message()
return branch or None
def git_dirty_working_directory(cwd=None, include_untracked=False):
"""Returns whether or not the git directory is dirty. If this could not
be determined (timeout, file not sound, etc.) then this returns None.
"""
cmd = ['git', 'status', '--porcelain']
if include_untracked:
cmd.append('--untracked-files=normal')
else:
cmd.append('--untracked-files=no')
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
try:
s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd,
timeout=vcbt, universal_newlines=True,
env=denv)
if ON_WINDOWS and len(s) == 0:
# Workaround for a bug in ConEMU/cmder, retry without redirection
s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt,
env=denv, universal_newlines=True)
return bool(s)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
FileNotFoundError):
return None
def hg_dirty_working_directory():
"""Computes whether or not the mercurial working directory is dirty or not.
If this cannot be deterimined, None is returned.
"""
env = builtins.__xonsh_env__
cwd = env['PWD']
denv = env.detype()
vcbt = env['VC_BRANCH_TIMEOUT']
# Override user configurations settings and aliases
denv['HGRCPATH'] = ''
try:
s = subprocess.check_output(['hg', 'identify', '--id'],
stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt,
universal_newlines=True, env=denv)
return s.strip(os.linesep).endswith('+')
except (subprocess.CalledProcessError, subprocess.TimeoutExpired,
FileNotFoundError):
return None
def dirty_working_directory(cwd=None):
"""Returns a boolean as to whether there are uncommitted files in version
control repository we are inside. If this cannot be determined, returns
None. Currently supports git and hg.
"""
dwd = None
cmds = builtins.__xonsh_commands_cache__
if cmds.lazy_locate_binary('git') or cmds.is_empty():
dwd = git_dirty_working_directory()
if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None):
dwd = hg_dirty_working_directory()
return dwd
def branch_color():
"""Return red if the current branch is dirty, yellow if the dirtiness can
not be determined, and green if it clean. These are bold, intense colors
for the foreground.
"""
dwd = dirty_working_directory()
if dwd is None:
color = '{BOLD_INTENSE_YELLOW}'
elif dwd:
color = '{BOLD_INTENSE_RED}'
else:
color = '{BOLD_INTENSE_GREEN}'
return color
def branch_bg_color():
"""Return red if the current branch is dirty, yellow if the dirtiness can
not be determined, and green if it clean. These are bacground colors.
"""
dwd = dirty_working_directory()
if dwd is None:
color = '{BACKGROUND_YELLOW}'
elif dwd:
color = '{BACKGROUND_RED}'
else:
color = '{BACKGROUND_GREEN}'
return color