mirror of
https://github.com/xonsh/xonsh.git
synced 2025-03-04 08:24:40 +01:00

* refactor: remove usage of global variables in abbrevs.py * chore: add flake8-mutable to prevent mutable defaults * fix: abbrevs expand test * refactor: add xonsh session singleton * refactor: fix circular errors when using xonshSession as singleton * refactor: remove black magicked builtin attributes * style: black format tests as well * refactor: update tests to use xonsh-session singleton * refactor: update abbrevs to not use builtins * test: remove DummyCommandsCache and patch orig class * fix: failing test_command_completers * test: use monkeypatch to update xession fixture * fix: failing test_pipelines * fix: failing test_main * chore: run test suit as single invocation * test: fix tests/test_xonsh.xsh * refactor: remove builtins from docs/conf.py * fix: mypy error in jobs * fix: test error from test_main * test: close xession error in test_command_completers * chore: use pytest-cov for reporting coverage this will include subprocess calls, and will increase coverage * style:
74 lines
2.1 KiB
Python
74 lines
2.1 KiB
Python
"""Hooks for the distributed parallel computing library."""
|
|
from xonsh.contexts import Functor
|
|
from xonsh.built_ins import XSH
|
|
|
|
# Names exported by ``from <module> import *``; ``dworker`` is not listed here.
__all__ = ["DSubmitter", "dsubmit"]
|
|
|
|
|
|
def dworker(args, stdin=None):
    """Run the ``dworker`` command-line utility from within this process.

    Launching the worker programmatically (instead of as an external
    program) lets the worker share access to xonsh builtins.

    Parameters
    ----------
    args : sequence of str
        Command-line arguments forwarded to the utility.
    stdin : optional
        Accepted but not used by this function.
    """
    # Imported lazily so ``distributed`` is only needed when the alias runs;
    # aliased so the module does not shadow this function's own name.
    from distributed.cli import dworker as _dworker_cli

    entry_point = _dworker_cli.main
    entry_point.main(args=args, prog_name="dworker", standalone_mode=False)
|
|
|
|
|
|
# Import-time side effect: register the dworker function in XSH.aliases
# under the key "dworker".
XSH.aliases["dworker"] = dworker
|
|
|
|
|
|
class DSubmitter(Functor):
    """Context manager for submitting distributed jobs.

    When the ``with`` block exits, ``self.func`` is submitted to the
    executor and the object returned by ``submit`` is stored on
    ``self.future``.  ``self.func`` is not assigned in this class; it is
    presumably built by the ``Functor`` base class -- verify against
    ``xonsh.contexts``.
    """

    def __init__(self, executor, **kwargs):
        """
        Parameters
        ----------
        executor : distributed.Executor
            The executor to submit to.
        kwargs : optional
            All other kwargs are passed up to superclasses init.
        """
        super().__init__(**kwargs)
        self.executor = executor
        # Result of the most recent submission; None until __exit__ submits.
        self.future = None

    def __enter__(self):
        """Enter the parent context and discard any previously stored future.

        Returns
        -------
        self : DSubmitter
            This instance, so it can be bound with ``with ... as``.
        """
        super().__enter__()
        self.future = None
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the parent context, then submit ``self.func`` to the executor.

        Returns
        -------
        res
            Whatever the parent ``__exit__`` returned.  ``None`` means the
            parent finished normally and the job was submitted; any other
            value is passed straight through and nothing is submitted.
        """
        res = super().__exit__(exc_type, exc_value, traceback)
        # An ``__exit__`` with nothing to report returns None.  Testing
        # ``not res`` would treat that normal case as "stop" and the job
        # would never be submitted, so compare against None explicitly.
        if res is not None:
            return res
        self.future = self.executor.submit(self.func)
        return res
|
|
|
|
|
|
def dsubmit(*a, args=(), kwargs=None, rtn="", **kw):
    """Build a DSubmitter() context manager backed by a freshly created
    executor.

    Parameters
    ----------
    args : Sequence of str, optional
        Argument names handed to DSubmitter.
    kwargs : Mapping of str to values or list of item tuples, optional
        Keyword argument names and values handed to DSubmitter.
    rtn : str, optional
        Name of the object DSubmitter should return.
    a, kw : Sequence and Mapping
        Every remaining positional and keyword argument is forwarded to
        the executor constructor.

    Returns
    -------
    dsub : DSubmitter
        A new DSubmitter context manager bound to the new executor.
    """
    # Imported lazily so ``distributed`` is only required when this is called.
    from distributed import Executor

    return DSubmitter(Executor(*a, **kw), args=args, kwargs=kwargs, rtn=rtn)
|