Merge branch 'master' of github.com:mitnk/xonsh into hist-be

This commit is contained in:
Hugo Wang 2017-02-05 22:07:16 +08:00
commit 4a5b561014
21 changed files with 578 additions and 125 deletions

View file

@ -1,6 +1,7 @@
machine:
environment:
PATH: /home/ubuntu/miniconda/envs/test_env/bin:/home/ubuntu/miniconda/bin:$PATH
XONSH_DEBUG: 1
post:
- pyenv global 3.4.4 3.5.2

View file

@ -126,6 +126,12 @@ Provides an interface to printing lines of source code prior to their execution.
.. command-help:: xonsh.tracer.tracermain
``xip``
=================
Runs the ``pip`` package manager for xonsh itself. Useful for installations where xonsh is in an
isolated environment (eg homebrew). Pronounced "kip".
``xonfig``
=================
Manages xonsh configuration information.

View file

@ -449,8 +449,8 @@ Exciting Technical Detail: Teeing and Pseudo Terminals
Xonsh is able to capture all stdout and stderr transparently and responsively. For aliases,
Python code, or xonsh code, this isn't a big deal. It is easy to redirect information
flowing through ``sys.stdout`` and ``sys.stderr``. For subprocess commands, this is
considerably harder. (Subprocess stdout capturing is currently skipped on Windows, though
is theoretically possible.)
considerably harder. Storing stdout is disabled by default, but can be enabled by setting:
``$XONSH_STORE_STDOUT=True`` in your ``~/.xonshrc`` file.
To be able to tee stdout and stderr and still have the terminal responsive, xonsh implements
its own teeing pseudo-terminal on top of the Python standard library ``pty`` module. You

15
news/history-indexing.rst Normal file
View file

@ -0,0 +1,15 @@
**Added:**
* HistoryEntry, a SimpleNamespace object that represents a command in history.
**Changed:**
* history indexing api to be more simple, now returns HistoryEntry.
**Deprecated:** None
**Removed:** None
**Fixed:** None
**Security:** None

17
news/uptime.rst Normal file
View file

@ -0,0 +1,17 @@
**Added:**
* Uptime module added to ``xonsh.xoreutils``. This can report the system
boot time and up time.
**Changed:**
* The JSON history backend will now unlock history files that were created
prior to the last reboot.
**Deprecated:** None
**Removed:** None
**Fixed:** None
**Security:** None

View file

@ -0,0 +1,13 @@
**Added:** None
**Changed:** None
**Deprecated:** None
**Removed:** None
**Fixed:**
* Fixed broken bash completions on Windows if 'Windows Subsystem for Linux' is installed.
**Security:** None

13
news/xip.rst Normal file
View file

@ -0,0 +1,13 @@
**Added:**
* Add alias ``xip`` ("kip") so that xonsh's Python environment (whatever that is) can be modified.
**Changed:** None
**Deprecated:** None
**Removed:** None
**Fixed:** None
**Security:** None

View file

