From 78bfee18c016275c8c76be2f9289558ff4ddaaca Mon Sep 17 00:00:00 2001 From: BlahGeek Date: Sat, 3 Sep 2016 12:55:52 +0800 Subject: [PATCH] move prompts from environ.py to prompt/ --- setup.py | 9 +- xonsh/environ.py | 355 +------------------------------------- xonsh/prompt/__init__.py | 24 +++ xonsh/prompt/base.py | 42 +++++ xonsh/prompt/cwd.py | 80 +++++++++ xonsh/prompt/env.py | 32 ++++ xonsh/prompt/job.py | 15 ++ xonsh/prompt/vc_branch.py | 228 ++++++++++++++++++++++++ 8 files changed, 432 insertions(+), 353 deletions(-) create mode 100644 xonsh/prompt/__init__.py create mode 100644 xonsh/prompt/base.py create mode 100644 xonsh/prompt/cwd.py create mode 100644 xonsh/prompt/env.py create mode 100644 xonsh/prompt/job.py create mode 100644 xonsh/prompt/vc_branch.py diff --git a/setup.py b/setup.py index 862fbd01a..f1f14952d 100755 --- a/setup.py +++ b/setup.py @@ -36,7 +36,9 @@ except ImportError: TABLES = ['xonsh/lexer_table.py', 'xonsh/parser_table.py', - 'xonsh/__amalgam__.py', 'xonsh/completers/__amalgam__.py'] + 'xonsh/__amalgam__.py', + 'xonsh/completers/__amalgam__.py', + 'xonsh/prompt/__amalgam__.py'] def clean_tables(): @@ -60,7 +62,7 @@ def amalgamate_source(): print('Could not import amalgamate, skipping.', file=sys.stderr) return amalgamate.main(['amalgamate', '--debug=XONSH_DEBUG', 'xonsh', - 'xonsh.completers']) + 'xonsh.completers', 'xonsh.prompt']) sys.path.pop(0) @@ -290,7 +292,8 @@ def main(): platforms='Cross Platform', classifiers=['Programming Language :: Python :: 3'], packages=['xonsh', 'xonsh.ply', 'xonsh.ptk', 'xonsh.parsers', - 'xonsh.xoreutils', 'xontrib', 'xonsh.completers'], + 'xonsh.xoreutils', 'xontrib', + 'xonsh.completers', 'xonsh.prompt'], package_dir={'xonsh': 'xonsh', 'xontrib': 'xontrib'}, package_data={'xonsh': ['*.json', '*.githash'], 'xontrib': ['*.xsh']}, cmdclass=cmdclass, diff --git a/xonsh/environ.py b/xonsh/environ.py index 4064bb698..de3b21475 100644 --- a/xonsh/environ.py +++ b/xonsh/environ.py @@ -3,10 +3,7 @@ import os import re import sys -import time import json -import socket -import shutil import string import pprint import locale @@ -15,27 +12,25 @@ import warnings import traceback import itertools import contextlib -import subprocess import collections import collections.abc as cabc from xonsh import __version__ as XONSH_VERSION -from xonsh.jobs import get_next_task from xonsh.lazyasd import LazyObject, lazyobject from xonsh.codecache import run_script_with_cache from xonsh.dirstack import _get_cwd from xonsh.foreign_shells import load_foreign_envs from xonsh.platform import ( BASH_COMPLETIONS_DEFAULT, DEFAULT_ENCODING, PATH_DEFAULT, - ON_WINDOWS, ON_ANACONDA, ON_LINUX, ON_CYGWIN, + ON_WINDOWS, ON_LINUX, ON_CYGWIN, ) from xonsh.tools import ( - is_superuser, always_true, always_false, ensure_string, is_env_path, + always_true, always_false, ensure_string, is_env_path, str_to_env_path, env_path_to_str, is_bool, to_bool, bool_to_str, is_history_tuple, to_history_tuple, history_tuple_to_str, is_float, is_string, is_string_or_callable, is_completions_display_value, to_completions_display_value, - is_string_set, csv_to_set, set_to_csv, get_sep, is_int, is_bool_seq, + is_string_set, csv_to_set, set_to_csv, is_int, is_bool_seq, to_bool_or_int, bool_or_int_to_str, csv_to_bool_seq, bool_seq_to_csv, DefaultNotGiven, print_exception, setup_win_unicode_console, intensify_colors_on_win_setter, format_color, @@ -45,6 +40,8 @@ from xonsh.tools import ( seq_to_upper_pathsep, ) +from xonsh.prompt.base import FORMATTER_DICT + @lazyobject def LOCALE_CATS(): @@ -903,348 +900,6 @@ def locate_binary(name): return builtins.__xonsh_commands_cache__.locate_binary(name) -def get_git_branch(): - """Attempts to find the current git branch. If no branch is found, then - an empty string is returned. If a timeout occured, the timeout exception - (subprocess.TimeoutExpired) is returned. - """ - branch = None - env = builtins.__xonsh_env__ - cwd = env['PWD'] - denv = env.detype() - vcbt = env['VC_BRANCH_TIMEOUT'] - if not ON_WINDOWS: - prompt_scripts = ['/usr/lib/git-core/git-sh-prompt', - '/usr/local/etc/bash_completion.d/git-prompt.sh'] - for script in prompt_scripts: - # note that this is about 10x faster than bash -i "__git_ps1" - inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script) - try: - branch = subprocess.check_output(['bash'], cwd=cwd, input=inp, - stderr=subprocess.PIPE, timeout=vcbt, env=denv, - universal_newlines=True) - break - except subprocess.TimeoutExpired as e: - branch = e - break - except (subprocess.CalledProcessError, FileNotFoundError): - continue - # fall back to using the git binary if the above failed - if branch is None: - cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD'] - try: - s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv, - stderr=subprocess.PIPE, universal_newlines=True) - if ON_WINDOWS and len(s) == 0: - # Workaround for a bug in ConEMU/cmder, retry without redirection - s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, - env=denv, universal_newlines=True) - branch = s.strip() - except subprocess.TimeoutExpired as e: - branch = e - except (subprocess.CalledProcessError, FileNotFoundError): - branch = None - return branch - - -def _get_parent_dir_for(path, dir_name, timeout): - # walk up the directory tree to see if we are inside an hg repo - # the timeout makes sure that we don't thrash the file system - previous_path = '' - t0 = time.time() - while path != previous_path and ((time.time() - t0) < timeout): - if os.path.isdir(os.path.join(path, dir_name)): - return path - previous_path = path - path, _ = os.path.split(path) - return (path == previous_path) - - -def get_hg_branch(cwd=None, root=None): - env = builtins.__xonsh_env__ - cwd = env['PWD'] - root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT']) - if not isinstance(root, str): - # Bail if we are not in a repo or we timed out - if root: - return None - else: - return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT']) - # get branch name - branch_path = os.path.sep.join([root, '.hg', 'branch']) - if os.path.exists(branch_path): - with open(branch_path, 'r') as branch_file: - branch = branch_file.read() - else: - branch = 'default' - # add bookmark, if we can - bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current']) - if os.path.exists(bookmark_path): - with open(bookmark_path, 'r') as bookmark_file: - active_bookmark = bookmark_file.read() - branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in - (branch, active_bookmark))) - else: - branch = branch.strip(os.linesep) - return branch - - -_FIRST_BRANCH_TIMEOUT = True - - -def _first_branch_timeout_message(): - global _FIRST_BRANCH_TIMEOUT - sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE'] - if not _FIRST_BRANCH_TIMEOUT or sbtm: - return - _FIRST_BRANCH_TIMEOUT = False - print('xonsh: branch timeout: computing the branch name, color, or both ' - 'timed out while formatting the prompt. You may avoid this by ' - 'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch ' - 'fields, like {curr_branch}, from your $PROMPT. See the FAQ ' - 'for more details. This message will be suppressed for the remainder ' - 'of this session. To suppress this message permanently, set ' - '$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.', - file=sys.stderr) - - -def current_branch(pad=NotImplemented): - """Gets the branch for a current working directory. Returns an empty string - if the cwd is not a repository. This currently only works for git and hg - and should be extended in the future. If a timeout occurred, the string - '' is returned. - """ - if pad is not NotImplemented: - warnings.warn("The pad argument of current_branch has no effect now " - "and will be removed in the future") - branch = None - cmds = builtins.__xonsh_commands_cache__ - if cmds.lazy_locate_binary('git') or cmds.is_empty(): - branch = get_git_branch() - if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch: - branch = get_hg_branch() - if isinstance(branch, subprocess.TimeoutExpired): - branch = '' - _first_branch_timeout_message() - return branch or None - - -def git_dirty_working_directory(cwd=None, include_untracked=False): - """Returns whether or not the git directory is dirty. If this could not - be determined (timeout, file not sound, etc.) then this returns None. - """ - cmd = ['git', 'status', '--porcelain'] - if include_untracked: - cmd.append('--untracked-files=normal') - else: - cmd.append('--untracked-files=no') - env = builtins.__xonsh_env__ - cwd = env['PWD'] - denv = env.detype() - vcbt = env['VC_BRANCH_TIMEOUT'] - try: - s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd, - timeout=vcbt, universal_newlines=True, - env=denv) - if ON_WINDOWS and len(s) == 0: - # Workaround for a bug in ConEMU/cmder, retry without redirection - s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, - env=denv, universal_newlines=True) - return bool(s) - except (subprocess.CalledProcessError, subprocess.TimeoutExpired, - FileNotFoundError): - return None - - -def hg_dirty_working_directory(): - """Computes whether or not the mercurial working directory is dirty or not. - If this cannot be deterimined, None is returned. - """ - env = builtins.__xonsh_env__ - cwd = env['PWD'] - denv = env.detype() - vcbt = env['VC_BRANCH_TIMEOUT'] - # Override user configurations settings and aliases - denv['HGRCPATH'] = '' - try: - s = subprocess.check_output(['hg', 'identify', '--id'], - stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt, - universal_newlines=True, env=denv) - return s.strip(os.linesep).endswith('+') - except (subprocess.CalledProcessError, subprocess.TimeoutExpired, - FileNotFoundError): - return None - - -def dirty_working_directory(cwd=None): - """Returns a boolean as to whether there are uncommitted files in version - control repository we are inside. If this cannot be determined, returns - None. Currently supports git and hg. - """ - dwd = None - cmds = builtins.__xonsh_commands_cache__ - if cmds.lazy_locate_binary('git') or cmds.is_empty(): - dwd = git_dirty_working_directory() - if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None): - dwd = hg_dirty_working_directory() - return dwd - - -def branch_color(): - """Return red if the current branch is dirty, yellow if the dirtiness can - not be determined, and green if it clean. These are bold, intense colors - for the foreground. - """ - dwd = dirty_working_directory() - if dwd is None: - color = '{BOLD_INTENSE_YELLOW}' - elif dwd: - color = '{BOLD_INTENSE_RED}' - else: - color = '{BOLD_INTENSE_GREEN}' - return color - - -def branch_bg_color(): - """Return red if the current branch is dirty, yellow if the dirtiness can - not be determined, and green if it clean. These are bacground colors. - """ - dwd = dirty_working_directory() - if dwd is None: - color = '{BACKGROUND_YELLOW}' - elif dwd: - color = '{BACKGROUND_RED}' - else: - color = '{BACKGROUND_GREEN}' - return color - - -def _replace_home(x): - if ON_WINDOWS: - home = (builtins.__xonsh_env__['HOMEDRIVE'] + - builtins.__xonsh_env__['HOMEPATH'][0]) - if x.startswith(home): - x = x.replace(home, '~', 1) - - if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'): - x = x.replace(os.sep, os.altsep) - - return x - else: - home = builtins.__xonsh_env__['HOME'] - if x.startswith(home): - x = x.replace(home, '~', 1) - return x - - -def _replace_home_cwd(): - return _replace_home(builtins.__xonsh_env__['PWD']) - - -def _collapsed_pwd(): - sep = get_sep() - pwd = _replace_home_cwd().split(sep) - l = len(pwd) - leader = sep if l > 0 and len(pwd[0]) == 0 else '' - base = [i[0] if ix != l - 1 else i for ix, i in enumerate(pwd) if len(i) > 0] - return leader + sep.join(base) - - -def _dynamically_collapsed_pwd(): - """Return the compact current working directory. It respects the - environment variable DYNAMIC_CWD_WIDTH. - """ - originial_path = _replace_home_cwd() - target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH'] - if target_width == float('inf'): - return originial_path - if (units == '%'): - cols, _ = shutil.get_terminal_size() - target_width = (cols * target_width) // 100 - sep = get_sep() - pwd = originial_path.split(sep) - last = pwd.pop() - remaining_space = target_width - len(last) - # Reserve space for separators - remaining_space_for_text = remaining_space - len(pwd) - parts = [] - for i in range(len(pwd)): - part = pwd[i] - part_len = int(min(len(part), max(1, remaining_space_for_text // (len(pwd) - i)))) - remaining_space_for_text -= part_len - reduced_part = part[0:part_len] - parts.append(reduced_part) - parts.append(last) - full = sep.join(parts) - # If even if displaying one letter per dir we are too long - if (len(full) > target_width): - # We truncate the left most part - full = "..." + full[int(-target_width) + 3:] - # if there is not even a single separator we still - # want to display at least the beginning of the directory - if full.find(sep) == -1: - full = ("..." + sep + last)[0:int(target_width)] - return full - - -def _current_job(): - j = get_next_task() - if j is not None: - if not j['bg']: - cmd = j['cmds'][-1] - s = cmd[0] - if s == 'sudo' and len(cmd) > 1: - s = cmd[1] - return s - - -def env_name(pre_chars='(', post_chars=')'): - """Extract the current environment name from $VIRTUAL_ENV or - $CONDA_DEFAULT_ENV if that is set - """ - env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '') - if len(env_path) == 0 and ON_ANACONDA: - env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '') - env_name = os.path.basename(env_path) - if env_name: - return pre_chars + env_name + post_chars - - -if ON_WINDOWS: - USER = 'USERNAME' -else: - USER = 'USER' - - -def vte_new_tab_cwd(): - """This prints an escape squence that tells VTE terminals the hostname - and pwd. This should not be needed in most cases, but sometimes is for - certain Linux terminals that do not read the PWD from the environment - on startup. Note that this does not return a string, it simply prints - and flushes the escape sequence to stdout directly. - """ - env = builtins.__xonsh_env__ - t = '\033]7;file://{}{}\007' - s = t.format(env.get('HOSTNAME'), env.get('PWD')) - print(s, end='', flush=True) - - -FORMATTER_DICT = LazyObject(lambda: dict( - user=os.environ.get(USER, ''), - prompt_end='#' if is_superuser() else '$', - hostname=socket.gethostname().split('.', 1)[0], - cwd=_dynamically_collapsed_pwd, - cwd_dir=lambda: os.path.dirname(_replace_home_cwd()), - cwd_base=lambda: os.path.basename(_replace_home_cwd()), - short_cwd=_collapsed_pwd, - curr_branch=current_branch, - branch_color=branch_color, - branch_bg_color=branch_bg_color, - current_job=_current_job, - env_name=env_name, - vte_new_tab_cwd=vte_new_tab_cwd, -), globals(), 'FORMATTER_DICT') - _FORMATTER = LazyObject(string.Formatter, globals(), '_FORMATTER') diff --git a/xonsh/prompt/__init__.py b/xonsh/prompt/__init__.py new file mode 100644 index 000000000..21b6b04e2 --- /dev/null +++ b/xonsh/prompt/__init__.py @@ -0,0 +1,24 @@ +# amalgamate exclude +import os as _os +if _os.getenv('XONSH_DEBUG', ''): + pass +else: + import sys as _sys + try: + from xonsh.prompt import __amalgam__ + cwd = __amalgam__ + _sys.modules['xonsh.prompt.cwd'] = __amalgam__ + env = __amalgam__ + _sys.modules['xonsh.prompt.env'] = __amalgam__ + job = __amalgam__ + _sys.modules['xonsh.prompt.job'] = __amalgam__ + vc_branch = __amalgam__ + _sys.modules['xonsh.prompt.vc_branch'] = __amalgam__ + base = __amalgam__ + _sys.modules['xonsh.prompt.base'] = __amalgam__ + del __amalgam__ + except ImportError: + pass + del _sys +del _os +# amalgamate end diff --git a/xonsh/prompt/base.py b/xonsh/prompt/base.py new file mode 100644 index 000000000..88d31b2a2 --- /dev/null +++ b/xonsh/prompt/base.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import socket + +from xonsh.lazyasd import LazyObject +from xonsh.tools import is_superuser +from xonsh.platform import ON_WINDOWS + + +from xonsh.prompt.cwd import ( + _collapsed_pwd, _replace_home_cwd, _dynamically_collapsed_pwd +) +from xonsh.prompt.job import _current_job +from xonsh.prompt.env import (env_name, vte_new_tab_cwd) +from xonsh.prompt.vc_branch import ( + current_branch, branch_color, branch_bg_color +) + + +if ON_WINDOWS: + USER = 'USERNAME' +else: + USER = 'USER' + + +FORMATTER_DICT = LazyObject(lambda: dict( + user=os.environ.get(USER, ''), + prompt_end='#' if is_superuser() else '$', + hostname=socket.gethostname().split('.', 1)[0], + cwd=_dynamically_collapsed_pwd, + cwd_dir=lambda: os.path.dirname(_replace_home_cwd()), + cwd_base=lambda: os.path.basename(_replace_home_cwd()), + short_cwd=_collapsed_pwd, + curr_branch=current_branch, + branch_color=branch_color, + branch_bg_color=branch_bg_color, + current_job=_current_job, + env_name=env_name, + vte_new_tab_cwd=vte_new_tab_cwd, +), globals(), 'FORMATTER_DICT') diff --git a/xonsh/prompt/cwd.py b/xonsh/prompt/cwd.py new file mode 100644 index 000000000..7f9e3c5c2 --- /dev/null +++ b/xonsh/prompt/cwd.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Created by i@BlahGeek.com at 2016-09-03 + +import os +import shutil +import builtins + +from xonsh.tools import get_sep +from xonsh.platform import ON_WINDOWS + + +def _replace_home(x): + if ON_WINDOWS: + home = (builtins.__xonsh_env__['HOMEDRIVE'] + + builtins.__xonsh_env__['HOMEPATH'][0]) + if x.startswith(home): + x = x.replace(home, '~', 1) + + if builtins.__xonsh_env__.get('FORCE_POSIX_PATHS'): + x = x.replace(os.sep, os.altsep) + + return x + else: + home = builtins.__xonsh_env__['HOME'] + if x.startswith(home): + x = x.replace(home, '~', 1) + return x + + +def _replace_home_cwd(): + return _replace_home(builtins.__xonsh_env__['PWD']) + + +def _collapsed_pwd(): + sep = get_sep() + pwd = _replace_home_cwd().split(sep) + l = len(pwd) + leader = sep if l > 0 and len(pwd[0]) == 0 else '' + base = [i[0] if ix != l - 1 else i + for ix, i in enumerate(pwd) if len(i) > 0] + return leader + sep.join(base) + + +def _dynamically_collapsed_pwd(): + """Return the compact current working directory. It respects the + environment variable DYNAMIC_CWD_WIDTH. + """ + originial_path = _replace_home_cwd() + target_width, units = builtins.__xonsh_env__['DYNAMIC_CWD_WIDTH'] + if target_width == float('inf'): + return originial_path + if (units == '%'): + cols, _ = shutil.get_terminal_size() + target_width = (cols * target_width) // 100 + sep = get_sep() + pwd = originial_path.split(sep) + last = pwd.pop() + remaining_space = target_width - len(last) + # Reserve space for separators + remaining_space_for_text = remaining_space - len(pwd) + parts = [] + for i in range(len(pwd)): + part = pwd[i] + part_len = int(min(len(part), + max(1, remaining_space_for_text // (len(pwd) - i)))) + remaining_space_for_text -= part_len + reduced_part = part[0:part_len] + parts.append(reduced_part) + parts.append(last) + full = sep.join(parts) + # If even if displaying one letter per dir we are too long + if (len(full) > target_width): + # We truncate the left most part + full = "..." + full[int(-target_width) + 3:] + # if there is not even a single separator we still + # want to display at least the beginning of the directory + if full.find(sep) == -1: + full = ("..." + sep + last)[0:int(target_width)] + return full diff --git a/xonsh/prompt/env.py b/xonsh/prompt/env.py new file mode 100644 index 000000000..1d82b7b0b --- /dev/null +++ b/xonsh/prompt/env.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import builtins + +from xonsh.platform import ON_ANACONDA + + +def env_name(pre_chars='(', post_chars=')'): + """Extract the current environment name from $VIRTUAL_ENV or + $CONDA_DEFAULT_ENV if that is set + """ + env_path = builtins.__xonsh_env__.get('VIRTUAL_ENV', '') + if len(env_path) == 0 and ON_ANACONDA: + env_path = builtins.__xonsh_env__.get('CONDA_DEFAULT_ENV', '') + env_name = os.path.basename(env_path) + if env_name: + return pre_chars + env_name + post_chars + + +def vte_new_tab_cwd(): + """This prints an escape squence that tells VTE terminals the hostname + and pwd. This should not be needed in most cases, but sometimes is for + certain Linux terminals that do not read the PWD from the environment + on startup. Note that this does not return a string, it simply prints + and flushes the escape sequence to stdout directly. + """ + env = builtins.__xonsh_env__ + t = '\033]7;file://{}{}\007' + s = t.format(env.get('HOSTNAME'), env.get('PWD')) + print(s, end='', flush=True) diff --git a/xonsh/prompt/job.py b/xonsh/prompt/job.py new file mode 100644 index 000000000..d816b0ab1 --- /dev/null +++ b/xonsh/prompt/job.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from xonsh.jobs import get_next_task + + +def _current_job(): + j = get_next_task() + if j is not None: + if not j['bg']: + cmd = j['cmds'][-1] + s = cmd[0] + if s == 'sudo' and len(cmd) > 1: + s = cmd[1] + return s diff --git a/xonsh/prompt/vc_branch.py b/xonsh/prompt/vc_branch.py new file mode 100644 index 000000000..ecf7d5605 --- /dev/null +++ b/xonsh/prompt/vc_branch.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Created by i@BlahGeek.com at 2016-09-03 + +import os +import sys +import time +import builtins +import warnings +import subprocess + +from xonsh.platform import ON_WINDOWS + + +def get_git_branch(): + """Attempts to find the current git branch. If no branch is found, then + an empty string is returned. If a timeout occured, the timeout exception + (subprocess.TimeoutExpired) is returned. + """ + branch = None + env = builtins.__xonsh_env__ + cwd = env['PWD'] + denv = env.detype() + vcbt = env['VC_BRANCH_TIMEOUT'] + if not ON_WINDOWS: + prompt_scripts = ['/usr/lib/git-core/git-sh-prompt', + '/usr/local/etc/bash_completion.d/git-prompt.sh'] + for script in prompt_scripts: + # note that this is about 10x faster than bash -i "__git_ps1" + inp = 'source {}; __git_ps1 "${{1:-%s}}"'.format(script) + try: + branch = subprocess.check_output(['bash'], cwd=cwd, input=inp, + stderr=subprocess.PIPE, timeout=vcbt, env=denv, + universal_newlines=True) + break + except subprocess.TimeoutExpired as e: + branch = e + break + except (subprocess.CalledProcessError, FileNotFoundError): + continue + # fall back to using the git binary if the above failed + if branch is None: + cmd = ['git', 'rev-parse', '--abbrev-ref', 'HEAD'] + try: + s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, env=denv, + stderr=subprocess.PIPE, universal_newlines=True) + if ON_WINDOWS and len(s) == 0: + # Workaround for a bug in ConEMU/cmder, retry without redirection + s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, + env=denv, universal_newlines=True) + branch = s.strip() + except subprocess.TimeoutExpired as e: + branch = e + except (subprocess.CalledProcessError, FileNotFoundError): + branch = None + return branch + + +def _get_parent_dir_for(path, dir_name, timeout): + # walk up the directory tree to see if we are inside an hg repo + # the timeout makes sure that we don't thrash the file system + previous_path = '' + t0 = time.time() + while path != previous_path and ((time.time() - t0) < timeout): + if os.path.isdir(os.path.join(path, dir_name)): + return path + previous_path = path + path, _ = os.path.split(path) + return (path == previous_path) + + +def get_hg_branch(cwd=None, root=None): + env = builtins.__xonsh_env__ + cwd = env['PWD'] + root = _get_parent_dir_for(cwd, '.hg', env['VC_BRANCH_TIMEOUT']) + if not isinstance(root, str): + # Bail if we are not in a repo or we timed out + if root: + return None + else: + return subprocess.TimeoutExpired(['hg'], env['VC_BRANCH_TIMEOUT']) + # get branch name + branch_path = os.path.sep.join([root, '.hg', 'branch']) + if os.path.exists(branch_path): + with open(branch_path, 'r') as branch_file: + branch = branch_file.read() + else: + branch = 'default' + # add bookmark, if we can + bookmark_path = os.path.sep.join([root, '.hg', 'bookmarks.current']) + if os.path.exists(bookmark_path): + with open(bookmark_path, 'r') as bookmark_file: + active_bookmark = bookmark_file.read() + branch = "{0}, {1}".format(*(b.strip(os.linesep) for b in + (branch, active_bookmark))) + else: + branch = branch.strip(os.linesep) + return branch + + +_FIRST_BRANCH_TIMEOUT = True + + +def _first_branch_timeout_message(): + global _FIRST_BRANCH_TIMEOUT + sbtm = builtins.__xonsh_env__['SUPPRESS_BRANCH_TIMEOUT_MESSAGE'] + if not _FIRST_BRANCH_TIMEOUT or sbtm: + return + _FIRST_BRANCH_TIMEOUT = False + print('xonsh: branch timeout: computing the branch name, color, or both ' + 'timed out while formatting the prompt. You may avoid this by ' + 'increaing the value of $VC_BRANCH_TIMEOUT or by removing branch ' + 'fields, like {curr_branch}, from your $PROMPT. See the FAQ ' + 'for more details. This message will be suppressed for the remainder ' + 'of this session. To suppress this message permanently, set ' + '$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.', + file=sys.stderr) + + +def current_branch(pad=NotImplemented): + """Gets the branch for a current working directory. Returns an empty string + if the cwd is not a repository. This currently only works for git and hg + and should be extended in the future. If a timeout occurred, the string + '' is returned. + """ + if pad is not NotImplemented: + warnings.warn("The pad argument of current_branch has no effect now " + "and will be removed in the future") + branch = None + cmds = builtins.__xonsh_commands_cache__ + if cmds.lazy_locate_binary('git') or cmds.is_empty(): + branch = get_git_branch() + if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and not branch: + branch = get_hg_branch() + if isinstance(branch, subprocess.TimeoutExpired): + branch = '' + _first_branch_timeout_message() + return branch or None + + +def git_dirty_working_directory(cwd=None, include_untracked=False): + """Returns whether or not the git directory is dirty. If this could not + be determined (timeout, file not sound, etc.) then this returns None. + """ + cmd = ['git', 'status', '--porcelain'] + if include_untracked: + cmd.append('--untracked-files=normal') + else: + cmd.append('--untracked-files=no') + env = builtins.__xonsh_env__ + cwd = env['PWD'] + denv = env.detype() + vcbt = env['VC_BRANCH_TIMEOUT'] + try: + s = subprocess.check_output(cmd, stderr=subprocess.PIPE, cwd=cwd, + timeout=vcbt, universal_newlines=True, + env=denv) + if ON_WINDOWS and len(s) == 0: + # Workaround for a bug in ConEMU/cmder, retry without redirection + s = subprocess.check_output(cmd, cwd=cwd, timeout=vcbt, + env=denv, universal_newlines=True) + return bool(s) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, + FileNotFoundError): + return None + + +def hg_dirty_working_directory(): + """Computes whether or not the mercurial working directory is dirty or not. + If this cannot be deterimined, None is returned. + """ + env = builtins.__xonsh_env__ + cwd = env['PWD'] + denv = env.detype() + vcbt = env['VC_BRANCH_TIMEOUT'] + # Override user configurations settings and aliases + denv['HGRCPATH'] = '' + try: + s = subprocess.check_output(['hg', 'identify', '--id'], + stderr=subprocess.PIPE, cwd=cwd, timeout=vcbt, + universal_newlines=True, env=denv) + return s.strip(os.linesep).endswith('+') + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, + FileNotFoundError): + return None + + +def dirty_working_directory(cwd=None): + """Returns a boolean as to whether there are uncommitted files in version + control repository we are inside. If this cannot be determined, returns + None. Currently supports git and hg. + """ + dwd = None + cmds = builtins.__xonsh_commands_cache__ + if cmds.lazy_locate_binary('git') or cmds.is_empty(): + dwd = git_dirty_working_directory() + if (cmds.lazy_locate_binary('hg') or cmds.is_empty()) and (dwd is None): + dwd = hg_dirty_working_directory() + return dwd + + +def branch_color(): + """Return red if the current branch is dirty, yellow if the dirtiness can + not be determined, and green if it clean. These are bold, intense colors + for the foreground. + """ + dwd = dirty_working_directory() + if dwd is None: + color = '{BOLD_INTENSE_YELLOW}' + elif dwd: + color = '{BOLD_INTENSE_RED}' + else: + color = '{BOLD_INTENSE_GREEN}' + return color + + +def branch_bg_color(): + """Return red if the current branch is dirty, yellow if the dirtiness can + not be determined, and green if it clean. These are bacground colors. + """ + dwd = dirty_working_directory() + if dwd is None: + color = '{BACKGROUND_YELLOW}' + elif dwd: + color = '{BACKGROUND_RED}' + else: + color = '{BACKGROUND_GREEN}' + return color