Merge pull request #2756 from xonsh/xernel

Jupyter Xernel
Morten Enemark Lund 2018-08-07 08:43:11 +02:00 committed by GitHub
commit 0a07353043
10 changed files with 591 additions and 50 deletions


@@ -77,6 +77,7 @@ For those of you who want the gritty details.
color_tools
pyghooks
jupyter_kernel
jupyter_shell
wizard
xonfig
codecache


@@ -0,0 +1,10 @@
.. _xonsh_jupyter_shell:
******************************************
Jupyter Shell (``xonsh.jupyter_shell``)
******************************************
.. automodule:: xonsh.jupyter_shell
:members:
:undoc-members:

news/xernel.rst (new file, 15 lines)

@@ -0,0 +1,15 @@
**Added:**
* New ``JupyterShell`` for interactive interfacing with Jupyter.
**Changed:** None
**Deprecated:** None
**Removed:** None
**Fixed:**
* The JupyterKernel has been fixed from a rather broken state.
**Security:** None


@@ -5,6 +5,6 @@ prompt_toolkit
pygments>=2.2
ply
psutil
ipykernel
pyzmq
matplotlib
doctr


@@ -11,6 +11,7 @@ flake8-ignore =
xonsh/built_ins.py F821 E721
xonsh/commands_cache.py F841
xonsh/history.py F821
xonsh/jupyter_kernel.py E203
xonsh/pyghooks.py F821
xonsh/style_tools.py F821
xonsh/readline_shell.py F401


@@ -3,6 +3,7 @@ __version__ = '0.7.2'
# amalgamate exclude jupyter_kernel parser_table parser_test_table pyghooks
# amalgamate exclude winutils wizard pytest_plugin fs macutils pygments_cache
# amalgamate exclude jupyter_shell
import os as _os
if _os.getenv('XONSH_DEBUG', ''):
pass


@@ -1,72 +1,397 @@
# -*- coding: utf-8 -*-
"""Hooks for Jupyter Xonsh Kernel."""
import sys
import json
import hmac
import uuid
import errno
import hashlib
import datetime
import builtins
import threading
from pprint import pformat
from argparse import ArgumentParser
from collections.abc import Set
from ipykernel.kernelbase import Kernel
import zmq
from zmq.eventloop import ioloop, zmqstream
from zmq.error import ZMQError
from xonsh import __version__ as version
from xonsh.main import main_context
from xonsh.main import setup
from xonsh.completer import Completer
MAX_SIZE = 8388608 # 8 MiB
DELIM = b"<IDS|MSG>"
class XonshKernel(Kernel):
def dump_bytes(*args, **kwargs):
"""Converts an object to JSON and returns the bytes."""
return json.dumps(*args, **kwargs).encode("ascii")
def load_bytes(b):
"""Converts bytes of JSON to an object."""
return json.loads(b.decode("ascii"))
def bind(socket, connection, port):
"""Binds a socket to a port, or a random port if needed. Returns the port."""
if port <= 0:
return socket.bind_to_random_port(connection)
else:
socket.bind("{}:{}".format(connection, port))
return port
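A minimal usage sketch of these module-level helpers, assuming this version of xonsh is importable; the address and payload are illustrative:

import zmq
from xonsh.jupyter_kernel import bind, dump_bytes, load_bytes

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
# Passing port 0 lets bind() pick a random free port and return it.
port = bind(sock, "tcp://127.0.0.1", 0)
print("bound on port", port)

# dump_bytes/load_bytes round-trip JSON payloads as ASCII bytes, which is
# how the message frames below travel on the wire.
frame = dump_bytes({"msg_type": "kernel_info_request"})
assert load_bytes(frame) == {"msg_type": "kernel_info_request"}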
class XonshKernel:
"""Xonsh xernel for Jupyter"""
implementation = 'Xonsh ' + version
implementation = "Xonsh " + version
implementation_version = version
language = 'xonsh'
language_version = version
banner = 'Xonsh - Python-powered, cross-platform shell'
language_info = {'name': 'xonsh',
'version': version,
'pygments_lexer': 'xonsh',
'codemirror_mode': 'shell',
'mimetype': 'text/x-sh',
'file_extension': '.xsh',
}
language = "xonsh"
language_version = version.split(".")[:3]
banner = "Xonsh - Python-powered, cross-platform shell"
language_info = {
"name": "xonsh",
"version": version,
"pygments_lexer": "xonsh",
"codemirror_mode": "shell",
"mimetype": "text/x-sh",
"file_extension": ".xsh",
}
signature_schemes = {"hmac-sha256": hashlib.sha256}
def __init__(self, **kwargs):
def __init__(self, debug_level=0, session_id=None, config=None, **kwargs):
"""
Parameters
----------
debug_level : int, optional
Integer from 0 (no debugging) to 3 (all debugging), default: 0.
session_id : str or None, optional
Unique string id representing the kernel session. If None, this will
be replaced with a random UUID.
config : dict or None, optional
Configuration dictionary to start the server with. By default, this
will search the command line for options (if given) or use the
default configuration.
"""
self.debug_level = debug_level
self.session_id = str(uuid.uuid4()) if session_id is None else session_id
self._parser = None
self.config = self.make_default_config() if config is None else config
self.exiting = False
self.execution_count = 1
self.completer = Completer()
super().__init__(**kwargs)
def do_execute(self, code, silent, store_history=True, user_expressions=None,
allow_stdin=False):
@property
def parser(self):
if self._parser is None:
p = ArgumentParser("jupyter_kernel")
p.add_argument("-f", dest="config_file", default=None)
self._parser = p
return self._parser
def make_default_config(self):
"""Provides default configuration"""
ns, unknown = self.parser.parse_known_args(sys.argv)
if ns.config_file is None:
self.dprint(1, "Starting xonsh kernel with default args...")
config = {
"control_port": 0,
"hb_port": 0,
"iopub_port": 0,
"ip": "127.0.0.1",
"key": str(uuid.uuid4()),
"shell_port": 0,
"signature_scheme": "hmac-sha256",
"stdin_port": 0,
"transport": "tcp",
}
else:
self.dprint(1, "Loading xonsh kernel with args:", sys.argv)
self.dprint(1, "Reading config file {!r}...".format(ns.config_file))
with open(ns.config_file) as f:
config = json.load(f)
return config
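When Jupyter launches the kernel it passes -f with a connection file, and the JSON in that file carries exactly the keys built above. A representative sketch of such a file, written as the Python dict that json.load() would produce (ports and key are made up):

example_config = {
    "transport": "tcp",
    "ip": "127.0.0.1",
    "control_port": 50160,
    "shell_port": 57503,
    "stdin_port": 52597,
    "hb_port": 42540,
    "iopub_port": 40885,
    "signature_scheme": "hmac-sha256",
    "key": "a0436f6c-1916-498b-8eb9-e81ab9368e84",  # per-session HMAC key
}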
def iopub_handler(self, message):
"""Handles iopub requests."""
self.dprint(2, "iopub received:", message)
def control_handler(self, wire_message):
"""Handles control requests"""
self.dprint(1, "control received:", wire_message)
identities, msg = self.deserialize_wire_message(wire_message)
if msg["header"]["msg_type"] == "shutdown_request":
self.shutdown()
def stdin_handler(self, message):
self.dprint(2, "stdin received:", message)
def start(self):
"""Starts the server"""
ioloop.install()
connection = self.config["transport"] + "://" + self.config["ip"]
secure_key = self.config["key"].encode()
digestmod = self.signature_schemes[self.config["signature_scheme"]]
self.auth = hmac.HMAC(secure_key, digestmod=digestmod)
# Heartbeat
ctx = zmq.Context()
self.heartbeat_socket = ctx.socket(zmq.REP)
self.config["hb_port"] = bind(
self.heartbeat_socket, connection, self.config["hb_port"]
)
# IOPub/Sub, also called SubSocketChannel in IPython sources
self.iopub_socket = ctx.socket(zmq.PUB)
self.config["iopub_port"] = bind(
self.iopub_socket, connection, self.config["iopub_port"]
)
self.iopub_stream = zmqstream.ZMQStream(self.iopub_socket)
self.iopub_stream.on_recv(self.iopub_handler)
# Control
self.control_socket = ctx.socket(zmq.ROUTER)
self.config["control_port"] = bind(
self.control_socket, connection, self.config["control_port"]
)
self.control_stream = zmqstream.ZMQStream(self.control_socket)
self.control_stream.on_recv(self.control_handler)
# Stdin:
self.stdin_socket = ctx.socket(zmq.ROUTER)
self.config["stdin_port"] = bind(
self.stdin_socket, connection, self.config["stdin_port"]
)
self.stdin_stream = zmqstream.ZMQStream(self.stdin_socket)
self.stdin_stream.on_recv(self.stdin_handler)
# Shell
self.shell_socket = ctx.socket(zmq.ROUTER)
self.config["shell_port"] = bind(
self.shell_socket, connection, self.config["shell_port"]
)
self.shell_stream = zmqstream.ZMQStream(self.shell_socket)
self.shell_stream.on_recv(self.shell_handler)
# start up configuration
self.dprint(2, "Config:", json.dumps(self.config))
self.dprint(1, "Starting loops...")
self.hb_thread = threading.Thread(target=self.heartbeat_loop)
self.hb_thread.daemon = True
self.hb_thread.start()
self.dprint(1, "Ready! Listening...")
ioloop.IOLoop.instance().start()
def shutdown(self):
"""Shuts down the kernel"""
self.exiting = True
ioloop.IOLoop.instance().stop()
def dprint(self, level, *args, **kwargs):
"""Print but with debug information."""
if level <= self.debug_level:
print("DEBUG" + str(level) + ":", file=sys.__stdout__, *args, **kwargs)
sys.__stdout__.flush()
def sign(self, messages):
"""Sign a message list with a secure signature."""
h = self.auth.copy()
for m in messages:
h.update(m)
return h.hexdigest().encode("ascii")
def new_header(self, message_type):
"""Make a new header"""
return {
"date": datetime.datetime.now().isoformat(),
"msg_id": str(uuid.uuid4()),
"username": "kernel",
"session": self.session_id,
"msg_type": message_type,
"version": "5.0",
}
def send(
self,
stream,
message_type,
content=None,
parent_header=None,
metadata=None,
identities=None,
):
"""Send data to the client via a stream"""
header = self.new_header(message_type)
if content is None:
content = {}
if parent_header is None:
parent_header = {}
if metadata is None:
metadata = {}
messages = list(map(dump_bytes, [header, parent_header, metadata, content]))
signature = self.sign(messages)
parts = [DELIM, signature] + messages
if identities:
parts = identities + parts
self.dprint(3, "send parts:", parts)
stream.send_multipart(parts)
if isinstance(stream, zmqstream.ZMQStream):
stream.flush()
def deserialize_wire_message(self, wire_message):
"""Split the routing prefix and message frames from a message on the wire"""
delim_idx = wire_message.index(DELIM)
identities = wire_message[:delim_idx]
m_signature = wire_message[delim_idx + 1]
msg_frames = wire_message[delim_idx + 2 :]
keys = ("header", "parent_header", "metadata", "content")
m = {k: load_bytes(v) for k, v in zip(keys, msg_frames)}
check_sig = self.sign(msg_frames)
if check_sig != m_signature:
raise ValueError("Signatures do not match")
return identities, m
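To make the wire format concrete, here is a small self-contained sketch of the same scheme used by sign(), send(), and deserialize_wire_message() above: four JSON frames (header, parent_header, metadata, content), HMAC-signed, separated from the routing identities by the <IDS|MSG> delimiter. All values are illustrative.

import hashlib
import hmac
import json

DELIM = b"<IDS|MSG>"
key = b"a0436f6c-1916-498b-8eb9-e81ab9368e84"  # illustrative session key

frames = [
    json.dumps(part).encode("ascii")
    for part in (
        {"msg_type": "kernel_info_request"},  # header
        {},                                   # parent_header
        {},                                   # metadata
        {},                                   # content
    )
]
signature = hmac.new(key, b"".join(frames), hashlib.sha256).hexdigest().encode("ascii")
wire = [b"routing-id", DELIM, signature] + frames

# Receiving side: split at DELIM and re-check the signature.
idx = wire.index(DELIM)
check = hmac.new(key, b"".join(wire[idx + 2:]), hashlib.sha256).hexdigest().encode("ascii")
assert check == wire[idx + 1]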
def run_thread(self, loop, name):
"""Run main thread"""
self.dprint(2, "Starting loop for {name!r}...".format(name=name))
while not self.exiting:
self.dprint(2, "{} Loop!".format(name))
try:
loop.start()
except ZMQError as e:
self.dprint(1, "{} ZMQError!\n {}".format(name, e))
if e.errno == errno.EINTR:
continue
else:
raise
except Exception:
self.dprint(2, "{} Exception!".format(name))
if self.exiting:
break
else:
raise
else:
self.dprint(2, "{} Break!".format(name))
break
def heartbeat_loop(self):
"""Run heartbeat"""
self.dprint(2, "Starting heartbeat loop...")
while not self.exiting:
self.dprint(3, ".", end="")
try:
zmq.device(zmq.FORWARDER, self.heartbeat_socket, self.heartbeat_socket)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
continue
else:
raise
else:
break
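The heartbeat is just an echo: zmq.device(zmq.FORWARDER, sock, sock) bounces every frame straight back on the REP socket. A client-side liveness check could therefore be as simple as the following sketch (the port is illustrative):

import zmq

ctx = zmq.Context()
ping = ctx.socket(zmq.REQ)
ping.connect("tcp://127.0.0.1:42540")  # the kernel's hb_port
ping.send(b"ping")
if ping.poll(timeout=1000):            # wait up to one second for the echo
    assert ping.recv() == b"ping"      # kernel is alive
else:
    print("no heartbeat -- kernel appears dead")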
def shell_handler(self, message):
"""Dispatch shell messages to their handlers"""
self.dprint(1, "received:", message)
identities, msg = self.deserialize_wire_message(message)
handler = getattr(self, "handle_" + msg["header"]["msg_type"], None)
if handler is None:
self.dprint(0, "unknown message type:", msg["header"]["msg_type"])
return
handler(msg, identities)
def handle_execute_request(self, message, identities):
"""Handle execute request messages."""
self.dprint(2, "Xonsh Kernel Executing:", pformat(message["content"]["code"]))
# Start by sending busy signal
content = {"execution_state": "busy"}
self.send(self.iopub_stream, "status", content, parent_header=message["header"])
# confirm the input that we are executing
content = {
"execution_count": self.execution_count,
"code": message["content"]["code"],
}
self.send(
self.iopub_stream, "execute_input", content, parent_header=message["header"]
)
# execute the code
metadata = {
"dependencies_met": True,
"engine": self.session_id,
"status": "ok",
"started": datetime.datetime.now().isoformat(),
}
content = self.do_execute(parent_header=message["header"], **message["content"])
self.send(
self.shell_stream,
"execute_reply",
content,
metadata=metadata,
parent_header=message["header"],
identities=identities,
)
self.execution_count += 1
# once we are done, send a signal that we are idle
content = {"execution_state": "idle"}
self.send(self.iopub_stream, "status", content, parent_header=message["header"])
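Seen from a client, this handler produces the usual busy, execute_input, execute_reply, idle sequence on the iopub and shell channels. A hedged sketch of driving it with jupyter_client (the connection file path is illustrative):

from jupyter_client import BlockingKernelClient

kc = BlockingKernelClient(connection_file="kernel-xonsh.json")  # illustrative path
kc.load_connection_file()
kc.start_channels()

kc.execute("echo hello from xonsh")
reply = kc.get_shell_msg(timeout=5)  # the execute_reply frame
print(reply["content"]["status"], reply["content"]["execution_count"])
kc.stop_channels()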
def do_execute(
self,
code="",
silent=False,
store_history=True,
user_expressions=None,
allow_stdin=False,
parent_header=None,
**kwargs
):
"""Execute user code."""
if len(code.strip()) == 0:
return {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}
return {
"status": "ok",
"execution_count": self.execution_count,
"payload": [],
"user_expressions": {},
}
shell = builtins.__xonsh_shell__
hist = builtins.__xonsh_history__
try:
shell.default(code)
shell.default(code, self, parent_header)
interrupted = False
except KeyboardInterrupt:
interrupted = True
if not silent: # stdout response
if hasattr(builtins, '_') and builtins._ is not None:
# rely on sys.displayhook functionality
self._respond_in_chunks('stdout', pformat(builtins._))
builtins._ = None
if hist is not None and len(hist) > 0:
self._respond_in_chunks('stdout', hist.outs[-1])
if interrupted:
return {'status': 'abort', 'execution_count': self.execution_count}
return {"status": "abort", "execution_count": self.execution_count}
rtn = 0 if (hist is None or len(hist) == 0) else hist.rtns[-1]
if 0 < rtn:
message = {'status': 'error', 'execution_count': self.execution_count,
'ename': '', 'evalue': str(rtn), 'traceback': []}
message = {
"status": "error",
"execution_count": self.execution_count,
"ename": "",
"evalue": str(rtn),
"traceback": [],
}
else:
message = {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}
message = {
"status": "ok",
"execution_count": self.execution_count,
"payload": [],
"user_expressions": {},
}
return message
def _respond_in_chunks(self, name, s, chunksize=1024):
def _respond_in_chunks(self, name, s, chunksize=1024, parent_header=None):
if s is None:
return
n = len(s)
@@ -75,26 +400,67 @@ class XonshKernel(Kernel):
lower = range(0, n, chunksize)
upper = range(chunksize, n + chunksize, chunksize)
for l, u in zip(lower, upper):
response = {'name': name, 'text': s[l:u], }
self.send_response(self.iopub_socket, 'stream', response)
response = {"name": name, "text": s[l:u]}
self.send(
self.iopub_socket, "stream", response, parent_header=parent_header
)
def handle_complete_request(self, message, identities):
"""Handles completion requests."""
content = self.do_complete(
message["content"]["code"], message["content"]["cursor_pos"]
)
self.send(
self.shell_stream,
"complete_reply",
content,
parent_header=message["header"],
identities=identities,
)
def do_complete(self, code, pos):
"""Get completions."""
shell = builtins.__xonsh_shell__
line = code.split('\n')[-1]
line = code.split("\n")[-1]
line = builtins.aliases.expand_alias(line)
prefix = line.split(' ')[-1]
prefix = line.split(" ")[-1]
endidx = pos
begidx = pos - len(prefix)
rtn, _ = self.completer.complete(prefix, line, begidx,
endidx, shell.ctx)
message = {'matches': rtn, 'cursor_start': begidx, 'cursor_end': endidx,
'metadata': {}, 'status': 'ok'}
rtn, _ = self.completer.complete(prefix, line, begidx, endidx, shell.ctx)
if isinstance(rtn, Set):
rtn = list(rtn)
message = {
"matches": rtn,
"cursor_start": begidx,
"cursor_end": endidx,
"metadata": {},
"status": "ok",
}
return message
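Completion requests follow the same request/reply pattern on the shell channel. With a jupyter_client connection like the one sketched earlier, something along these lines would exercise do_complete() (the code and cursor position are illustrative):

from jupyter_client import BlockingKernelClient

kc = BlockingKernelClient(connection_file="kernel-xonsh.json")  # illustrative path
kc.load_connection_file()
kc.start_channels()

kc.complete("ls --", cursor_pos=5)
reply = kc.get_shell_msg(timeout=5)
print(reply["content"]["matches"])  # filled from Completer.complete() above
kc.stop_channels()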
def handle_kernel_info_request(self, message, identities):
"""Handles kernel info requests."""
content = {
"protocol_version": "5.0",
"ipython_version": [1, 1, 0, ""],
"language": self.language,
"language_version": self.language_version,
"implementation": self.implementation,
"implementation_version": self.implementation_version,
"language_info": self.language_info,
"banner": self.banner,
}
self.send(
self.shell_stream,
"kernel_info_reply",
content,
parent_header=message["header"],
identities=identities,
)
if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
# must manually pass in args to avoid interfering w/ Jupyter arg parsing
with main_context(argv=['--shell-type=readline']):
IPKernelApp.launch_instance(kernel_class=XonshKernel)
if __name__ == "__main__":
setup(shell_type="jupyter", env={})
shell = builtins.__xonsh_shell__
kernel = shell.kernel = XonshKernel()
kernel.start()
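For Jupyter to launch this module on its own, a kernelspec has to point at it. A hedged sketch of writing and registering one; the display name, directory, and install command are illustrative and not part of this change:

import json
import os

spec = {
    "argv": ["python", "-m", "xonsh.jupyter_kernel", "-f", "{connection_file}"],
    "display_name": "Xonsh",  # illustrative
    "language": "xonsh",
}
os.makedirs("xonsh_kernelspec", exist_ok=True)
with open(os.path.join("xonsh_kernelspec", "kernel.json"), "w") as f:
    json.dump(spec, f)
# then register it, e.g.:
#   jupyter kernelspec install --user ./xonsh_kernelspec --name xonsh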

xonsh/jupyter_shell.py (new file, 145 lines)

@@ -0,0 +1,145 @@
"""An interactive shell for the Jupyter kernel."""
import io
import sys
import builtins
from xonsh.base_shell import BaseShell
class StdJupyterRedirectBuf(io.RawIOBase):
"""Redirects standard I/O buffers to the Jupyter kernel."""
def __init__(self, redirect):
self.redirect = redirect
self.encoding = redirect.encoding
self.errors = redirect.errors
def fileno(self):
"""Returns the file descriptor of the std buffer."""
return self.redirect.fileno()
def seek(self, offset, whence=io.SEEK_SET):
"""Sets the location in both the stdbuf and the membuf."""
raise io.UnsupportedOperation('cannot seek Jupyter redirect')
def truncate(self, size=None):
"""Truncate both buffers."""
raise io.UnsupportedOperation('cannot truncate Jupyter redirect')
def readinto(self, b):
"""Read bytes into buffer from both streams."""
raise io.UnsupportedOperation('cannot read into Jupyter redirect')
def write(self, b):
"""Write bytes to kernel."""
s = b if isinstance(b, str) else b.decode(self.encoding, self.errors)
self.redirect.write(s)
class StdJupyterRedirect(io.TextIOBase):
"""Redirects a standard I/O stream to the Jupyter kernel."""
def __init__(self, name, kernel, parent_header=None):
"""
Parameters
----------
name : str
The name of the buffer in the sys module, e.g. 'stdout'.
kernel : XonshKernel
Instance of a Jupyter kernel
parent_header : dict or None, optional
parent header information to pass along with the kernel
"""
self._name = name
self.kernel = kernel
self.parent_header = parent_header
self.std = getattr(sys, name)
self.buffer = StdJupyterRedirectBuf(self)
setattr(sys, name, self)
@property
def encoding(self):
"""The encoding of the stream"""
env = builtins.__xonsh_env__
return getattr(self.std, 'encoding', env.get('XONSH_ENCODING'))
@property
def errors(self):
"""The encoding errors of the stream"""
env = builtins.__xonsh_env__
return getattr(self.std, 'errors', env.get('XONSH_ENCODING_ERRORS'))
@property
def newlines(self):
"""The newlines of the standard buffer."""
return self.std.newlines
def _replace_std(self):
std = self.std
if std is None:
return
setattr(sys, self._name, std)
self.std = None
def __del__(self):
self._replace_std()
def close(self):
"""Restores the original std stream."""
self._replace_std()
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
def write(self, s):
"""Writes data to the original kernel stream."""
self.kernel._respond_in_chunks(self._name, s,
parent_header=self.parent_header)
def flush(self):
"""Flushes kernel iopub_stream."""
self.kernel.iopub_stream.flush()
def fileno(self):
"""Tunnel fileno() calls to the std stream."""
return self.std.fileno()
def seek(self, offset, whence=io.SEEK_SET):
"""Seek to a location."""
raise io.UnsupportedOperation('cannot seek Jupyter redirect')
def truncate(self, size=None):
"""Truncate the streams."""
raise io.UnsupportedOperation('cannot truncate Jupyter redirect')
def detach(self):
"""This operation is not supported."""
raise io.UnsupportedOperation('cannot detach a Jupyter redirect')
def read(self, size=None):
"""Read from the stream"""
raise io.UnsupportedOperation('cannot read a Jupyter redirect')
def readline(self, size=-1):
"""Read a line."""
raise io.UnsupportedOperation('cannot read a line from a Jupyter redirect')
class JupyterShell(BaseShell):
"""A shell for the Jupyter kernel."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.kernel = None
def default(self, line, kernel, parent_header=None):
"""Executes code, but redirects output to Jupyter client"""
stdout = StdJupyterRedirect('stdout', kernel, parent_header)
stderr = StdJupyterRedirect('stderr', kernel, parent_header)
with stdout, stderr:
rtn = super().default(line)
return rtn
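The redirection is a scoped swap of sys.stdout and sys.stderr: anything written while the context managers are active goes through kernel._respond_in_chunks() and hence out as iopub stream messages. A minimal sketch with a stub kernel (the stub and the setup() call are purely illustrative scaffolding):

import sys
from xonsh.main import setup
from xonsh.jupyter_shell import StdJupyterRedirect

setup(shell_type="none")  # provides the __xonsh_env__ the redirect consults

class _StubKernel:
    """Stands in for XonshKernel; just shows what would become 'stream' messages."""
    def _respond_in_chunks(self, name, s, chunksize=1024, parent_header=None):
        sys.__stdout__.write("[{}] {}".format(name, s))

kernel = _StubKernel()
with StdJupyterRedirect("stdout", kernel), StdJupyterRedirect("stderr", kernel):
    print("hello")  # routed through kernel._respond_in_chunks
# on exit, the original sys.stdout and sys.stderr are restored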


@@ -436,6 +436,6 @@ def setup(ctx=None, shell_type='none', env=(('RAISE_SUBPROC_ERROR', True),)):
builtins.__xonsh_execer__ = Execer(xonsh_ctx=ctx)
builtins.__xonsh_shell__ = Shell(builtins.__xonsh_execer__,
ctx=ctx,
shell_type='none')
shell_type=shell_type)
builtins.__xonsh_env__.update(env)
install_import_hooks()


@@ -166,6 +166,8 @@ class Shell(object):
from xonsh.ptk.shell import PromptToolkitShell as shell_class
elif shell_type == 'readline':
from xonsh.readline_shell import ReadlineShell as shell_class
elif shell_type == 'jupyter':
from xonsh.jupyter_shell import JupyterShell as shell_class
else:
raise XonshError('{} is not recognized as a shell type'.format(
shell_type))