@ -4,13 +4,15 @@
import os
import shlex
import pytest
from xonsh.lazyjson import LazyJSON
from xonsh.history.dummy import DummyHistory
from xonsh.history.json import JsonHistory
from xonsh.history.main import history_main, _xh_parse_args, construct_history
import pytest
CMDS = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
@pytest.yield_fixture
def hist():
@ -119,8 +121,6 @@ def test_cmd_field(hist, xonsh_builtins):
assert None == hist.outs[-1]
CMDS = ['ls', 'cat hello kitty', 'abc', 'def', 'touch me', 'grep from me']
@pytest.mark.parametrize('inp, commands, offset', [
('', CMDS, (0, 1)),
('-r', list(reversed(CMDS)), (len(CMDS)- 1, -1)),
@ -257,18 +257,24 @@ def test_parser_show(args, exp):
@pytest.mark.parametrize('index, exp', [
(-1, 'grep from me'),
('hello', 'cat hello kitty'),
((-1, -1), 'me'),
(('hello', 0), 'cat'),
((-1, slice(0,2)), 'grep from'),
(('kitty', slice(1,3)), 'hello kitty')
(-1, ('grep from me', 'out', 0, (5, 6))),
(1, ('cat hello kitty', 'out', 0, (1, 2))),
(slice(1, 3), [('cat hello kitty', 'out', 0, (1, 2)),
('abc', 'out', 0, (2, 3))]),
])
def test_history_getitem(index, exp, hist, xonsh_builtins):
xonsh_builtins.__xonsh_env__['HISTCONTROL'] = set()
attrs = ('inp', 'out', 'rtn', 'ts')
for ts,cmd in enumerate(CMDS): # populate the shell history
hist.append({'inp': cmd, 'rtn': 0, 'ts':(ts + 1, ts + 1.5)})
assert hist[index] == exp
entry = {k: v for k, v in zip(attrs, [cmd, 'out', 0, (ts, ts+1)])}
hist.append(entry)
entry = hist[index]
if isinstance(entry, list):
assert [(e.cmd, e.out, e.rtn, e.ts) for e in entry] == exp
else:
assert (entry.cmd, entry.out, entry.rtn, entry.ts) == exp
def test_construct_history_str(xonsh_builtins):

View file

@ -161,18 +161,3 @@ def test_histcontrol(hist, xonsh_builtins):
assert '/bin/ls' == items[-1]['inp']
assert 0 == items[-1]['rtn']
assert -1 == hist.rtns[-1]
@pytest.mark.parametrize('index, exp', [
(-1, 'grep from me'),
('hello', 'cat hello kitty'),
((-1, -1), 'me'),
(('hello', 0), 'cat'),
((-1, slice(0, 2)), 'grep from'),
(('kitty', slice(1, 3)), 'hello kitty')
])
def test_history_getitem(index, exp, hist, xonsh_builtins):
xonsh_builtins.__xonsh_env__['HISTCONTROL'] = set()
for ts, cmd in enumerate(CMDS): # populate the shell history
hist.append({'inp': cmd, 'rtn': 0, 'ts': (ts + 1, ts + 1.5)})
assert hist[index] == exp

View file

@ -2,6 +2,7 @@ import os
import tempfile
from xonsh.xoreutils import _which
from xonsh.xoreutils import uptime
from xonsh.tools import ON_WINDOWS
@ -85,3 +86,17 @@ class TestWhich:
return path1 == path2
else:
return os.path.samefile(path1, path2)
def test_uptime():
up = uptime.uptime()
assert up is not None
assert up > 0.0
def test_boottime():
bt = uptime.boottime()
assert bt is not None
assert bt > 0.0
assert uptime._BOOTTIME is not None
assert uptime._BOOTTIME > 0.0

View file

@ -1,7 +1,7 @@
__version__ = '0.5.3'
# amalgamate exclude jupyter_kernel parser_table parser_test_table pyghooks
# amalgamate exclude winutils wizard pytest_plugin fs
# amalgamate exclude winutils wizard pytest_plugin fs macutils
import os as _os
if _os.getenv('XONSH_DEBUG', ''):
pass

View file

@ -380,6 +380,27 @@ def showcmd(args, stdin=None):
sys.displayhook(args)
def detect_xip_alias():
"""
Determines the correct invokation to get xonsh's pip
"""
if not getattr(sys, 'executable', None):
return lambda args, stdin=None: ("", "Sorry, unable to run pip on your system (missing sys.executable)", 1)
basecmd = [sys.executable, '-m', 'pip']
try:
if ON_WINDOWS:
# XXX: Does windows have an installation mode that requires UAC?
return basecmd
elif not os.access(os.path.dirname(sys.executable), os.W_OK):
return ['sudo'] + basecmd
else:
return basecmd
except Exception:
# Something freaky happened, return something that'll probably work
return basecmd
def make_default_aliases():
"""Creates a new default aliases dictionary."""
default_aliases = {
@ -410,7 +431,8 @@ def make_default_aliases():
'ipynb': ['jupyter', 'notebook', '--no-browser'],
'which': xxw.which,
'xontrib': xontribs_main,
'completer': xca.completer_alias
'completer': xca.completer_alias,
'xip': detect_xip_alias(),
}
if ON_WINDOWS:
# Borrow builtin commands from cmd.exe.

View file

@ -1,7 +1,25 @@
# -*- coding: utf-8 -*-
"""Base class of Xonsh History backends."""
import types
import uuid
import xonsh.tools as xt
class HistoryEntry(types.SimpleNamespace):
"""Represent a command in history.
Attributes
----------
cmd: str
The command as typed by the user, including newlines
out: str
The output of the command, if xonsh is configured to save it
rtn: int
The return of the command (ie, 0 on success)
ts: two-tuple of floats
The timestamps of when the command started and finished, including
fractions.
"""
class History:
@ -11,31 +29,10 @@ class History:
Indexing
--------
History object acts like a sequence that can be indexed in a special
way that adds extra functionality. Note that the most recent command
is the last item in history.
History acts like a sequence that can be indexed to return
``HistoryEntry`` objects.
The index acts as a filter with two parts, command and argument,
separated by comma. Based on the type of each part different
filtering can be achieved,
for the command part:
- an int returns the command in that position.
- a slice returns a list of commands.
- a string returns the most recent command containing the string.
for the argument part:
- an int returns the argument of the command in that position.
- a slice returns a part of the command based on the argument
position.
The argument part of the filter can be omitted but the command part is
required. Command arguments are separated by white space.
If the filtering produces only one result it is returned as a string
else a list of strings is returned.
Note that the most recent command is the last item in history.
Attributes
----------
@ -79,23 +76,22 @@ class History:
return len(list(self.items()))
def __getitem__(self, item):
"""Retrieve history parts based on filtering rules,
see ``History`` docs for more info. Accepts one of
int, string, slice or tuple of length two.
"""
if isinstance(item, tuple):
cmd_pat, arg_pat = item
"""Retrieve history entries, see ``History`` docs for more info."""
if isinstance(item, int):
if item >= len(self):
raise IndexError('history index out of range')
return HistoryEntry(cmd=self.inps[item], out=self.outs[item],
rtn=self.rtns[item], ts=self.tss[item])
elif isinstance(item, slice):
cmds = self.inps[item]
outs = self.outs[item]
rtns = self.rtns[item]
tss = self.tss[item]
return [HistoryEntry(cmd=c, out=o, rtn=r, ts=t)
for c, o, r, t in zip(cmds, outs, rtns, tss)]
else:
cmd_pat, arg_pat = item, None
cmds = [c['inp'] for c in self.items()]
cmds = self._cmd_filter(cmds, cmd_pat)
if arg_pat is not None:
cmds = self._args_filter(cmds, arg_pat)
cmds = list(cmds)
if len(cmds) == 1:
return cmds[0]
else:
return cmds
raise TypeError('history indices must be integers '
'or slices, not {}'.format(type(item)))
def __setitem__(self, *args):
raise PermissionError('You cannot change history! '
@ -145,27 +141,3 @@ class History:
If set blocking, then wait until gc action finished.
"""
pass
@staticmethod
def _cmd_filter(cmds, pat):
if isinstance(pat, (int, slice)):
s = xt.ensure_slice(pat)
yield from xt.get_portions(cmds, s)
elif xt.is_string(pat):
for command in reversed(list(cmds)):
if pat in command:
yield command
return
else:
raise TypeError('Command filter must be string, int or slice')
@staticmethod
def _args_filter(cmds, pat):
args = None
if isinstance(pat, (int, slice)):
s = xt.ensure_slice(pat)
for command in cmds:
yield ' '.join(command.split()[s])
else:
raise TypeError('Argument filter must be int or slice')
return args

View file

@ -10,6 +10,7 @@ import collections.abc as cabc
from xonsh.history.base import History
import xonsh.tools as xt
import xonsh.lazyjson as xlj
import xonsh.xoreutils.uptime as uptime
def _xhj_gc_commands_to_rmfiles(hsize, files):
@ -122,7 +123,7 @@ class JsonHistoryGC(threading.Thread):
env = getattr(builtins, '__xonsh_env__', None)
if env is None:
return []
boot = uptime.boottime()
fs = _xhj_get_history_files(sort=False)
files = []
for f in fs:
@ -132,10 +133,19 @@ class JsonHistoryGC(threading.Thread):
files.append((time.time(), 0, f))
continue
lj = xlj.LazyJSON(f, reopen=False)
if lj['locked'] and lj['ts'][0] < boot:
# computer was rebooted between when this history was created
# and now and so this history should be unlocked.
hist = lj.load()
lj.close()
hist['locked'] = False
with open(f, 'w', newline='\n') as fp:
xlj.ljdump(hist, fp, sort_keys=True)
lj = xlj.LazyJSON(f, reopen=False)
if only_unlocked and lj['locked']:
continue
# info: closing timestamp, number of commands, filename
files.append((lj['ts'][1] or time.time(),
files.append((lj['ts'][1] or lj['ts'][0],
len(lj.sizes['cmds']) - 1,
f))
lj.close()

View file

@ -11,7 +11,7 @@ import subprocess
import collections
from xonsh.lazyasd import LazyObject
from xonsh.platform import ON_DARWIN, ON_WINDOWS, ON_CYGWIN
from xonsh.platform import ON_DARWIN, ON_WINDOWS, ON_CYGWIN, LIBC
tasks = LazyObject(collections.deque, globals(), 'tasks')
@ -134,9 +134,6 @@ else:
# give_terminal_to from bash 4.3 source, jobs.c, line 4030
# this will give the terminal to the process group pgid
if ON_CYGWIN:
_libc = LazyObject(lambda: ctypes.CDLL('cygwin1.dll'),
globals(), '_libc')
# on cygwin, signal.pthread_sigmask does not exist in Python, even
# though pthread_sigmask is defined in the kernel. thus, we use
# ctypes to mimic the calls in the "normal" version below.
@ -145,16 +142,16 @@ else:
if st is not None and os.isatty(st):
omask = ctypes.c_ulong()
mask = ctypes.c_ulong()
_libc.sigemptyset(ctypes.byref(mask))
LIBC.sigemptyset(ctypes.byref(mask))
for i in _block_when_giving:
_libc.sigaddset(ctypes.byref(mask), ctypes.c_int(i))
_libc.sigemptyset(ctypes.byref(omask))
_libc.sigprocmask(ctypes.c_int(signal.SIG_BLOCK),
ctypes.byref(mask),
ctypes.byref(omask))
_libc.tcsetpgrp(ctypes.c_int(st), ctypes.c_int(pgid))
_libc.sigprocmask(ctypes.c_int(signal.SIG_SETMASK),
ctypes.byref(omask), None)
LIBC.sigaddset(ctypes.byref(mask), ctypes.c_int(i))
LIBC.sigemptyset(ctypes.byref(omask))
LIBC.sigprocmask(ctypes.c_int(signal.SIG_BLOCK),
ctypes.byref(mask),
ctypes.byref(omask))
LIBC.tcsetpgrp(ctypes.c_int(st), ctypes.c_int(pgid))
LIBC.sigprocmask(ctypes.c_int(signal.SIG_SETMASK),
ctypes.byref(omask), None)
else:
def _give_terminal_to(pgid):
st = _shell_tty()

View file

@ -1,7 +1,7 @@
"""Lazy imports that may apply across the xonsh package."""
import importlib
from xonsh.platform import ON_WINDOWS
from xonsh.platform import ON_WINDOWS, ON_DARWIN
from xonsh.lazyasd import LazyObject, lazyobject
pygments = LazyObject(lambda: importlib.import_module('pygments'),
@ -69,6 +69,15 @@ def winutils():
return m
@lazyobject
def macutils():
if ON_DARWIN:
import xonsh.macutils as m
else:
m = None
return m
@lazyobject
def terminal256():
return importlib.import_module('pygments.formatters.terminal256')

22
xonsh/macutils.py Normal file
View file

@ -0,0 +1,22 @@
"""Provides some Mac / Darwin based utility functions for xonsh."""
from ctypes import c_uint, byref, create_string_buffer
from xonsh.platform import LIBC
def sysctlbyname(name, return_str=True):
"""Gets a sysctrl value by name. If return_str is true, this will return
a string representation, else it will return the raw value.
"""
# forked from https://gist.github.com/pudquick/581a71425439f2cf8f09
size = c_uint(0)
# Find out how big our buffer will be
LIBC.sysctlbyname(name, None, byref(size), None, 0)
# Make the buffer
buf = create_string_buffer(size.value)
# Re-run, but provide the buffer
LIBC.sysctlbyname(name, buf, byref(size), None, 0)
if return_str:
return buf.value
else:
return buf.raw

View file

@ -4,8 +4,10 @@ on a platform.
"""
import os
import sys
import ctypes
import signal
import pathlib
import builtins
import platform
import functools
import subprocess
@ -50,9 +52,18 @@ ON_FREEBSD = LazyBool(lambda: (sys.platform.startswith('freebsd')),
ON_NETBSD = LazyBool(lambda: (sys.platform.startswith('netbsd')),
globals(), 'ON_NETBSD')
"""``True`` if on a NetBSD operating system, else ``False``."""
ON_BSD = LazyBool(lambda: ON_FREEBSD or ON_NETBSD,
globals(), 'ON_BSD')
"""``True`` if on a BSD operating system, else ``False``."""
@lazybool
def ON_BSD():
"""``True`` if on a BSD operating system, else ``False``."""
return bool(ON_FREEBSD) or bool(ON_NETBSD)
@lazybool
def ON_BEOS():
"""True if we are on BeOS or Haiku."""
return sys.platform == 'beos5' or sys.platform == 'haiku1'
#
@ -285,16 +296,22 @@ def windows_bash_command():
# Check that bash is on path otherwise try the default directory
# used by Git for windows
wbc = 'bash'
try:
subprocess.check_call([wbc, '--version'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except (FileNotFoundError, subprocess.CalledProcessError):
gfwp = git_for_windows_path()
if gfwp:
bashcmd = os.path.join(gfwp, 'bin\\bash.exe')
if os.path.isfile(bashcmd):
wbc = bashcmd
bash_on_path = builtins.__xonsh_commands_cache__.lazy_locate_binary('bash',
ignore_alias=True)
if bash_on_path:
# Check if Bash is from the "Windows Subsystem for Linux" (WSL)
# which can't be used by xonsh foreign-shell/completer
out = subprocess.check_output([bash_on_path, '--version'],
stderr=subprocess.PIPE,
universal_newlines=True)
if 'pc-linux-gnu' in out.splitlines()[0]:
gfwp = git_for_windows_path()
if gfwp:
bashcmd = os.path.join(gfwp, 'bin\\bash.exe')
if os.path.isfile(bashcmd):
wbc = bashcmd
else:
wbc = bash_on_path
return wbc
#
@ -354,3 +371,60 @@ def PATH_DEFAULT():
else:
pd = ()
return pd
#
# libc
#
@lazyobject
def LIBC():
"""The platform dependent libc implementation."""
if ON_DARWIN:
libc = ctypes.CDLL(ctypes.util.find_library("c"))
elif ON_CYGWIN:
libc = ctypes.CDLL('cygwin1.dll')
elif ON_BSD:
try:
libc = ctypes.CDLL('libc.so')
except AttributeError:
libc = None
except OSError:
# OS X; can't use ctypes.util.find_library because that creates
# a new process on Linux, which is undesirable.
try:
libc = ctypes.CDLL('libc.dylib')
except OSError:
libc = None
elif ON_POSIX:
try:
libc = ctypes.CDLL('libc.so')
except AttributeError:
libc = None
except OSError:
# Debian and derivatives do the wrong thing because /usr/lib/libc.so
# is a GNU ld script rather than an ELF object. To get around this, we
# have to be more specific.
# We don't want to use ctypes.util.find_library because that creates a
# new process on Linux. We also don't want to try too hard because at
# this point we're already pretty sure this isn't Linux.
try:
libc = ctypes.CDLL('libc.so.6')
except OSError:
libc = None
if not hasattr(libc, 'sysinfo'):
# Not Linux.
libc = None
elif ON_WINDOWS:
if hasattr(ctypes, 'windll') and hasattr(ctypes.windll, 'kernel32'):
libc = ctypes.windll.kernel32
else:
try:
# Windows CE uses the cdecl calling convention.
libc = ctypes.CDLL('coredll.lib')
except (AttributeError, OSError):
libc = None
elif ON_BEOS:
libc = ctypes.CDLL('libroot.so')
else:
libc = None
return libc

View file

@ -760,7 +760,7 @@ class PopenThread(threading.Thread):
self.old_int_handler = None
if frame is not None:
self._disable_cbreak_stdin()
if old is not None:
if old is not None and old is not self._signal_int:
old(signal.SIGINT, frame)
#

View file

@ -0,0 +1,2 @@
# amalgamate
# amalgamate end

274
xonsh/xoreutils/uptime.py Normal file
View file

@ -0,0 +1,274 @@
"""
Provides a cross-platform way to figure out the system uptime.
Should work on damned near any operating system you can realistically expect
to be asked to write Python code for.
If this module is invoked as a stand-alone script, it will print the current
uptime in a human-readable format, or display an error message if it can't,
to standard output.
This file was forked from the uptime project: https://github.com/Cairnarvon/uptime
Copyright (c) 2012, Koen Crolla, All rights reserved.
"""
import os
import sys
import time
import ctypes
import struct
import xonsh.platform as xp
import xonsh.lazyimps as xlimps
import xonsh.lazyasd as xl
_BOOTTIME = None
def _uptime_osx():
"""Returns the uptime on mac / darwin."""
global _BOOTTIME
bt = xlimps.macutils.sysctlbyname(b"kern.boottime", return_str=False)
if len(bt) == 4:
bt = struct.unpack_from('@hh', bt)
elif len(bt) == 8:
bt = struct.unpack_from('@ii', bt)
elif len(bt) == 16:
bt = struct.unpack_from('@qq', bt)
else:
raise ValueError('length of boot time not understood: ' + repr(bt))
bt = bt[0] + bt[1]*1e-6
if bt == 0.0:
return None
_BOOTTIME = bt
return time.time() - bt
def _uptime_linux():
"""Returns uptime in seconds or None, on Linux."""
# With procfs
try:
with open('/proc/uptime', 'r') as f:
up = float(f.readline().split()[0])
return up
except (IOError, ValueError):
pass
buf = ctypes.create_string_buffer(128) # 64 suffices on 32-bit, whatever.
if xp.LIBC.sysinfo(buf) < 0:
return None
up = struct.unpack_from('@l', buf.raw)[0]
if up < 0:
up = None
return up
def _boottime_linux():
"""A way to figure out the boot time directly on Linux."""
global _BOOTTIME
try:
with open('/proc/stat', 'r') as f:
for line in f:
if line.startswith('btime'):
_BOOTTIME = float(line.split()[1])
return _BOOTTIME
except (IOError, IndexError):
return None
def _uptime_amiga():
"""Returns uptime in seconds or None, on AmigaOS."""
global _BOOTTIME
try:
_BOOTTIME = os.stat('RAM:').st_ctime
return time.time() - _BOOTTIME
except (NameError, OSError):
return None
def _uptime_beos():
"""Returns uptime in seconds on None, on BeOS/Haiku."""
if not hasattr(xp.LIBC, 'system_time'):
return None
xp.LIBC.system_time.restype = ctypes.c_int64
return xp.LIBC.system_time() / 1000000.
def _uptime_bsd():
"""Returns uptime in seconds or None, on BSD (including OS X)."""
global _BOOTTIME
if not hasattr(xp.LIBC, 'sysctlbyname'):
# Not BSD.
return None
# Determine how much space we need for the response.
sz = ctypes.c_uint(0)
xp.LIBC.sysctlbyname('kern.boottime', None, ctypes.byref(sz), None, 0)
if sz.value != struct.calcsize('@LL'):
# Unexpected, let's give up.
return None
# For real now.
buf = ctypes.create_string_buffer(sz.value)
xp.LIBC.sysctlbyname('kern.boottime', buf, ctypes.byref(sz), None, 0)
sec, usec = struct.unpack_from('@LL', buf.raw)
# OS X disagrees what that second value is.
if usec > 1000000:
usec = 0.
_BOOTTIME = sec + usec / 1000000.
up = time.time() - _BOOTTIME
if up < 0:
up = None
return up
def _uptime_minix():
"""Returns uptime in seconds or None, on MINIX."""
try:
with open('/proc/uptime', 'r') as f:
up = float(f.read())
return up
except (IOError, ValueError):
return None
def _uptime_plan9():
"""Returns uptime in seconds or None, on Plan 9."""
# Apparently Plan 9 only has Python 2.2, which I'm not prepared to
# support. Maybe some Linuxes implement /dev/time, though, someone was
# talking about it somewhere.
try:
# The time file holds one 32-bit number representing the sec-
# onds since start of epoch and three 64-bit numbers, repre-
# senting nanoseconds since start of epoch, clock ticks, and
# clock frequency.
# -- cons(3)
with open('/dev/time', 'r') as f:
s, ns, ct, cf = f.read().split()
return float(ct) / float(cf)
except (IOError, ValueError):
return None
def _uptime_solaris():
"""Returns uptime in seconds or None, on Solaris."""
global _BOOTTIME
try:
kstat = ctypes.CDLL('libkstat.so')
except (AttributeError, OSError):
return None
# kstat doesn't have uptime, but it does have boot time.
# Unfortunately, getting at it isn't perfectly straightforward.
# First, let's pretend to be kstat.h
# Constant
KSTAT_STRLEN = 31 # According to every kstat.h I could find.
# Data structures
class anon_union(ctypes.Union):
# The ``value'' union in kstat_named_t actually has a bunch more
# members, but we're only using it for boot_time, so we only need
# the padding and the one we're actually using.
_fields_ = [('c', ctypes.c_char * 16),
('time', ctypes.c_int)]
class kstat_named_t(ctypes.Structure):
_fields_ = [('name', ctypes.c_char * KSTAT_STRLEN),
('data_type', ctypes.c_char),
('value', anon_union)]
# Function signatures
kstat.kstat_open.restype = ctypes.c_void_p
kstat.kstat_lookup.restype = ctypes.c_void_p
kstat.kstat_lookup.argtypes = [ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_int,
ctypes.c_char_p]
kstat.kstat_read.restype = ctypes.c_int
kstat.kstat_read.argtypes = [ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p]
kstat.kstat_data_lookup.restype = ctypes.POINTER(kstat_named_t)
kstat.kstat_data_lookup.argtypes = [ctypes.c_void_p,
ctypes.c_char_p]
# Now, let's do something useful.
# Initialise kstat control structure.
kc = kstat.kstat_open()
if not kc:
return None
# We're looking for unix:0:system_misc:boot_time.
ksp = kstat.kstat_lookup(kc, 'unix', 0, 'system_misc')
if ksp and kstat.kstat_read(kc, ksp, None) != -1:
data = kstat.kstat_data_lookup(ksp, 'boot_time')
if data:
_BOOTTIME = data.contents.value.time
# Clean-up.
kstat.kstat_close(kc)
if _BOOTTIME is not None:
return time.time() - _BOOTTIME
return None
def _uptime_syllable():
"""Returns uptime in seconds or None, on Syllable."""
global _BOOTTIME
try:
_BOOTTIME = os.stat('/dev/pty/mst/pty0').st_mtime
return time.time() - _BOOTTIME
except (NameError, OSError):
return None
def _uptime_windows():
"""
Returns uptime in seconds or None, on Windows. Warning: may return
incorrect answers after 49.7 days on versions older than Vista.
"""
if hasattr(xp.LIBC, 'GetTickCount64'):
# Vista/Server 2008 or later.
xp.LIBC.GetTickCount64.restype = ctypes.c_uint64
return xp.LIBC.GetTickCount64() / 1000.
if hasattr(xp.LIBC, 'GetTickCount'):
# WinCE and Win2k or later; gives wrong answers after 49.7 days.
xp.LIBC.GetTickCount.restype = ctypes.c_uint32
return xp.LIBC.GetTickCount() / 1000.
return None
@xl.lazyobject
def _UPTIME_FUNCS():
return {'amiga': _uptime_amiga,
'aros12': _uptime_amiga,
'beos5': _uptime_beos,
'cygwin': _uptime_linux,
'darwin': _uptime_osx,
'haiku1': _uptime_beos,
'linux': _uptime_linux,
'linux-armv71': _uptime_linux,
'linux2': _uptime_linux,
'minix3': _uptime_minix,
'sunos5': _uptime_solaris,
'syllable': _uptime_syllable,
'win32': _uptime_windows,
'wince': _uptime_windows}
def uptime():
"""Returns uptime in seconds if even remotely possible, or None if not."""
if _BOOTTIME is not None:
return time.time() - _BOOTTIME
up = _UPTIME_FUNCS.get(sys.platform, _uptime_bsd)()
if up is None:
up = (_uptime_bsd() or _uptime_plan9() or _uptime_linux() or
_uptime_windows() or _uptime_solaris() or _uptime_beos() or
_uptime_amiga() or _uptime_syllable() or _uptime_osx())
return up
def boottime():
"""Returns boot time if remotely possible, or None if not."""
global _BOOTTIME
if _BOOTTIME is None:
up = uptime()
if up is None:
return None
_BOOTTIME = time.time() - up
return _BOOTTIME