xonsh/xontrib/distributed.py
Noorhteen Raja NJ 38295a1dd9
Remove globals (#4280)
* refactor: remove usage of global variables in abbrevs.py

* chore: add flake8-mutable to prevent mutable defaults

* fix: abbrevs expand test

* refactor: add xonsh session singleton

* refactor: fix circular errors when using xonshSession as singleton

* refactor: remove black magicked builtin attributes

* style: black format tests as well

* refactor: update tests to use xonsh-session singleton

* refactor: update abbrevs to not use builtins

* test: remove DummyCommandsCache and patch orig class

* fix: failing test_command_completers

* test: use monkeypatch to update xession fixture

* fix: failing test_pipelines

* fix: failing test_main

* chore: run test suite as single invocation

* test: fix tests/test_xonsh.xsh

* refactor: remove builtins from docs/conf.py

* fix: mypy error in jobs

* fix: test error from test_main

* test: close xession error in test_command_completers

* chore: use pytest-cov for reporting coverage

this will include subprocess calls, and will increase coverage

* style:
2021-05-20 13:14:26 +03:00

74 lines
2.1 KiB
Python

"""Hooks for the distributed parallel computing library."""
from xonsh.contexts import Functor
from xonsh.built_ins import XSH
# Names exported by ``from ... import *``; ``dworker`` is not listed here and
# is instead registered as an entry in ``XSH.aliases`` further down.
__all__ = ["DSubmitter", "dsubmit"]
def dworker(args, stdin=None):
    """Run the ``dworker`` command-line utility from inside xonsh.

    Calling the entry point directly (instead of spawning the external
    script) lets the launched workers also have access to xonsh builtins.

    Parameters
    ----------
    args : list of str
        Command-line arguments forwarded to the ``dworker`` entry point.
    stdin : optional
        Accepted so the function fits the alias call signature; it is not
        used by this function.
    """
    # Imported at call time and aliased so the module does not shadow this
    # function's own name inside its body.
    from distributed.cli import dworker as worker_cli

    # NOTE(review): ``main.main`` looks like a click command's ``main()``
    # method -- confirm against the installed distributed version.
    worker_cli.main.main(args=args, prog_name="dworker", standalone_mode=False)
# Module-level side effect: register the wrapper under the "dworker" alias.
XSH.aliases["dworker"] = dworker
class DSubmitter(Functor):
    """Functor-based context manager that submits its job to an executor.

    Attributes
    ----------
    executor : distributed.Executor
        Executor that receives the submitted job.
    future : object or None
        Value returned by ``executor.submit()`` for the latest submission;
        ``None`` after construction and after every ``__enter__``.
    """

    def __init__(self, executor, **kwargs):
        """
        Parameters
        ----------
        executor : distributed.Executor
            Target executor for submitted work.
        kwargs : optional
            Forwarded unchanged to the superclass initializer.
        """
        super().__init__(**kwargs)
        self.future = None
        self.executor = executor

    def __enter__(self):
        """Enter the parent context, clear any earlier future, return self."""
        super().__enter__()
        self.future = None
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the parent context and submit ``self.func`` if it allows.

        The parent's ``__exit__`` result is returned unchanged in every case.
        """
        outcome = super().__exit__(exc_type, exc_value, traceback)
        # NOTE(review): the job is only submitted when the parent's __exit__
        # returns a truthy value -- confirm Functor.__exit__ really does so
        # on a clean exit, otherwise nothing is ever submitted.
        if outcome:
            # ``self.func`` is not assigned in this class; presumably it is
            # provided by Functor -- verify.
            self.future = self.executor.submit(self.func)
        return outcome
def dsubmit(*a, args=(), kwargs=None, rtn="", **kw):
    """Build a DSubmitter() context manager bound to a brand-new executor.

    Parameters
    ----------
    args : Sequence of str, optional
        Argument names handed to DSubmitter.
    kwargs : Mapping of str to values or list of item tuples, optional
        Keyword argument names and values handed to DSubmitter.
    rtn : str, optional
        Name of the object DSubmitter should return.
    a, kw : Sequence and Mapping
        Every remaining positional and keyword argument; all of them are
        passed to the executor constructor.

    Returns
    -------
    dsub : DSubmitter
        Context manager wrapping the newly created executor.
    """
    # Imported inside the function, so distributed is only required once
    # dsubmit() is actually called.
    # NOTE(review): newer distributed releases name this class ``Client`` --
    # confirm ``Executor`` is still importable in the supported versions.
    from distributed import Executor

    return DSubmitter(Executor(*a, **kw), args=args, kwargs=kwargs, rtn=rtn)