mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 08:24:40 +01:00
![pre-commit-ci[bot]](/assets/img/avatar_default.png)
* [pre-commit.ci] pre-commit autoupdate updates: - [github.com/psf/black: 23.12.1 → 24.1.1](https://github.com/psf/black/compare/23.12.1...24.1.1) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
"""Context management tools for xonsh."""
|
|
|
|
import sys
|
|
import textwrap
|
|
from collections.abc import Mapping
|
|
|
|
from xonsh.built_ins import XSH
|
|
|
|
|
|
class Block:
    """Context manager that captures the source lines of its ``with`` body
    instead of running them. After entering, the captured text is exposed
    through the ``lines`` attribute.

    This must be used as a macro.
    """

    # NOTE(review): presumably tells xonsh's macro machinery to deliver the
    # block to this context manager as a string -- confirm against the
    # macro documentation.
    __xonsh_block__ = str

    def __init__(self):
        """
        Attributes
        ----------
        lines : list of str or None
            Block lines as if split by str.splitlines(), if available.
        glbs : Mapping or None
            Global execution context, ie globals().
        locs : Mapping or None
            Local execution context, ie locals().
        """
        self.lines = None
        self.glbs = None
        self.locs = None

    def __enter__(self):
        """Copy the macro attributes onto ``lines``, ``glbs`` and ``locs``.

        Reads ``macro_block``, ``macro_globals`` and ``macro_locals`` from
        the instance; these are not set anywhere in this class, so they are
        expected to have been attached before entry.

        Returns
        -------
        self

        Raises
        ------
        XonshError
            If the instance has no ``macro_block`` attribute.
        """
        entered_as_macro = hasattr(self, "macro_block")
        if not entered_as_macro:
            raise XSH.builtins.XonshError(
                f"{self.__class__.__name__} must be entered as a macro!"
            )
        self.lines = self.macro_block.splitlines()
        global_ctx = self.macro_globals
        self.glbs = global_ctx
        local_ctx = self.macro_locals
        # Only record locals when they are a distinct object from globals;
        # otherwise ``locs`` keeps its current value (None after __init__).
        if local_ctx is not global_ctx:
            self.locs = local_ctx
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; returning None lets any exception propagate."""
        return None
|
|
|
|
|
|
class Functor(Block):
    """This is a context manager that turns the block into a callable
    object, bound to the execution context it was created in.
    """

    def __init__(self, args=(), kwargs=None, rtn=""):
        """
        Parameters
        ----------
        args : Sequence of str, optional
            A tuple of argument names for the functor.
        kwargs : Mapping of str to values or list of item tuples, optional
            Keyword argument names and values, if available.
        rtn : str, optional
            Name of object to return, if available.

        Attributes
        ----------
        func : function
            The underlying function object. This defaults to None and is
            set when the block is entered.
        """
        super().__init__()
        self.func = None
        self.args = args
        # Normalise kwargs to a sequence of (name, default) pairs. Mappings
        # are sorted by key so the generated signature is deterministic.
        if kwargs is None:
            self.kwargs = []
        elif isinstance(kwargs, Mapping):
            self.kwargs = sorted(kwargs.items())
        else:
            self.kwargs = kwargs
        self.rtn = rtn

    def __enter__(self):
        """Compile the captured block into a function and store it in ``func``.

        The block text is wrapped in a generated ``def`` statement, executed
        through ``XSH.execer`` in the captured globals/locals, and the
        resulting function object is then looked up by name in those
        namespaces.

        Returns
        -------
        self

        Raises
        ------
        XonshError
            Propagated from ``Block.__enter__`` when not entered as a macro.
        ValueError
            If the generated function cannot be found in the execution
            context after the ``def`` statement was executed.
        """
        super().__enter__()
        # The body and the optional return line must share the same
        # indentation so they land in the same suite of the generated def.
        indent = " "
        body = textwrap.indent(self.macro_block, indent)
        # hash() ranges over [-sys.maxsize - 1, sys.maxsize]; shifting by
        # sys.maxsize + 1 guarantees a non-negative value, so the generated
        # name is always a valid identifier.
        uid = hash(body) + sys.maxsize + 1
        name = f"__xonsh_functor_{uid}__"
        # construct signature string; keyword parameters get a placeholder
        # default of None here, the real defaults are patched in below
        sig = ", ".join(self.args)
        kwstr = ", ".join(k + "=None" for k, _ in self.kwargs)
        if kwstr:
            sig = f"{sig}, {kwstr}" if sig else kwstr
        # construct return string
        rtn = str(self.rtn)
        if rtn:
            rtn = indent + "return " + rtn + "\n"
        # construct function string
        fstr = f"def {name}({sig}):\n{body}\n{rtn}"
        glbs = self.glbs
        locs = self.locs
        XSH.execer.exec(fstr, glbs=glbs, locs=locs)
        # The def may have been bound in either namespace; prefer locals.
        if locs is not None and name in locs:
            func = locs[name]
        elif name in glbs:
            func = glbs[name]
        else:
            raise ValueError("Functor block could not be found in context.")
        if self.kwargs:
            # Replace the placeholder None defaults with the actual values,
            # in the same order the keyword parameters were emitted.
            func.__defaults__ = tuple(v for _, v in self.kwargs)
        self.func = func
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; returning None lets any exception propagate."""
        return None

    def __call__(self, *args, **kwargs):
        """Dispatches to func.

        Raises
        ------
        AttributeError
            If ``func`` is still None, i.e. the block has not been entered.
        """
        if self.func is None:
            msg = "{} block with 'None' func not callable"
            raise AttributeError(msg.format(self.__class__.__name__))
        return self.func(*args, **kwargs)
|