Read stop signals from the process and update the process state. (#5361)

Read stop signals from the process and update the process state.

### The issue

Technically: in a couple of places that are critical for signal processing
we call `os.waitpid()`. The behavior of this function is not obvious; one
pitfall is how the return code has to be processed after a signal is caught.
We had no good signal handling around these calls, and this PR fixes that.
See also the description of the `proc_untraced_waitpid` function.

From the user's perspective: suppose we have a process that waits for
user input from the terminal, e.g. `python -c "input()"` or `fzf`. If this
process runs in a captured pipeline, e.g. `!(echo 1 | fzf | head)`, it
is suspended by the OS and the pipeline ends up in an endless loop that
eventually crashes and corrupts the standard streams. This PR fixes this.

### The solution

Technically. The key function is `proc_untraced_waitpid` - it catches
the stop signals and updates the process state.

From the user's perspective: first of all, we expect that users will use
the captured object `!()` only for capturable processes. Because of this, our
goal here is just to make the behavior in this case stable.
In this PR we detect that a process in the pipeline is suspended, and then we
need to finish the command pipeline carefully:
* Show a message about the suspended process.
* Keep the suspended process in `jobs`. Bash behaves the same way. This is
good because we don't know which process was suspended and why; an
experienced user may want to continue it manually.
* Finish the CommandPipeline with returncode=None and suspended=True.

### Before

```xsh
!(fzf) # or !(python -c "input()")
# Hanging / Exceptions / OSError / No way to end the command.
# After exception:
$(echo 1)
# OSError / IO error
```

### After

```xsh
!(fzf) # or `!(ls | fzf | head)` or `!(python -c "input()")`
# Process ['fzf'] with pid 60000 suspended with signal 22 SIGTTOU and stays in `jobs`.
# This happens when a process starts waiting for input but there is no terminal attached in captured mode.
# CommandPipeline(returncode=None, suspended=True, ...)

$(echo 1)
# Success.
```
Closes #4752 #4577

### Notes

* There is a rare edge case where the process terminates so
fast that we can't catch its pid while it is alive and check the signal
([src](67d672783d/xonsh/jobs.py (L71-L80))).
I leave it as is for now.

### Mentions

#2159

## For community
⬇️ **Please click the 👍 reaction instead of leaving a `+1` or 👍
comment**

---------

Co-authored-by: a <1@1.1>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Gil Forsyth <gforsyth@users.noreply.github.com>
This commit is contained in:
Andy Kipp 2024-05-22 17:45:39 +02:00 committed by GitHub
parent 635d7837c5
commit 0f25a5a348
Failed to generate hash of commit
8 changed files with 224 additions and 50 deletions

View file

@ -0,0 +1,23 @@
**Added:**
* Reading stop signals from the process and update the process state (#5361).
**Changed:**
* <news item>
**Deprecated:**
* <news item>
**Removed:**
* <news item>
**Fixed:**
* <news item>
**Security:**
* <news item>

View file

@ -19,6 +19,14 @@ from xonsh.pytest.tools import skip_if_on_windows
from xonsh.tools import XonshError
def cmd_sig(sig):
    """Build the argv of a Python one-liner that sends a signal to itself.

    ``sig`` is interpolated after ``signal.`` in the generated script, so it
    is expected to be the name of an attribute of the ``signal`` module
    (for example ``"SIGINT"``).
    """
    script = f"import os, signal; os.kill(os.getpid(), signal.{sig})"
    return ["python", "-c", script]
@skip_if_on_windows
def test_cmds_to_specs_thread_subproc(xession):
env = xession.env
@ -141,15 +149,37 @@ def test_capture_always(
@skip_if_on_windows
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_interrupted_process_returncode(xonsh_session):
@pytest.mark.parametrize("captured", ["stdout", "object"])
@pytest.mark.parametrize("interactive", [True, False])
def test_interrupted_process_returncode(xonsh_session, captured, interactive):
xonsh_session.env["XONSH_INTERACTIVE"] = interactive
xonsh_session.env["RAISE_SUBPROC_ERROR"] = False
cmd = [["python", "-c", "import os, signal; os.kill(os.getpid(), signal.SIGINT)"]]
cmd = [cmd_sig("SIGINT")]
specs = cmds_to_specs(cmd, captured="stdout")
(p := _run_command_pipeline(specs, cmd)).end()
assert p.proc.returncode == -signal.SIGINT
@skip_if_on_windows
@pytest.mark.parametrize(
    "suspended_pipeline",
    [
        # The self-signalling command alone, last in a pipeline, and in the
        # middle of a pipeline.
        [cmd_sig("SIGTTIN")],
        [["echo", "1"], "|", cmd_sig("SIGTTIN")],
        [["echo", "1"], "|", cmd_sig("SIGTTIN"), "|", ["head"]],
    ],
)
def test_specs_with_suspended_captured_process_pipeline(
    xonsh_session, suspended_pipeline
):
    """Check that a captured pipeline is flagged as suspended.

    Each parametrized pipeline contains a command that sends SIGTTIN to
    itself. The pipeline is run in ``captured="object"`` mode with
    ``XONSH_INTERACTIVE`` enabled, and after ``end()`` the pipeline's
    ``suspended`` attribute must be truthy.
    """
    xonsh_session.env["XONSH_INTERACTIVE"] = True
    specs = cmds_to_specs(suspended_pipeline, captured="object")
    p = _run_command_pipeline(specs, suspended_pipeline)
    # NOTE(review): presumably SIGCONT resumes the stopped child so that
    # ``end()`` can complete - confirm against the pipeline implementation.
    p.proc.send_signal(signal.SIGCONT)
    p.end()
    assert p.suspended
@skip_if_on_windows
@pytest.mark.parametrize(
"cmds, exp_stream_lines, exp_list_lines",
@ -162,6 +192,7 @@ def test_interrupted_process_returncode(xonsh_session):
([["echo", "-n", "1\n2 3"]], "1\n2 3", ["1", "2 3"]),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_subproc_output_format(cmds, exp_stream_lines, exp_list_lines, xonsh_session):
xonsh_session.env["XONSH_SUBPROC_OUTPUT_FORMAT"] = "stream_lines"
output = run_subproc(cmds, "stdout")

View file

@ -1239,17 +1239,26 @@ def test_catching_system_exit():
out, err, ret = run_xonsh(
cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3
)
if ON_WINDOWS:
assert ret == 1
else:
assert ret == 2
assert ret > 0
@skip_if_on_windows
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_catching_exit_signal():
stdin_cmd = "kill -SIGHUP @(__import__('os').getpid())\n"
stdin_cmd = "sleep 0.2; kill -SIGHUP @(__import__('os').getpid())\n"
out, err, ret = run_xonsh(
cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3
)
assert ret > 0
@skip_if_on_windows
def test_suspended_captured_process_pipeline():
    """Check that an interactive xonsh reports a suspended captured command.

    The stdin command runs a captured ``!()`` subprocess that sleeps briefly
    and then sends SIGTTIN to itself; the test passes when the collected
    output contains ``suspended=True``.

    See also test_specs.py:test_specs_with_suspended_captured_process_pipeline
    """
    stdin_cmd = "!(python -c 'import os, signal, time; time.sleep(0.2); os.kill(os.getpid(), signal.SIGTTIN)')\n"
    out, err, ret = run_xonsh(
        cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=5
    )
    # DOTALL lets ``.*`` span newlines, so the marker may appear on any line.
    match = ".*suspended=True.*"
    assert re.match(
        match, out, re.MULTILINE | re.DOTALL
    ), f"\nFailed:\n```\n{stdin_cmd.strip()}\n```,\nresult: {out!r}\nexpected: {match!r}."

View file

@ -16,7 +16,7 @@ from xonsh.cli_utils import Annotated, Arg, ArgParserAlias
from xonsh.completers.tools import RichCompletion
from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, LIBC, ON_CYGWIN, ON_DARWIN, ON_MSYS, ON_WINDOWS
from xonsh.tools import on_main_thread, unthreadable
from xonsh.tools import get_signal_name, on_main_thread, unthreadable
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
@ -39,18 +39,84 @@ _jobs_thread_local = threading.local()
_tasks_main: collections.deque[int] = collections.deque()
def waitpid(pid, opt):
def proc_untraced_waitpid(proc, hang, task=None, raise_child_process_error=False):
"""
Transparent wrapper on `os.waitpid` to make notes about undocumented subprocess behavior.
Read a stop signals from the process and update the process state.
Return code
===========
Basically ``p = subprocess.Popen()`` populates ``p.returncode`` after ``p.wait()``, ``p.poll()``
or ``p.communicate()`` (https://docs.python.org/3/library/os.html#os.waitpid).
But if you're using `os.waitpid()` BEFORE these functions you're capturing return code
from a signal subsystem and ``p.returncode`` will be ``0``.
After ``os.waitid`` call you need to set return code manually
``p.returncode = -os.WTERMSIG(status)`` like in Popen.
After ``os.waitid`` call you need to set return code and process signal manually.
See also ``xonsh.tools.describe_waitpid_status()``.
Signals
=======
The command that is waiting for input can be suspended by OS in case there is no terminal attached
because without terminal command will never end. Read more about SIGTTOU and SIGTTIN signals:
* https://www.linusakesson.net/programming/tty/
* http://curiousthing.org/sigttin-sigttou-deep-dive-linux
* https://www.gnu.org/software/libc/manual/html_node/Job-Control-Signals.html
"""
return os.waitpid(pid, opt)
info = {"backgrounded": False, "signal": None, "signal_name": None}
if ON_WINDOWS:
return info
if proc is not None and getattr(proc, "pid", None) is None:
"""
When the process stopped before os.waitpid it has no pid.
Note that in this case there is high probability
that we will have return code 0 instead of real return code.
"""
if raise_child_process_error:
raise ChildProcessError("Process Identifier (PID) not found.")
else:
return info
try:
"""
The WUNTRACED flag indicates that the caller wishes to wait for stopped or terminated
child processes, but doesn't want to return information about them. A stopped process is one
that has been suspended and is waiting to be resumed or terminated.
"""
opt = os.WUNTRACED if hang else (os.WUNTRACED | os.WNOHANG)
wpid, wcode = os.waitpid(proc.pid, opt)
except ChildProcessError:
wpid, wcode = 0, 0
if raise_child_process_error:
raise
if wpid == 0:
# Process has no changes in state.
pass
elif os.WIFSTOPPED(wcode):
if task is not None:
task["status"] = "stopped"
info["backgrounded"] = True
proc.signal = (os.WSTOPSIG(wcode), os.WCOREDUMP(wcode))
info["signal"] = os.WSTOPSIG(wcode)
proc.suspended = True
elif os.WIFSIGNALED(wcode):
print() # get a newline because ^C will have been printed
proc.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
proc.returncode = -os.WTERMSIG(wcode) # Popen default.
info["signal"] = os.WTERMSIG(wcode)
else:
proc.returncode = os.WEXITSTATUS(wcode)
proc.signal = None
info["signal"] = None
info["signal_name"] = f'{info["signal"]} {get_signal_name(info["signal"])}'.strip()
return info
@contextlib.contextmanager
@ -144,7 +210,7 @@ if ON_WINDOWS:
def _kill(job):
subprocess.check_output(
["taskkill", "/F", "/T", "/PID", str(job["obj"].pid)],
["taskkill", "/F", "/T", "/PID", str(job["proc"].pid)],
stderr=subprocess.STDOUT,
)
@ -165,11 +231,11 @@ if ON_WINDOWS:
# Return when there are no foreground active task
if active_task is None:
return last_task
obj = active_task["obj"]
proc = active_task["proc"]
_continue(active_task)
while obj.returncode is None:
while proc.returncode is None:
try:
obj.wait(0.01)
proc.wait(0.01)
except subprocess.TimeoutExpired:
pass
except KeyboardInterrupt:
@ -278,31 +344,23 @@ else:
# Return when there are no foreground active task
if active_task is None:
return last_task
thread = active_task["obj"]
backgrounded = False
proc = active_task["proc"]
info = {"backgrounded": False}
try:
if thread.pid is None:
# When the process stopped before os.waitpid it has no pid.
raise ChildProcessError("The process PID not found.")
_, wcode = waitpid(thread.pid, os.WUNTRACED)
except ChildProcessError as e: # No child processes
info = proc_untraced_waitpid(
proc, hang=True, task=active_task, raise_child_process_error=True
)
except ChildProcessError as e:
if return_error:
return e
else:
return _safe_wait_for_active_job(
last_task=active_task, backgrounded=backgrounded
last_task=active_task, backgrounded=info["backgrounded"]
)
return wait_for_active_job(
last_task=active_task, backgrounded=info["backgrounded"]
)
if os.WIFSTOPPED(wcode):
active_task["status"] = "stopped"
backgrounded = True
elif os.WIFSIGNALED(wcode):
print() # get a newline because ^C will have been printed
thread.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
thread.returncode = -os.WTERMSIG(wcode) # Default Popen
else:
thread.returncode = os.WEXITSTATUS(wcode)
thread.signal = None
return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
def _safe_wait_for_active_job(last_task=None, backgrounded=False):
@ -344,8 +402,8 @@ def _clear_dead_jobs():
to_remove = set()
tasks = get_tasks()
for tid in tasks:
obj = get_task(tid)["obj"]
if obj is None or obj.poll() is not None:
proc = get_task(tid)["proc"]
if proc is None or proc.poll() is not None:
to_remove.add(tid)
for job in to_remove:
tasks.remove(job)
@ -364,13 +422,13 @@ def format_job_string(num: int, format="dict") -> str:
"cmd": " ".join(
[" ".join(i) if isinstance(i, list) else i for i in job["cmds"]]
),
"pid": int(job["pids"][-1]) if job["pids"] else None,
"pids": job["pids"] if "pids" in job else None,
}
if format == "posix":
r["pos"] = "+" if tasks[0] == num else "-" if tasks[1] == num else " "
r["bg"] = " &" if job["bg"] else ""
r["pid"] = f"({r['pid']})" if r["pid"] else ""
r["pid"] = f"({','.join(str(pid) for pid in r['pids'])})" if r["pids"] else ""
return "[{num}]{pos} {status}: {cmd}{bg} {pid}".format(**r)
else:
return repr(r)
@ -396,13 +454,21 @@ def add_job(info):
"""Add a new job to the jobs dictionary."""
num = get_next_job_number()
info["started"] = time.time()
info["status"] = "running"
info["status"] = info["status"] if "status" in info else "running"
get_tasks().appendleft(num)
get_jobs()[num] = info
if info["bg"] and XSH.env.get("XONSH_INTERACTIVE"):
print_one_job(num)
def update_job_attr(pid, name, value):
    """Set attribute ``name`` to ``value`` on every job that owns ``pid``.

    A job owns ``pid`` when it has a ``"pids"`` entry containing that pid.
    Jobs without a ``"pids"`` entry are left untouched.
    """
    registry = get_jobs()
    # Collect the matching job numbers first, then write through the registry.
    matching = [
        num
        for num, job in get_jobs().items()
        if "pids" in job and pid in job["pids"]
    ]
    for num in matching:
        registry[num][name] = value
def clean_jobs():
"""Clean up jobs for exiting shell

View file

@ -94,6 +94,7 @@ class CommandPipeline:
attrnames = (
"returncode",
"suspended",
"pid",
"args",
"alias",
@ -153,6 +154,7 @@ class CommandPipeline:
self._raw_output = self._raw_error = b""
self._stderr_prefix = self._stderr_postfix = None
self.term_pgid = None
self.suspended = None
background = self.spec.background
pipeline_group = None
@ -186,11 +188,14 @@ class CommandPipeline:
self.proc = self.procs[-1]
def __repr__(self):
attrs = self.attrnames + (
self.attrnames_ext if XSH.env.get("XONSH_DEBUG", False) else ()
)
debug = XSH.env.get("XONSH_DEBUG", False)
attrs = self.attrnames + (self.attrnames_ext if debug else ())
s = self.__class__.__name__ + "(\n "
s += ",\n ".join(a + "=" + repr(getattr(self, a)) for a in attrs)
s += ",\n ".join(
a + "=" + repr(getattr(self, a))
for a in attrs
if debug or getattr(self, a) is not None
)
s += "\n)"
return s
@ -288,7 +293,9 @@ class CommandPipeline:
prev_end_time = None
i = j = cnt = 1
while proc.poll() is None:
if getattr(proc, "suspended", False):
if getattr(proc, "suspended", False) or self._procs_suspended() is not None:
self.suspended = True
xj.update_job_attr(proc.pid, "status", "suspended")
return
elif getattr(proc, "in_alt_mode", False):
time.sleep(0.1) # probably not leaving any time soon
@ -303,6 +310,7 @@ class CommandPipeline:
self._close_prev_procs()
proc.prevs_are_closed = True
break
stdout_lines = safe_readlines(stdout, 1024)
i = len(stdout_lines)
if i != 0:
@ -513,6 +521,20 @@ class CommandPipeline:
def _safe_close(self, handle):
safe_fdclose(handle, cache=self._closed_handle_cache)
def _procs_suspended(self):
    """Poll the pipeline's processes and return the first suspended one.

    Every entry of ``self.procs`` is checked with a non-blocking
    ``xj.proc_untraced_waitpid`` call. For the first entry whose
    ``suspended`` attribute is truthy afterwards, a notice is printed to
    stderr and the process is returned, unwrapped through its ``proc``
    attribute when it has one. Returns ``None`` when nothing is suspended.
    """
    for candidate in self.procs:
        info = xj.proc_untraced_waitpid(candidate, hang=False)
        if not getattr(candidate, "suspended", False):
            continue
        # Report on the wrapped process object if there is one.
        target = getattr(candidate, "proc", candidate)
        procname = f"{getattr(target, 'args', '')} with pid {target.pid}".strip()
        notice = (
            f"Process {procname} was suspended with signal {info['signal_name']} and placed in `jobs`.\n"
            f"This happens when a process starts waiting for input but there is no terminal attached in captured mode."
        )
        print(notice, file=sys.stderr)
        return target
    return None
def _prev_procs_done(self):
"""Boolean for if all previous processes have completed. If there
is only a single process in the pipeline, this returns False.

View file

@ -14,6 +14,7 @@ import xonsh.lazyimps as xli
import xonsh.platform as xp
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.jobs import proc_untraced_waitpid
from xonsh.procs.readers import (
BufferedFDParallelReader,
NonBlockingFDReader,
@ -168,6 +169,16 @@ class PopenThread(threading.Thread):
# loop over reads while process is running.
i = j = cnt = 1
while proc.poll() is None:
info = proc_untraced_waitpid(proc, hang=False)
if getattr(proc, "suspended", False):
self.suspended = True
if XSH.env.get("XONSH_DEBUG", False):
procname = f"{getattr(proc, 'args', '')} {proc.pid}".strip()
print(
f"Process {procname} suspended with signal {info['signal_name']}.",
file=sys.stderr,
)
# this is here for CPU performance reasons.
if i + j == 0:
cnt = min(cnt + 1, 1000)

View file

@ -955,7 +955,8 @@ def _run_command_pipeline(specs, cmds):
{
"cmds": cmds,
"pids": [i.pid for i in cp.procs],
"obj": proc,
"status": "suspended" if cp.suspended else "running",
"proc": proc,
"bg": background,
"pipeline": cp,
"pgrp": cp.term_pgid,

View file

@ -31,6 +31,7 @@ import os
import pathlib
import re
import shlex
import signal
import string
import subprocess
import sys
@ -2839,6 +2840,14 @@ def to_repr_pretty_(inst, p, cycle):
p.pretty(dict(inst))
def get_signal_name(signum):
    """Return a signal name by the signal number.

    Parameters
    ----------
    signum : int
        Signal number, e.g. the value returned by ``os.WTERMSIG`` or
        ``os.WSTOPSIG``.

    Returns
    -------
    str
        The signal name (e.g. ``"SIGINT"``), or an empty string when
        ``signum`` is not a signal number known on this platform.
    """
    # bool is a subclass of int: without this guard ``True`` would be
    # reported as signal 1.
    if isinstance(signum, bool):
        return ""
    try:
        # The Signals enum holds only real signals, unlike ``dir(signal)``,
        # which also lists SIG_BLOCK, SIG_DFL, SIG_IGN and similar constants.
        name = signal.Signals(signum).name
    except (ValueError, TypeError):
        return ""
    # On Windows the enum also contains CTRL_C_EVENT / CTRL_BREAK_EVENT;
    # keep reporting only names that start with "SIG".
    return name if name.startswith("SIG") else ""
def describe_waitpid_status(status):
"""Describes ``pid, status = os.waitpid(pid, opt)`` status."""
funcs = [
@ -2848,9 +2857,11 @@ def describe_waitpid_status(status):
os.WTERMSIG,
os.WIFSTOPPED,
os.WSTOPSIG,
os.WCOREDUMP,
]
for f in funcs:
print(f.__name__, "-", f(status), "-", f.__doc__)
s = f(status)
print(f.__name__, "-", s, get_signal_name(s), "-", f.__doc__)
def unquote(s: str, chars="'\""):