From 0f25a5a348e5569982d2b579a1bfa34d68df2418 Mon Sep 17 00:00:00 2001 From: Andy Kipp Date: Wed, 22 May 2024 17:45:39 +0200 Subject: [PATCH] Read stop signals from the process and update the process state. (#5361) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reading stop signals from the process and update the process state. ### The issue Technically. In a couple of places that critical for processing signals we have `os.waitpid()`. The function behavior is pretty unobvious and one of things is processing return code after catching the signal. We had no good signal processing around this and this PR fixes this. See also `proc_untraced_waitpid` function description. From user perspective. For example we have process that is waiting for user input from terminal e.g. `python -c "input()"` or `fzf`. If this process will be in captured pipeline e.g. `!(echo 1 | fzf | head)` it will be suspended by OS and the pipeline will be in the endless loop with future crashing and corrupting std at the end. This PR fixes this. ### The solution Technically. The key function is `proc_untraced_waitpid` - it catches the stop signals and updates the process state. From user perspective. First of all we expect that users will use captured object `!()` only for capturable processes. Because of it our goal here is to just make the behavior in this case stable. In this PR we detect that process in the pipeline is suspended and we need to finish the command pipeline carefully: * Show the message about suspended process. * Keep suspended process in `jobs`. The same behavior we can see in bash. This is good because we don't know what process suspended and why. May be experienced user will want to continue it manually. * Finish the CommandPipeline with returncode=None and suspended=True. ### Before ```xsh !(fzf) # or !(python -c "input()") # Hanging / Exceptions / OSError / No way to end the command. # After exception: $(echo 1) # OSError / IO error ``` ### After ```xsh !(fzf) # or `!(ls | fzf | head)` or `!(python -c "input()")` # Process ['fzf'] with pid 60000 suspended with signal 22 SIGTTOU and stay in `jobs`. # This happends when process start waiting for input but there is no terminal attached in captured mode. # CommandPipeline(returncode=None, suspended=True, ...) $(echo 1) # Success. ``` Closes #4752 #4577 ### Notes * There is pretty edge case situation when the process was terminated so fast that we can't catch pid alive and check signal ([src](https://github.com/xonsh/xonsh/blob/67d672783db6397bdec7ae44a9d9727b1e20a772/xonsh/jobs.py#L71-L80)). I leave it as is for now. ### Mentions #2159 ## For community ⬇️ **Please click the 👍 reaction instead of leaving a `+1` or 👍 comment** --------- Co-authored-by: a <1@1.1> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Gil Forsyth --- news/fix_interactive_suspended_subproc.rst | 23 ++++ tests/procs/test_specs.py | 37 +++++- tests/test_integrations.py | 21 +++- xonsh/jobs.py | 134 +++++++++++++++------ xonsh/procs/pipelines.py | 32 ++++- xonsh/procs/posix.py | 11 ++ xonsh/procs/specs.py | 3 +- xonsh/tools.py | 13 +- 8 files changed, 224 insertions(+), 50 deletions(-) create mode 100644 news/fix_interactive_suspended_subproc.rst diff --git a/news/fix_interactive_suspended_subproc.rst b/news/fix_interactive_suspended_subproc.rst new file mode 100644 index 000000000..a8eaea263 --- /dev/null +++ b/news/fix_interactive_suspended_subproc.rst @@ -0,0 +1,23 @@ +**Added:** + +* Reading stop signals from the process and update the process state (#5361). + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* + +**Security:** + +* diff --git a/tests/procs/test_specs.py b/tests/procs/test_specs.py index 7721a20c8..7274f28fc 100644 --- a/tests/procs/test_specs.py +++ b/tests/procs/test_specs.py @@ -19,6 +19,14 @@ from xonsh.pytest.tools import skip_if_on_windows from xonsh.tools import XonshError +def cmd_sig(sig): + return [ + "python", + "-c", + f"import os, signal; os.kill(os.getpid(), signal.{sig})", + ] + + @skip_if_on_windows def test_cmds_to_specs_thread_subproc(xession): env = xession.env @@ -141,15 +149,37 @@ def test_capture_always( @skip_if_on_windows -@pytest.mark.flaky(reruns=3, reruns_delay=1) -def test_interrupted_process_returncode(xonsh_session): +@pytest.mark.parametrize("captured", ["stdout", "object"]) +@pytest.mark.parametrize("interactive", [True, False]) +def test_interrupted_process_returncode(xonsh_session, captured, interactive): + xonsh_session.env["XONSH_INTERACTIVE"] = interactive xonsh_session.env["RAISE_SUBPROC_ERROR"] = False - cmd = [["python", "-c", "import os, signal; os.kill(os.getpid(), signal.SIGINT)"]] + cmd = [cmd_sig("SIGINT")] specs = cmds_to_specs(cmd, captured="stdout") (p := _run_command_pipeline(specs, cmd)).end() assert p.proc.returncode == -signal.SIGINT +@skip_if_on_windows +@pytest.mark.parametrize( + "suspended_pipeline", + [ + [cmd_sig("SIGTTIN")], + [["echo", "1"], "|", cmd_sig("SIGTTIN")], + [["echo", "1"], "|", cmd_sig("SIGTTIN"), "|", ["head"]], + ], +) +def test_specs_with_suspended_captured_process_pipeline( + xonsh_session, suspended_pipeline +): + xonsh_session.env["XONSH_INTERACTIVE"] = True + specs = cmds_to_specs(suspended_pipeline, captured="object") + p = _run_command_pipeline(specs, suspended_pipeline) + p.proc.send_signal(signal.SIGCONT) + p.end() + assert p.suspended + + @skip_if_on_windows @pytest.mark.parametrize( "cmds, exp_stream_lines, exp_list_lines", @@ -162,6 +192,7 @@ def test_interrupted_process_returncode(xonsh_session): ([["echo", "-n", "1\n2 3"]], "1\n2 3", ["1", "2 3"]), ], ) +@pytest.mark.flaky(reruns=3, reruns_delay=1) def test_subproc_output_format(cmds, exp_stream_lines, exp_list_lines, xonsh_session): xonsh_session.env["XONSH_SUBPROC_OUTPUT_FORMAT"] = "stream_lines" output = run_subproc(cmds, "stdout") diff --git a/tests/test_integrations.py b/tests/test_integrations.py index a0d1918ff..bc590a1f7 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -1239,17 +1239,26 @@ def test_catching_system_exit(): out, err, ret = run_xonsh( cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3 ) - if ON_WINDOWS: - assert ret == 1 - else: - assert ret == 2 + assert ret > 0 @skip_if_on_windows -@pytest.mark.flaky(reruns=3, reruns_delay=1) def test_catching_exit_signal(): - stdin_cmd = "kill -SIGHUP @(__import__('os').getpid())\n" + stdin_cmd = "sleep 0.2; kill -SIGHUP @(__import__('os').getpid())\n" out, err, ret = run_xonsh( cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3 ) assert ret > 0 + + +@skip_if_on_windows +def test_suspended_captured_process_pipeline(): + """See also test_specs.py:test_specs_with_suspended_captured_process_pipeline""" + stdin_cmd = "!(python -c 'import os, signal, time; time.sleep(0.2); os.kill(os.getpid(), signal.SIGTTIN)')\n" + out, err, ret = run_xonsh( + cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=5 + ) + match = ".*suspended=True.*" + assert re.match( + match, out, re.MULTILINE | re.DOTALL + ), f"\nFailed:\n```\n{stdin_cmd.strip()}\n```,\nresult: {out!r}\nexpected: {match!r}." diff --git a/xonsh/jobs.py b/xonsh/jobs.py index 3a239d595..96143b2e7 100644 --- a/xonsh/jobs.py +++ b/xonsh/jobs.py @@ -16,7 +16,7 @@ from xonsh.cli_utils import Annotated, Arg, ArgParserAlias from xonsh.completers.tools import RichCompletion from xonsh.lazyasd import LazyObject from xonsh.platform import FD_STDERR, LIBC, ON_CYGWIN, ON_DARWIN, ON_MSYS, ON_WINDOWS -from xonsh.tools import on_main_thread, unthreadable +from xonsh.tools import get_signal_name, on_main_thread, unthreadable # Track time stamp of last exit command, so that two consecutive attempts to # exit can kill all jobs and exit. @@ -39,18 +39,84 @@ _jobs_thread_local = threading.local() _tasks_main: collections.deque[int] = collections.deque() -def waitpid(pid, opt): +def proc_untraced_waitpid(proc, hang, task=None, raise_child_process_error=False): """ - Transparent wrapper on `os.waitpid` to make notes about undocumented subprocess behavior. + Read a stop signals from the process and update the process state. + + Return code + =========== + Basically ``p = subprocess.Popen()`` populates ``p.returncode`` after ``p.wait()``, ``p.poll()`` or ``p.communicate()`` (https://docs.python.org/3/library/os.html#os.waitpid). But if you're using `os.waitpid()` BEFORE these functions you're capturing return code from a signal subsystem and ``p.returncode`` will be ``0``. - After ``os.waitid`` call you need to set return code manually - ``p.returncode = -os.WTERMSIG(status)`` like in Popen. + After ``os.waitid`` call you need to set return code and process signal manually. See also ``xonsh.tools.describe_waitpid_status()``. + + Signals + ======= + + The command that is waiting for input can be suspended by OS in case there is no terminal attached + because without terminal command will never end. Read more about SIGTTOU and SIGTTIN signals: + * https://www.linusakesson.net/programming/tty/ + * http://curiousthing.org/sigttin-sigttou-deep-dive-linux + * https://www.gnu.org/software/libc/manual/html_node/Job-Control-Signals.html """ - return os.waitpid(pid, opt) + + info = {"backgrounded": False, "signal": None, "signal_name": None} + + if ON_WINDOWS: + return info + + if proc is not None and getattr(proc, "pid", None) is None: + """ + When the process stopped before os.waitpid it has no pid. + Note that in this case there is high probability + that we will have return code 0 instead of real return code. + """ + if raise_child_process_error: + raise ChildProcessError("Process Identifier (PID) not found.") + else: + return info + + try: + """ + The WUNTRACED flag indicates that the caller wishes to wait for stopped or terminated + child processes, but doesn't want to return information about them. A stopped process is one + that has been suspended and is waiting to be resumed or terminated. + """ + opt = os.WUNTRACED if hang else (os.WUNTRACED | os.WNOHANG) + wpid, wcode = os.waitpid(proc.pid, opt) + except ChildProcessError: + wpid, wcode = 0, 0 + if raise_child_process_error: + raise + + if wpid == 0: + # Process has no changes in state. + pass + + elif os.WIFSTOPPED(wcode): + if task is not None: + task["status"] = "stopped" + info["backgrounded"] = True + proc.signal = (os.WSTOPSIG(wcode), os.WCOREDUMP(wcode)) + info["signal"] = os.WSTOPSIG(wcode) + proc.suspended = True + + elif os.WIFSIGNALED(wcode): + print() # get a newline because ^C will have been printed + proc.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode)) + proc.returncode = -os.WTERMSIG(wcode) # Popen default. + info["signal"] = os.WTERMSIG(wcode) + + else: + proc.returncode = os.WEXITSTATUS(wcode) + proc.signal = None + info["signal"] = None + + info["signal_name"] = f'{info["signal"]} {get_signal_name(info["signal"])}'.strip() + return info @contextlib.contextmanager @@ -144,7 +210,7 @@ if ON_WINDOWS: def _kill(job): subprocess.check_output( - ["taskkill", "/F", "/T", "/PID", str(job["obj"].pid)], + ["taskkill", "/F", "/T", "/PID", str(job["proc"].pid)], stderr=subprocess.STDOUT, ) @@ -165,11 +231,11 @@ if ON_WINDOWS: # Return when there are no foreground active task if active_task is None: return last_task - obj = active_task["obj"] + proc = active_task["proc"] _continue(active_task) - while obj.returncode is None: + while proc.returncode is None: try: - obj.wait(0.01) + proc.wait(0.01) except subprocess.TimeoutExpired: pass except KeyboardInterrupt: @@ -278,31 +344,23 @@ else: # Return when there are no foreground active task if active_task is None: return last_task - thread = active_task["obj"] - backgrounded = False + proc = active_task["proc"] + info = {"backgrounded": False} + try: - if thread.pid is None: - # When the process stopped before os.waitpid it has no pid. - raise ChildProcessError("The process PID not found.") - _, wcode = waitpid(thread.pid, os.WUNTRACED) - except ChildProcessError as e: # No child processes + info = proc_untraced_waitpid( + proc, hang=True, task=active_task, raise_child_process_error=True + ) + except ChildProcessError as e: if return_error: return e else: return _safe_wait_for_active_job( - last_task=active_task, backgrounded=backgrounded + last_task=active_task, backgrounded=info["backgrounded"] ) - if os.WIFSTOPPED(wcode): - active_task["status"] = "stopped" - backgrounded = True - elif os.WIFSIGNALED(wcode): - print() # get a newline because ^C will have been printed - thread.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode)) - thread.returncode = -os.WTERMSIG(wcode) # Default Popen - else: - thread.returncode = os.WEXITSTATUS(wcode) - thread.signal = None - return wait_for_active_job(last_task=active_task, backgrounded=backgrounded) + return wait_for_active_job( + last_task=active_task, backgrounded=info["backgrounded"] + ) def _safe_wait_for_active_job(last_task=None, backgrounded=False): @@ -344,8 +402,8 @@ def _clear_dead_jobs(): to_remove = set() tasks = get_tasks() for tid in tasks: - obj = get_task(tid)["obj"] - if obj is None or obj.poll() is not None: + proc = get_task(tid)["proc"] + if proc is None or proc.poll() is not None: to_remove.add(tid) for job in to_remove: tasks.remove(job) @@ -364,13 +422,13 @@ def format_job_string(num: int, format="dict") -> str: "cmd": " ".join( [" ".join(i) if isinstance(i, list) else i for i in job["cmds"]] ), - "pid": int(job["pids"][-1]) if job["pids"] else None, + "pids": job["pids"] if "pids" in job else None, } if format == "posix": r["pos"] = "+" if tasks[0] == num else "-" if tasks[1] == num else " " r["bg"] = " &" if job["bg"] else "" - r["pid"] = f"({r['pid']})" if r["pid"] else "" + r["pid"] = f"({','.join(str(pid) for pid in r['pids'])})" if r["pids"] else "" return "[{num}]{pos} {status}: {cmd}{bg} {pid}".format(**r) else: return repr(r) @@ -396,13 +454,21 @@ def add_job(info): """Add a new job to the jobs dictionary.""" num = get_next_job_number() info["started"] = time.time() - info["status"] = "running" + info["status"] = info["status"] if "status" in info else "running" get_tasks().appendleft(num) get_jobs()[num] = info if info["bg"] and XSH.env.get("XONSH_INTERACTIVE"): print_one_job(num) +def update_job_attr(pid, name, value): + """Update job attribute.""" + jobs = get_jobs() + for num, job in get_jobs().items(): + if "pids" in job and pid in job["pids"]: + jobs[num][name] = value + + def clean_jobs(): """Clean up jobs for exiting shell diff --git a/xonsh/procs/pipelines.py b/xonsh/procs/pipelines.py index d31723c5d..20483a638 100644 --- a/xonsh/procs/pipelines.py +++ b/xonsh/procs/pipelines.py @@ -94,6 +94,7 @@ class CommandPipeline: attrnames = ( "returncode", + "suspended", "pid", "args", "alias", @@ -153,6 +154,7 @@ class CommandPipeline: self._raw_output = self._raw_error = b"" self._stderr_prefix = self._stderr_postfix = None self.term_pgid = None + self.suspended = None background = self.spec.background pipeline_group = None @@ -186,11 +188,14 @@ class CommandPipeline: self.proc = self.procs[-1] def __repr__(self): - attrs = self.attrnames + ( - self.attrnames_ext if XSH.env.get("XONSH_DEBUG", False) else () - ) + debug = XSH.env.get("XONSH_DEBUG", False) + attrs = self.attrnames + (self.attrnames_ext if debug else ()) s = self.__class__.__name__ + "(\n " - s += ",\n ".join(a + "=" + repr(getattr(self, a)) for a in attrs) + s += ",\n ".join( + a + "=" + repr(getattr(self, a)) + for a in attrs + if debug or getattr(self, a) is not None + ) s += "\n)" return s @@ -288,7 +293,9 @@ class CommandPipeline: prev_end_time = None i = j = cnt = 1 while proc.poll() is None: - if getattr(proc, "suspended", False): + if getattr(proc, "suspended", False) or self._procs_suspended() is not None: + self.suspended = True + xj.update_job_attr(proc.pid, "status", "suspended") return elif getattr(proc, "in_alt_mode", False): time.sleep(0.1) # probably not leaving any time soon @@ -303,6 +310,7 @@ class CommandPipeline: self._close_prev_procs() proc.prevs_are_closed = True break + stdout_lines = safe_readlines(stdout, 1024) i = len(stdout_lines) if i != 0: @@ -513,6 +521,20 @@ class CommandPipeline: def _safe_close(self, handle): safe_fdclose(handle, cache=self._closed_handle_cache) + def _procs_suspended(self): + """Check procs and return suspended proc.""" + for proc in self.procs: + info = xj.proc_untraced_waitpid(proc, hang=False) + if getattr(proc, "suspended", False): + proc = getattr(proc, "proc", proc) + procname = f"{getattr(proc, 'args', '')} with pid {proc.pid}".strip() + print( + f"Process {procname} was suspended with signal {info['signal_name']} and placed in `jobs`.\n" + f"This happens when a process starts waiting for input but there is no terminal attached in captured mode.", + file=sys.stderr, + ) + return proc + def _prev_procs_done(self): """Boolean for if all previous processes have completed. If there is only a single process in the pipeline, this returns False. diff --git a/xonsh/procs/posix.py b/xonsh/procs/posix.py index 97f1b6478..209d4d2a4 100644 --- a/xonsh/procs/posix.py +++ b/xonsh/procs/posix.py @@ -14,6 +14,7 @@ import xonsh.lazyimps as xli import xonsh.platform as xp import xonsh.tools as xt from xonsh.built_ins import XSH +from xonsh.jobs import proc_untraced_waitpid from xonsh.procs.readers import ( BufferedFDParallelReader, NonBlockingFDReader, @@ -168,6 +169,16 @@ class PopenThread(threading.Thread): # loop over reads while process is running. i = j = cnt = 1 while proc.poll() is None: + info = proc_untraced_waitpid(proc, hang=False) + if getattr(proc, "suspended", False): + self.suspended = True + if XSH.env.get("XONSH_DEBUG", False): + procname = f"{getattr(proc, 'args', '')} {proc.pid}".strip() + print( + f"Process {procname} suspended with signal {info['signal_name']}.", + file=sys.stderr, + ) + # this is here for CPU performance reasons. if i + j == 0: cnt = min(cnt + 1, 1000) diff --git a/xonsh/procs/specs.py b/xonsh/procs/specs.py index e63a3a237..4a93e88e5 100644 --- a/xonsh/procs/specs.py +++ b/xonsh/procs/specs.py @@ -955,7 +955,8 @@ def _run_command_pipeline(specs, cmds): { "cmds": cmds, "pids": [i.pid for i in cp.procs], - "obj": proc, + "status": "suspended" if cp.suspended else "running", + "proc": proc, "bg": background, "pipeline": cp, "pgrp": cp.term_pgid, diff --git a/xonsh/tools.py b/xonsh/tools.py index cd97b5ff7..aa6eacd0d 100644 --- a/xonsh/tools.py +++ b/xonsh/tools.py @@ -31,6 +31,7 @@ import os import pathlib import re import shlex +import signal import string import subprocess import sys @@ -2839,6 +2840,14 @@ def to_repr_pretty_(inst, p, cycle): p.pretty(dict(inst)) +def get_signal_name(signum): + """Return a signal name by the signal number.""" + for name in dir(signal): + if name.startswith("SIG") and getattr(signal, name) == signum: + return name + return "" + + def describe_waitpid_status(status): """Describes ``pid, status = os.waitpid(pid, opt)`` status.""" funcs = [ @@ -2848,9 +2857,11 @@ def describe_waitpid_status(status): os.WTERMSIG, os.WIFSTOPPED, os.WSTOPSIG, + os.WCOREDUMP, ] for f in funcs: - print(f.__name__, "-", f(status), "-", f.__doc__) + s = f(status) + print(f.__name__, "-", s, get_signal_name(s), "-", f.__doc__) def unquote(s: str, chars="'\""):