diff --git a/CHANGELOG.md b/CHANGELOG.md index 5aa6218a1..601e9112e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,9 @@ prompt is displayed. - Changed `StatementParser.parse_command_only()` to return a `PartialStatement` object. - Renamed `Macro.arg_list` to `Macro.args`. - Removed `terminal_utils.py` since `prompt-toolkit` provides this functionality. + - Replaced `async_alert()` and `async_update_prompt()` with a single function called + `add_alert()`. This new function is thread-safe and does not require you to acquire a mutex + before calling it like the previous functions did. - Enhancements - New `cmd2.Cmd` parameters - **auto_suggest**: (boolean) if `True`, provide fish shell style auto-suggestions. These @@ -66,8 +69,6 @@ prompt is displayed. displaying realtime status information while the prompt is displayed, see the `cmd2.Cmd2.get_bottom_toolbar` method that can be overridden as well as the updated `getting_started.py` example - - Added `cmd2.Cmd._in_prompt` flag that is set to `True` when the prompt is displayed and the - application is waiting for user input - New `cmd2.Cmd` methods - **get_bottom_toolbar**: populates bottom toolbar if `bottom_toolbar` is `True` - **get_rprompt**: override to populate right prompt diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 5e7bfe19b..4f36c3f03 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -39,8 +39,12 @@ import sys import tempfile import threading +import time from code import InteractiveConsole -from collections import namedtuple +from collections import ( + deque, + namedtuple, +) from collections.abc import ( Callable, Iterable, @@ -48,6 +52,10 @@ MutableSequence, Sequence, ) +from dataclasses import ( + dataclass, + field, +) from types import FrameType from typing import ( IO, @@ -60,6 +68,7 @@ ) import rich.box +from prompt_toolkit import print_formatted_text from prompt_toolkit.application import get_app from rich.console import ( Group, @@ -177,6 +186,7 @@ def __init__(self, msg: str = '') -> None: Cmd2Completer, Cmd2History, Cmd2Lexer, + pt_filter_style, ) from .utils import ( Settable, @@ -273,6 +283,23 @@ def remove(self, command_method: CommandFunc) -> None: del self._parsers[full_method_name] +@dataclass(kw_only=True) +class AsyncAlert: + """Contents of an asynchonous alert which display while user is at prompt. + + :param msg: an optional message to be printed above the prompt. + :param prompt: an optional string to dynamically replace the current prompt. + + :ivar timestamp: monotonic creation time of the alert. If an alert was created + before the current prompt was rendered, the prompt update is ignored + to avoid a stale display but the msg will still be displayed. + """ + + msg: str | None = None + prompt: str | None = None + timestamp: float = field(default_factory=time.monotonic, init=False) + + class Cmd: """An easy but powerful framework for writing line-oriented command interpreters. @@ -370,7 +397,7 @@ def __init__( self._initialize_plugin_system() # Configure a few defaults - self.prompt = Cmd.DEFAULT_PROMPT + self.prompt: str = Cmd.DEFAULT_PROMPT self.intro = intro # What to use for standard input @@ -587,6 +614,14 @@ def __init__( # Command parsers for this Cmd instance. self._command_parsers: _CommandParsers = _CommandParsers(self) + # Members related to printing asychronous alerts + self._alert_queue: deque[AsyncAlert] = deque() + self._alert_condition = threading.Condition() + self._alert_allowed = False + self._alert_shutdown = False + self._alert_thread: threading.Thread | None = None + self._alert_prompt_timestamp: float = 0.0 # Uses time.monotonic() + # Add functions decorated to be subcommands self._register_subcommands(self) @@ -2588,7 +2623,7 @@ def pre_prompt(self) -> None: """Ran just before the prompt is displayed (and after the event loop has started).""" def precmd(self, statement: Statement | str) -> Statement: - """Ran just before the command is executed by [cmd2.Cmd.onecmd][] and after adding it to history (cmd Hook method). + """Ran just before the command is executed by [cmd2.Cmd.onecmd][] and after adding it to history (cmd Hook method). :param statement: subclass of str which also contains the parsed input :return: a potentially modified version of the input Statement object @@ -3200,9 +3235,9 @@ def _read_raw_input( ) -> str: """Execute the low-level input read from either a terminal or a redirected stream. - If the session is interactive (TTY), it uses `prompt_toolkit` to render a - rich UI with completion and `patch_stdout` protection. If non-interactive - (Pipe/File), it performs a direct line read from `stdin`. + If input is coming from a TTY, it uses `prompt_toolkit` to render a + UI with completion and `patch_stdout` protection. Otherwise it performs + a direct line read from `stdin`. :param prompt: the prompt text or a callable that returns the prompt. :param session: the PromptSession instance to use for reading. @@ -3214,6 +3249,8 @@ def _read_raw_input( # Check if the session is configured for interactive terminal use. if not isinstance(session.input, DummyInput): with patch_stdout(): + if not callable(prompt): + prompt = pt_filter_style(prompt) return session.prompt(prompt, completer=completer, **prompt_kwargs) # We're not at a terminal, so we're likely reading from a file or a pipe. @@ -3321,6 +3358,60 @@ def read_input( return self._read_raw_input(prompt, temp_session, completer_to_use) + def _process_alerts(self) -> None: + """Background worker that processes queued alerts and dynamic prompt updates.""" + while True: + with self._alert_condition: + # Wait until we have alerts and are allowed to display them, or shutdown is signaled. + self._alert_condition.wait_for( + lambda: (len(self._alert_queue) > 0 and self._alert_allowed) or self._alert_shutdown + ) + + # Shutdown immediately even if we have alerts. + if self._alert_shutdown: + break + + # Hold the condition lock while printing to block command execution. This + # prevents async alerts from printing once a command starts. + + # Print all alerts at once to reduce flicker. + alert_text = "\n".join(alert.msg for alert in self._alert_queue if alert.msg) + + # Find the latest prompt update among all pending alerts. + latest_prompt = None + for alert in reversed(self._alert_queue): + if ( + alert.prompt is not None + and alert.prompt != self.prompt + and alert.timestamp > self._alert_prompt_timestamp + ): + latest_prompt = alert.prompt + self._alert_prompt_timestamp = alert.timestamp + break + + # Clear the alerts + self._alert_queue.clear() + + if alert_text: + if not self._at_continuation_prompt and latest_prompt is not None: + # Update prompt now so patch_stdout can redraw it immediately. + self.prompt = latest_prompt + + # Print the alert messages above the prompt. + with patch_stdout(): + print_formatted_text(pt_filter_style(alert_text)) + + if self._at_continuation_prompt and latest_prompt is not None: + # Update state only. The onscreen prompt won't change until the next prompt starts. + self.prompt = latest_prompt + + elif latest_prompt is not None: + self.prompt = latest_prompt + + # Refresh UI immediately unless at a continuation prompt. + if not self._at_continuation_prompt: + get_app().invalidate() + def _read_command_line(self, prompt: str) -> str: """Read the next command line from the input stream. @@ -3331,19 +3422,43 @@ def _read_command_line(self, prompt: str) -> str: """ # Use dynamic prompt if the prompt matches self.prompt - def get_prompt() -> ANSI | str: - return ANSI(self.prompt) + def get_prompt() -> str | ANSI: + return pt_filter_style(self.prompt) prompt_to_use: Callable[[], ANSI | str] | ANSI | str = ANSI(prompt) if prompt == self.prompt: prompt_to_use = get_prompt - return self._read_raw_input( - prompt=prompt_to_use, - session=self.session, - completer=self.completer, - pre_run=self.pre_prompt, - ) + def _pre_prompt() -> None: + """Run standard pre-prompt processing and activate the background alerter.""" + self.pre_prompt() + + # Record when this prompt was rendered. + self._alert_prompt_timestamp = time.monotonic() + + # Start alerter thread if it's not already running. + if self._alert_thread is None or not self._alert_thread.is_alive(): + self._alert_allowed = False + self._alert_shutdown = False + self._alert_thread = threading.Thread(target=self._process_alerts, daemon=True) + self._alert_thread.start() + + # Allow alerts to be printed now that we are at a prompt. + with self._alert_condition: + self._alert_allowed = True + self._alert_condition.notify_all() + + try: + return self._read_raw_input( + prompt=prompt_to_use, + session=self.session, + completer=self.completer, + pre_run=_pre_prompt, + ) + finally: + # Ensure no alerts print while not at a prompt. + with self._alert_condition: + self._alert_allowed = False def _cmdloop(self) -> None: """Repeatedly issue a prompt, accept input, parse it, and dispatch to apporpriate commands. @@ -3371,7 +3486,18 @@ def _cmdloop(self) -> None: # Run the command along with all associated pre and post hooks stop = self.onecmd_plus_hooks(line) finally: - pass + with self.sigint_protection: + # Shut down the alert thread. + if self._alert_thread is not None: + with self._alert_condition: + self._alert_shutdown = True + self._alert_condition.notify_all() + + # The thread is event-driven and stays suspended until notified. + # We join with a 1 second timeout as a safety measure. If it hangs, + # the daemon status allows the OS to reap it on exit. + self._alert_thread.join(timeout=1.0) + self._alert_thread = None ############################################################# # Parsers and functions for alias command and subcommands @@ -5207,66 +5333,25 @@ def do__relative_run_script(self, args: argparse.Namespace) -> bool | None: # self.last_result will be set by do_run_script() return self.do_run_script(su.quote(relative_path)) - def async_alert(self, alert_msg: str, new_prompt: str | None = None) -> None: - """Display an important message to the user while they are at a command line prompt. - - To the user it appears as if an alert message is printed above the prompt and their - current input text and cursor location is left alone. + def add_alert(self, *, msg: str | None = None, prompt: str | None = None) -> None: + """Queue an asynchronous alert to be displayed when the prompt is active. - This function checks self._in_prompt to ensure a prompt is on screen. - If the main thread is not at the prompt, a RuntimeError is raised. + Examples: + add_alert(msg="System error!") # Print message only + add_alert(prompt="user@host> ") # Update prompt only + add_alert(msg="Done", prompt="> ") # Update both - This function is only needed when you need to print an alert or update the prompt while the - main thread is blocking at the prompt. Therefore, this should never be called from the main - thread. Doing so will raise a RuntimeError. + :param msg: an optional message to be printed above the prompt. + :param prompt: an optional string to dynamically replace the current prompt. - :param alert_msg: the message to display to the user - :param new_prompt: If you also want to change the prompt that is displayed, then include it here. - See async_update_prompt() docstring for guidance on updating a prompt. - :raises RuntimeError: if called from the main thread. - :raises RuntimeError: if main thread is not currently at the prompt. """ + if msg is None and prompt is None: + return - # Check if prompt is currently displayed and waiting for user input - def _alert() -> None: - if new_prompt is not None: - self.prompt = new_prompt - - if alert_msg: - # Since we are running in the loop, patch_stdout context manager from read_input - # should be active (if tty), or at least we are in the main thread. - print(alert_msg) - - if hasattr(self, 'session'): - # Invalidate to force prompt update - get_app().invalidate() - - # Schedule the alert to run on the main thread's event loop - try: - get_app().loop.call_soon_threadsafe(_alert) # type: ignore[union-attr] - except AttributeError: - # Fallback if loop is not accessible (e.g. prompt not running or session not initialized) - # This shouldn't happen if _in_prompt is True, unless prompt exited concurrently. - raise RuntimeError("Event loop not available") from None - - def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover - """Update the command line prompt while the user is still typing at it. - - This is good for alerting the user to system changes dynamically in between commands. - For instance you could alter the color of the prompt to indicate a system status or increase a - counter to report an event. If you do alter the actual text of the prompt, it is best to keep - the prompt the same width as what's on screen. Otherwise the user's input text will be shifted - and the update will not be seamless. - - If user is at a continuation prompt while entering a multiline command, the onscreen prompt will - not change. However, self.prompt will still be updated and display immediately after the multiline - line command completes. - - :param new_prompt: what to change the prompt to - :raises RuntimeError: if called from the main thread. - :raises RuntimeError: if main thread is not currently at the prompt. - """ - self.async_alert('', new_prompt) + with self._alert_condition: + alert = AsyncAlert(msg=msg, prompt=prompt) + self._alert_queue.append(alert) + self._alert_condition.notify_all() @staticmethod def set_window_title(title: str) -> None: # pragma: no cover diff --git a/cmd2/pt_utils.py b/cmd2/pt_utils.py index 2adde87db..c2a4ee6f3 100644 --- a/cmd2/pt_utils.py +++ b/cmd2/pt_utils.py @@ -26,6 +26,7 @@ utils, ) from . import rich_utils as ru +from . import string_utils as su if TYPE_CHECKING: # pragma: no cover from .cmd2 import Cmd @@ -34,6 +35,21 @@ BASE_DELIMITERS = " \t\n" + "".join(constants.QUOTES) + "".join(constants.REDIRECTION_CHARS) +def pt_filter_style(text: str | ANSI) -> str | ANSI: + """Strip styles if disallowed by ru.ALLOW_STYLE. Otherwise return an ANSI object. + + This function is intended specifically for text rendered by prompt-toolkit. + """ + # We only use prompt-toolkit to write to a terminal. Therefore + # we only have to check if ALLOW_STYLE is Never. + if ru.ALLOW_STYLE == ru.AllowStyle.NEVER: + raw_text = text.value if isinstance(text, ANSI) else text + return su.strip_style(raw_text) + + # String must be an ANSI object for prompt-toolkit to render ANSI style sequences. + return text if isinstance(text, ANSI) else ANSI(text) + + class Cmd2Completer(Completer): """Completer that delegates to cmd2's completion logic.""" @@ -72,16 +88,16 @@ def get_completions(self, document: Document, _complete_event: object) -> Iterab ) if completions.completion_error: - print_formatted_text(ANSI(completions.completion_error)) + print_formatted_text(pt_filter_style(completions.completion_error)) return # Print completion table if present if completions.completion_table: - print_formatted_text(ANSI("\n" + completions.completion_table)) + print_formatted_text(pt_filter_style("\n" + completions.completion_table)) # Print hint if present and settings say we should if completions.completion_hint and (self.cmd_app.always_show_hint or not completions): - print_formatted_text(ANSI(completions.completion_hint)) + print_formatted_text(pt_filter_style(completions.completion_hint)) if not completions: return @@ -103,9 +119,6 @@ def get_completions(self, document: Document, _complete_event: object) -> Iterab buffer.cursor_right(search_text_length) return - # Determine if we should remove style from completion text - remove_style = ru.ALLOW_STYLE == ru.AllowStyle.NEVER - # Return the completions for item in completions: # Set offset to the start of the current word to overwrite it with the completion @@ -134,8 +147,8 @@ def get_completions(self, document: Document, _complete_event: object) -> Iterab yield Completion( match_text, start_position=start_position, - display=item.display_plain if remove_style else ANSI(item.display), - display_meta=item.display_meta_plain if remove_style else ANSI(item.display_meta), + display=pt_filter_style(item.display), + display_meta=pt_filter_style(item.display_meta), ) @@ -215,8 +228,9 @@ def get_line(lineno: int) -> list[tuple[str, str]]: tokens: list[tuple[str, str]] = [] # Use cmd2's command pattern to find the first word (the command) - match = self.cmd_app.statement_parser._command_pattern.search(line) - if match: + if ru.ALLOW_STYLE != ru.AllowStyle.NEVER and ( + match := self.cmd_app.statement_parser._command_pattern.search(line) + ): # Group 1 is the command, Group 2 is the character(s) that terminated the command match command = match.group(1) cmd_start = match.start(1) @@ -277,7 +291,7 @@ def get_line(lineno: int) -> list[tuple[str, str]]: else: tokens.append(('', text)) elif line: - # No command match found, add the entire line unstyled + # No command match found or colors aren't allowed, add the entire line unstyled tokens.append(('', line)) return tokens diff --git a/docs/features/prompt.md b/docs/features/prompt.md index 546a40f94..fdb4e2391 100644 --- a/docs/features/prompt.md +++ b/docs/features/prompt.md @@ -28,18 +28,29 @@ for an example of dynamically updating the prompt. ## Asynchronous Feedback -`cmd2` provides these functions to provide asynchronous feedback to the user without interfering -with the command line. This means the feedback is provided to the user when they are still entering -text at the prompt. To use this functionality, the application must be running in a terminal that -supports [VT100](https://en.wikipedia.org/wiki/VT100) control characters. Linux, Mac, and Windows 10 -and greater all support these. - -- [cmd2.Cmd.async_alert][] -- [cmd2.Cmd.async_update_prompt][] - -`cmd2` also provides a function to change the title of the terminal window. This feature requires -the application be running in a terminal that supports VT100 control characters. Linux, Mac, and -Windows 10 and greater all support these. +`cmd2` provides a function to deliver asynchronous feedback to the user without interfering with the +command line. This allows feedback to be provided while the user is still entering text at the +prompt. + +- [cmd2.Cmd.add_alert][] + +### Asynchronous Feedback Mechanisms + +Alerts can interact with the CLI in two ways: + +1. **Message Printing**: It can print a message directly above the current prompt line. +1. **Prompt Updates**: It can dynamically replace the text of the active prompt to reflect changing + state. + +!!! note + + To ensure the user interface remains accurate, a prompt update is ignored if the alert + was created before the current prompt was rendered. This prevents older alerts from overwriting a newer + prompt, though the alert's message will still be printed. + +### Terminal Window Management + +`cmd2` also provides a function to change the title of the terminal window. - [cmd2.Cmd.set_window_title][] diff --git a/examples/async_printing.py b/examples/async_printing.py index bb58eb679..cd9ffa27c 100755 --- a/examples/async_printing.py +++ b/examples/async_printing.py @@ -3,9 +3,8 @@ and changes the window title. """ -import asyncio -import contextlib import random +import threading import time import cmd2 @@ -31,63 +30,50 @@ class AlerterApp(cmd2.Cmd): """An app that shows off async_alert() and async_update_prompt().""" - def __init__(self, *args, **kwargs) -> None: + def __init__(self) -> None: """Initializer.""" - super().__init__(*args, **kwargs) + super().__init__() self.prompt = "(APR)> " - # The task that will asynchronously alert the user of events - self._alerter_task: asyncio.Task | None = None - self._alerts_enabled = True + # The thread that will asynchronously alert the user of events + self._stop_event = threading.Event() + self._add_alert_thread = threading.Thread() self._alert_count = 0 - self._next_alert_time = 0 + self._next_alert_time = 0.0 - # Register hook to stop alerts when the command loop finishes + # Create some hooks to handle the starting and stopping of our thread + self.register_preloop_hook(self._preloop_hook) self.register_postloop_hook(self._postloop_hook) - def pre_prompt(self) -> None: - """Start the alerter task if enabled. - This is called after the prompt event loop has started, so create_background_task works. - """ - if self._alerts_enabled: - self._start_alerter_task() + def _preloop_hook(self) -> None: + """Start the alerter thread.""" + self._stop_event.clear() + self._add_alert_thread = threading.Thread(name='alerter', target=self._add_alerts_func) + self._add_alert_thread.start() def _postloop_hook(self) -> None: - """Stops the alerter task.""" - self._cancel_alerter_task() - - def do_start_alerts(self, _) -> None: - """Starts the alerter task.""" - if self._alerts_enabled: - print("The alert task is already started") + """Stops the alerter thread.""" + self._stop_event.set() + if self._add_alert_thread.is_alive(): + self._add_alert_thread.join() + + def do_start_alerts(self, _: cmd2.Statement) -> None: + """Starts the alerter thread.""" + if self._add_alert_thread.is_alive(): + print("The alert thread is already started") else: - self._alerts_enabled = True - # Task will be started in pre_prompt at next prompt - - def do_stop_alerts(self, _) -> None: - """Stops the alerter task.""" - if not self._alerts_enabled: - print("The alert task is already stopped") + self._stop_event.clear() + self._add_alert_thread = threading.Thread(name='alerter', target=self._add_alerts_func) + self._add_alert_thread.start() + + def do_stop_alerts(self, _: cmd2.Statement) -> None: + """Stops the alerter thread.""" + self._stop_event.set() + if self._add_alert_thread.is_alive(): + self._add_alert_thread.join() else: - self._alerts_enabled = False - self._cancel_alerter_task() - - def _start_alerter_task(self) -> None: - """Start the alerter task if it's not running.""" - if self._alerter_task is not None and not self._alerter_task.done(): - return - - # self.session.app is the prompt_toolkit Application. - # create_background_task creates a task that runs on the same loop as the app. - with contextlib.suppress(RuntimeError): - self._alerter_task = self.session.app.create_background_task(self._alerter()) - - def _cancel_alerter_task(self) -> None: - """Cancel the alerter task.""" - if self._alerter_task is not None: - self._alerter_task.cancel() - self._alerter_task = None + print("The alert thread is already stopped") def _get_alerts(self) -> list[str]: """Reports alerts @@ -143,7 +129,7 @@ def _generate_colored_prompt(self) -> str: """Randomly generates a colored prompt :return: the new prompt. """ - rand_num = random.randint(1, 20) + rand_num = random.randint(1, 6) status_color = Color.DEFAULT @@ -160,38 +146,29 @@ def _generate_colored_prompt(self) -> str: return stylize(self.visible_prompt, style=status_color) - async def _alerter(self) -> None: + def _add_alerts_func(self) -> None: """Prints alerts and updates the prompt any time the prompt is showing.""" self._alert_count = 0 self._next_alert_time = 0 - try: - while True: - # Get any alerts that need to be printed - alert_str = self._generate_alert_str() - - # Generate a new prompt - new_prompt = self._generate_colored_prompt() - - # Check if we have alerts to print - if alert_str: - # We are running on the main loop, so we can print directly. - # patch_stdout (active during read_input) handles the output. - print(alert_str) - - self.prompt = new_prompt - new_title = f"Alerts Printed: {self._alert_count}" - self.set_window_title(new_title) - self.session.app.invalidate() - - # Otherwise check if the prompt needs to be updated or refreshed - elif self.prompt != new_prompt: - self.prompt = new_prompt - self.session.app.invalidate() - - await asyncio.sleep(0.5) - except asyncio.CancelledError: - pass + while not self._stop_event.is_set(): + # Get any alerts that need to be printed + alert_str = self._generate_alert_str() + + # Generate a new prompt + new_prompt = self._generate_colored_prompt() + + # Check if we have alerts to print + if alert_str: + self.add_alert(msg=alert_str, prompt=new_prompt) + new_title = f"Alerts Printed: {self._alert_count}" + self.set_window_title(new_title) + + # Otherwise check if the prompt needs to be updated or refreshed + elif self.prompt != new_prompt: + self.add_alert(prompt=new_prompt) + + self._stop_event.wait(0.5) if __name__ == '__main__': diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index f13e8c53a..56e787b5a 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1203,6 +1203,96 @@ def test_ctrl_d_at_prompt(say_app, monkeypatch) -> None: assert out == 'hello\n\n' +@pytest.mark.skipif( + sys.platform.startswith('win'), + reason="Don't have a real Windows console with how we are currently running tests in GitHub Actions", +) +@pytest.mark.parametrize( + ('msg', 'prompt', 'is_stale', 'at_continuation_prompt'), + [ + ("msg_text", None, False, False), + ("msg_text", "new_prompt> ", False, False), + ("msg_text", "new_prompt> ", False, True), + ("msg_text", "new_prompt> ", True, False), + ("msg_text", "new_prompt> ", True, True), + (None, "new_prompt> ", False, False), + (None, "new_prompt> ", False, True), + (None, "new_prompt> ", True, False), + (None, "new_prompt> ", True, True), + # Blank prompt is acceptable + ("msg_text", "", False, False), + (None, "", False, False), + ], +) +def test_async_alert(base_app, msg, prompt, is_stale, at_continuation_prompt) -> None: + import time + + with ( + mock.patch('cmd2.cmd2.print_formatted_text') as mock_print, + mock.patch('cmd2.cmd2.get_app') as mock_get_app, + ): + # Set up the chained mock: get_app() returns mock_app, which has invalidate() + mock_app = mock.MagicMock() + mock_get_app.return_value = mock_app + + base_app.add_alert(msg=msg, prompt=prompt) + alert = base_app._alert_queue[0] + + # Stale means alert was created before the current prompt. + if is_stale: + # In the past + alert.timestamp = 0.0 + else: + # In the future + alert.timestamp = time.monotonic() + 99999999 + + base_app._at_continuation_prompt = at_continuation_prompt + + with create_pipe_input() as pipe_input: + base_app.session = PromptSession( + input=pipe_input, + output=DummyOutput(), + history=base_app.session.history, + completer=base_app.session.completer, + ) + pipe_input.send_text("quit\n") + + base_app._cmdloop() + + # If there was a message, patch_stdout handles the redraw (no invalidate) + if msg: + assert msg in str(mock_print.call_args_list[0]) + mock_app.invalidate.assert_not_called() + + # If there's only a prompt update, we expect invalidate() only if not continuation/stale + elif prompt is not None: + if is_stale or at_continuation_prompt: + mock_app.invalidate.assert_not_called() + else: + mock_app.invalidate.assert_called_once() + + # The state of base_app.prompt should always be correct regardless of redraw + if prompt is not None: + if is_stale: + assert base_app.prompt != prompt + else: + assert base_app.prompt == prompt + + +def test_add_alert(base_app) -> None: + orig_num_alerts = len(base_app._alert_queue) + + # Nothing is added when both are None + base_app.add_alert(msg=None, prompt=None) + assert len(base_app._alert_queue) == orig_num_alerts + + # Now test valid alert arguments + base_app.add_alert(msg="Hello", prompt=None) + base_app.add_alert(msg="Hello", prompt="prompt> ") + base_app.add_alert(msg=None, prompt="prompt> ") + assert len(base_app._alert_queue) == orig_num_alerts + 3 + + class ShellApp(cmd2.Cmd): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -2733,6 +2823,27 @@ def test_perror_no_style(base_app, capsys) -> None: assert err == msg + end +@with_ansi_style(ru.AllowStyle.ALWAYS) +def test_psuccess(outsim_app) -> None: + msg = 'testing...' + end = '\n' + outsim_app.psuccess(msg) + + expected = su.stylize(msg + end, style=Cmd2Style.SUCCESS) + assert outsim_app.stdout.getvalue() == expected + + +@with_ansi_style(ru.AllowStyle.ALWAYS) +def test_pwarning(base_app, capsys) -> None: + msg = 'testing...' + end = '\n' + base_app.pwarning(msg) + + expected = su.stylize(msg + end, style=Cmd2Style.WARNING) + _out, err = capsys.readouterr() + assert err == expected + + @with_ansi_style(ru.AllowStyle.ALWAYS) def test_pexcept_style(base_app, capsys) -> None: msg = Exception('testing...') diff --git a/tests/test_pt_utils.py b/tests/test_pt_utils.py index 0d0feb443..b9a483756 100644 --- a/tests/test_pt_utils.py +++ b/tests/test_pt_utils.py @@ -18,8 +18,13 @@ stylize, utils, ) +from cmd2 import rich_utils as ru +from cmd2 import string_utils as su from cmd2.history import HistoryItem from cmd2.parsing import Statement +from cmd2.pt_utils import pt_filter_style + +from .conftest import with_ansi_style # Mock for cmd2.Cmd @@ -47,6 +52,48 @@ def mock_cmd_app() -> MockCmd: return MockCmd() +@with_ansi_style(ru.AllowStyle.ALWAYS) +def test_pt_filter_style_always() -> None: + """This should preserve all styles and return ANSI.""" + unstyled = "unstyled" + result = pt_filter_style(unstyled) + assert isinstance(result, ANSI) + assert result.value == unstyled + + styled = stylize("styled", Cmd2Style.COMMAND_LINE) + result = pt_filter_style(styled) + assert isinstance(result, ANSI) + assert result.value == styled + + +@with_ansi_style(ru.AllowStyle.TERMINAL) +def test_pt_filter_style_terminal() -> None: + """This should preserve all styles and return ANSI.""" + unstyled = "unstyled" + result = pt_filter_style(unstyled) + assert isinstance(result, ANSI) + assert result.value == unstyled + + styled = stylize("styled", Cmd2Style.COMMAND_LINE) + result = pt_filter_style(styled) + assert isinstance(result, ANSI) + assert result.value == styled + + +@with_ansi_style(ru.AllowStyle.NEVER) +def test_pt_filter_style_never() -> None: + """This should strip all styles and return str.""" + unstyled = "unstyled" + result = pt_filter_style(unstyled) + assert isinstance(result, str) + assert result == unstyled + + styled = stylize("styled", Cmd2Style.COMMAND_LINE) + result = pt_filter_style(styled) + assert isinstance(result, str) + assert result == su.strip_style(styled) + + class TestCmd2Lexer: def test_lex_document_command(self, mock_cmd_app): """Test lexing a command name."""