Implement history pull for JSON history backend (#5788)

* add --session-id option to `history pull` command

* implement `history pull` for JSON history

* add news item for `history pull` updates

* add documentation for history pull `--session-id` option

* add explanatory comment for sleep in test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update json-history-pull.rst

* fix failing tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Andy Kipp <anki-code@users.noreply.github.com>
This commit is contained in:
jfmontanaro 2025-02-03 11:34:19 -05:00 committed by GitHub
parent 77ecefff34
commit a33ccdf636
Failed to generate hash of commit
7 changed files with 214 additions and 24 deletions

View file

@ -262,6 +262,10 @@ Tries to pull the history from parallel sessions and add to the current session.
For example if there are two parallel terminal windows the run of ``history pull``
command from the second terminal window will get the commands from the first terminal.
The optional ``--session-id`` argument restricts the pull to a single other session.
This is most useful with the JSON history backend, where the overhead of an
unfiltered ``pull`` can be significantly higher.
``clear`` action
================
Deletes the history from the current session up until this point. Later commands

View file

@ -0,0 +1,24 @@
**Added:**
* history: Added and documented `--session-id` parameter for `history pull` command.
* history-json: Implemented `history pull` for JSON history backend.
**Changed:**
* <news item>
**Deprecated:**
* <news item>
**Removed:**
* <news item>
**Fixed:**
* history: Prevented `history pull` command from adding consecutive duplicates to prompter history.
**Security:**
* <news item>

View file

@ -611,3 +611,44 @@ def test_hist_on_cmd(hist, xession, capsys, tmpdir):
hist.append({"inp": cmd, "rtn": 0, "ts": (ts + 1, ts + 1.5)})
assert len(xession.history) == 6
@pytest.mark.parametrize(
    "src_sessionid", [None, "e2265764-041c-4c57-acba-49d4e4f676e5"]
)
def test_hist_pull(src_sessionid, ptk_shell, tmpdir, xonsh_session, monkeypatch):
    """Check that ``JsonHistory.pull`` imports entries written by other sessions.

    Two source sessions (A and B) each record one command before and one
    command after the pulling session is created. Only the later commands
    may be pulled, and when ``src_sessionid`` is given only session A's
    command may be pulled.
    """

    def record(inp, stamp):
        # history entry whose start and end timestamps are both ``stamp``
        return {"inp": inp, "rtn": 0, "ts": [stamp, stamp]}

    xonsh_session.env["XONSH_DATA_DIR"] = str(tmpdir)
    earlier = time.time()

    # commands that other sessions ran before the pulling session existed
    session_a = JsonHistory(sessionid=src_sessionid, gc=False)
    session_a.append(record("cmd hist_a before", earlier))
    session_b = JsonHistory(gc=False)
    session_b.append(record("cmd hist_b before", earlier))

    puller = JsonHistory(gc=False)

    # commands that other sessions ran after the pulling session started
    later = time.time() + 1
    session_a.append(record("cmd hist_a after", later))
    session_b.append(record("cmd hist_b after", later + 1))

    # give the filesystem long enough that it will update the mtime
    time.sleep(0.01)

    # at_exit makes the flush run synchronously instead of in a background thread
    for session in (session_a, session_b):
        session.flush(at_exit=True)

    # pull only works with PTK shell
    shell = ptk_shell[2]
    monkeypatch.setattr(xonsh_session.shell, "shell", shell)

    puller.pull(src_sessionid=src_sessionid)
    pulled = shell.prompter.history.get_strings()

    # only post-start commands are pulled; with a session id given,
    # only the commands of that session are pulled
    expected = ["cmd hist_a after"]
    if src_sessionid is None:
        expected.append("cmd hist_b after")
    assert pulled == expected

View file

@ -5,6 +5,7 @@ import itertools
import os
import shlex
import sys
import time
import pytest
@ -350,3 +351,37 @@ def test_hist_store_cwd(hist, xession):
assert cmds[1]["cwd"] is None
_clean_up(hist)
@pytest.mark.parametrize(
    "src_sessionid", [None, "e2265764-041c-4c57-acba-49d4e4f676e5"]
)
def test_hist_pull(src_sessionid, tmpdir, ptk_shell, monkeypatch):
    """Test that `pull` method correctly loads history entries
    added to the database by other sessions.

    Sessions A and B share one sqlite file with the pulling session. Each
    records one command stamped before the pulling session is created and
    one stamped after it; only the later ones may be pulled, and only
    session A's when ``src_sessionid`` is given.
    """
    db_file = tmpdir / "xonsh-HISTORY-TEST-PULL.sqlite"
    before = time.time()

    # simulate commands being run in other sessions before this session starts
    hist_a = SqliteHistory(filename=db_file, gc=False, sessionid=src_sessionid)
    hist_a.append({"inp": "cmd hist_a before", "rtn": 0, "ts": [before, before]})
    hist_b = SqliteHistory(filename=db_file, gc=False)
    # labelled "before" so that a broken timestamp filter cannot hide behind
    # the consecutive-duplicate suppression performed by `pull`
    hist_b.append({"inp": "cmd hist_b before", "rtn": 0, "ts": [before, before]})

    hist_main = SqliteHistory(filename=db_file, gc=False)

    # simulate commands being run in other sessions after this session starts
    after = time.time() + 1
    hist_a.append({"inp": "cmd hist_a after", "rtn": 0, "ts": [after, after]})
    hist_b.append({"inp": "cmd hist_b after", "rtn": 0, "ts": [after + 1, after + 1]})

    # pull only works with PTK shell
    monkeypatch.setattr("xonsh.built_ins.XSH.shell.shell", ptk_shell[2])

    hist_main.pull(src_sessionid=src_sessionid)
    hist_strings = ptk_shell[2].prompter.history.get_strings()

    if src_sessionid is None:
        # ensure that only commands from after the pulling session started get pulled in
        assert hist_strings == ["cmd hist_a after", "cmd hist_b after"]
    else:
        # and that the commands are correctly filtered by session id if applicable
        assert hist_strings == ["cmd hist_a after"]

View file

@ -103,9 +103,28 @@ def _xhj_get_data_dir():
return dir
def _xhj_get_history_files(sort=True, newest_first=False):
def _xhj_get_data_dir_files(data_dir, include_mtime=False):
    """Iterate over all the history files in a data dir,
    optionally including the `mtime` for each file.

    Parameters
    ----------
    data_dir : str
        Directory to scan; it is expanded to an absolute path first.
    include_mtime : bool, optional
        When true, each file is stat'ed and its modification time is
        yielded alongside the path.

    Yields
    ------
    tuple
        ``(fullpath, mtime)`` for every entry named ``xonsh-*.json``.
        ``mtime`` is ``None`` unless ``include_mtime`` is true.
    """
    data_dir = xt.expanduser_abs_path(data_dir)
    try:
        names = os.listdir(data_dir)
    except OSError:
        if XSH.env.get("XONSH_DEBUG"):
            xt.print_exception(
                f"Could not collect xonsh history json files from {data_dir}"
            )
        return

    for name in names:
        if not (name.startswith("xonsh-") and name.endswith(".json")):
            continue
        fullpath = os.path.join(data_dir, name)
        mtime = None
        if include_mtime:
            try:
                mtime = os.path.getmtime(fullpath)
            except OSError:
                # The file could not be stat'ed (presumably removed after the
                # listing was taken). Skip only this entry instead of
                # abandoning the remaining files in the directory.
                if XSH.env.get("XONSH_DEBUG"):
                    xt.print_exception(
                        f"Could not collect xonsh history json files from {data_dir}"
                    )
                continue
        yield fullpath, mtime
def _xhj_get_history_files(sort=True, newest_first=False, modified_since=None):
"""Find and return the history files. Optionally sort files by
modify time.
modify time, or include only those modified after a certain time.
"""
data_dirs = [
_xhj_get_data_dir(),
@ -114,20 +133,14 @@ def _xhj_get_history_files(sort=True, newest_first=False):
files = []
for data_dir in data_dirs:
data_dir = xt.expanduser_abs_path(data_dir)
try:
files += [
os.path.join(data_dir, f)
for f in os.listdir(data_dir)
if f.startswith("xonsh-") and f.endswith(".json")
]
except OSError:
if XSH.env.get("XONSH_DEBUG"):
xt.print_exception(
f"Could not collect xonsh history json files from {data_dir}"
)
include_mtime = sort or (modified_since is not None)
for file, mtime in _xhj_get_data_dir_files(data_dir, include_mtime):
if modified_since is None or mtime > modified_since:
files.append((file, mtime))
if sort:
files.sort(key=lambda x: os.path.getmtime(x), reverse=newest_first)
files.sort(key=lambda x: x[1], reverse=newest_first)
# drop the mtimes
files = [f[0] for f in files]
custom_history_file = XSH.env.get("XONSH_HISTORY_FILE", None)
if custom_history_file:
@ -137,6 +150,43 @@ def _xhj_get_history_files(sort=True, newest_first=False):
return files
def _xhj_pull_items(last_pull_time, src_sessionid=None):
    """List all history items after a given start time.
    Optionally restrict to just items from a single session.

    Parameters
    ----------
    last_pull_time : float
        Only items whose end timestamp (``item["ts"][1]``) is strictly
        greater than this value are returned.
    src_sessionid : str, optional
        When given, only ``xonsh-<src_sessionid>.json`` in the data dir is
        read. Otherwise every history file modified since
        ``last_pull_time`` is read.

    Returns
    -------
    list
        The loaded history items, sorted by ascending end timestamp.
        Files that cannot be opened or parsed contribute nothing.
    """
    if src_sessionid:
        filename = os.path.join(_xhj_get_data_dir(), f"xonsh-{src_sessionid}.json")
        src_paths = [filename]
    else:
        src_paths = _xhj_get_history_files(sort=True, modified_since=last_pull_time)

    # src_paths may include the current session's file, so skip it to avoid duplicates
    # NOTE(review): only a custom XONSH_HISTORY_FILE is recognised as "current"
    # here; confirm that the default per-session file cannot show up in src_paths.
    custom_history_file = XSH.env.get("XONSH_HISTORY_FILE") or ""
    current_session_path = xt.expanduser_abs_path(custom_history_file)

    items = []
    for path in src_paths:
        if path == current_session_path:
            continue
        try:
            history_file = open(path)
        except OSError:
            # unknown session id, or the file is gone/unreadable: nothing to pull
            continue
        # keep the handle open only while items are being loaded from it
        with history_file:
            try:
                lj = xlj.LazyJSON(history_file)
            except (JSONDecodeError, ValueError):
                continue
            cmds = lj["cmds"]
            # the cutoff point is likely to be very near the end of the session, so iterate backward
            for i in range(len(cmds) - 1, -1, -1):
                item = cmds[i].load()
                if item["ts"][1] > last_pull_time:
                    items.append(item)
                else:
                    break
    items.sort(key=lambda i: i["ts"][1])
    return items
class JsonHistoryGC(threading.Thread):
"""Shell history garbage collection."""
@ -444,6 +494,7 @@ class JsonHistory(History):
self.last_cmd_out = None
self.last_cmd_rtn = None
self.gc = JsonHistoryGC() if gc else None
self.last_pull_time = time.time()
# command fields that are known
self.tss = JsonCommandField("ts", self)
self.inps = JsonCommandField("inp", self)
@ -585,6 +636,24 @@ class JsonHistory(History):
data["gc_last_size"] = f"{(self.hist_size, self.hist_units)}"
return data
def pull(self, show_commands=False, src_sessionid=None):
    """Append commands run in other sessions to the prompter history.

    Parameters
    ----------
    show_commands : bool, optional
        Print every pulled command line, including consecutive repeats.
    src_sessionid : str, optional
        Passed through to ``_xhj_pull_items`` to restrict the pull to one
        session.

    Returns
    -------
    int
        Number of lines appended to the prompter history; ``0`` when the
        active shell has no ``prompter`` attribute.
    """
    shell = XSH.shell.shell
    if not hasattr(shell, "prompter"):
        print(f"Shell type {shell} is not supported.")
        return 0

    added = 0
    last_line = None
    for entry in _xhj_pull_items(self.last_pull_time, src_sessionid):
        text = entry["inp"].rstrip()
        if show_commands:
            print(text)
        if text == last_line:
            # consecutive repeat: shown above if requested, but not stored again
            continue
        shell.prompter.history.append_string(text)
        added += 1
        last_line = text

    self.last_pull_time = time.time()
    return added
def run_gc(self, size=None, blocking=True, force=False, **_):
self.gc = JsonHistoryGC(wait_for_shell=False, size=size, force=force)
if blocking:

View file

@ -320,13 +320,16 @@ class HistoryAlias(xcli.ArgParserAlias):
print(str(hist.sessionid), file=_stdout)
@staticmethod
def pull(show_commands=False, _stdout=None):
def pull(show_commands=False, session_id=None, _stdout=None):
"""Pull history from other parallel sessions.
Parameters
----------
show_commands: -c, --show-commands
show pulled commands
session_id: -s, --session-id
pull from specified session only
"""
hist = XSH.history
@ -338,7 +341,7 @@ class HistoryAlias(xcli.ArgParserAlias):
file=_stdout,
)
lines_added = hist.pull(show_commands)
lines_added = hist.pull(show_commands, session_id)
if lines_added:
print(f"Added {lines_added} records!", file=_stdout)
else:

View file

@ -204,9 +204,20 @@ def xh_sqlite_delete_items(size_to_keep, filename=None):
return _xh_sqlite_delete_records(c, size_to_keep)
def xh_sqlite_pull(filename, last_pull_time, current_sessionid):
sql = "SELECT inp FROM xonsh_history WHERE tsb > ? AND sessionid != ? ORDER BY tsb"
params = [last_pull_time, current_sessionid]
def xh_sqlite_pull(filename, last_pull_time, current_sessionid, src_sessionid=None):
# ensure we don't duplicate history entries if some crazy person passes the current session
if src_sessionid == current_sessionid:
return []
if src_sessionid:
sql = (
"SELECT inp FROM xonsh_history WHERE tsb > ? AND sessionid = ? ORDER BY tsb"
)
params = [last_pull_time, src_sessionid]
else:
sql = "SELECT inp FROM xonsh_history WHERE tsb > ? AND sessionid != ? ORDER BY tsb"
params = [last_pull_time, current_sessionid]
with _xh_sqlite_get_conn(filename=filename) as conn:
c = conn.cursor()
c.execute(sql, tuple(params))
@ -366,19 +377,22 @@ class SqliteHistory(History):
data["gc options"] = envs.get("XONSH_HISTORY_SIZE")
return data
def pull(self, show_commands=False):
def pull(self, show_commands=False, src_sessionid=None):
if not hasattr(XSH.shell.shell, "prompter"):
print(f"Shell type {XSH.shell.shell} is not supported.")
return 0
cnt = 0
prev = None
for r in xh_sqlite_pull(
self.filename, self.last_pull_time, str(self.sessionid)
self.filename, self.last_pull_time, str(self.sessionid), src_sessionid
):
if show_commands:
print(r[0])
XSH.shell.shell.prompter.history.append_string(r[0])
cnt += 1
if r[0] != prev:
XSH.shell.shell.prompter.history.append_string(r[0])
cnt += 1
prev = r[0]
self.last_pull_time = time.time()
return cnt