298 lines
9.2 KiB
Python
298 lines
9.2 KiB
Python
|
"""
|
||
|
patch_stdout
|
||
|
============
|
||
|
|
||
|
This implements a context manager that ensures that print statements within
|
||
|
it won't destroy the user interface. The context manager will replace
|
||
|
`sys.stdout` by something that draws the output above the current prompt,
|
||
|
rather than overwriting the UI.
|
||
|
|
||
|
Usage::
|
||
|
|
||
|
with patch_stdout(application):
|
||
|
...
|
||
|
application.run()
|
||
|
...
|
||
|
|
||
|
Multiple applications can run in the body of the context manager, one after the
|
||
|
other.
|
||
|
"""
|
||
|
|
||
|
from __future__ import annotations
|
||
|
|
||
|
import asyncio
|
||
|
import queue
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
from contextlib import contextmanager
|
||
|
from typing import Generator, TextIO, cast
|
||
|
|
||
|
from .application import get_app_session, run_in_terminal
|
||
|
from .output import Output
|
||
|
|
||
|
__all__ = [
|
||
|
"patch_stdout",
|
||
|
"StdoutProxy",
|
||
|
]
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
|
||
|
"""
|
||
|
Replace `sys.stdout` by an :class:`_StdoutProxy` instance.
|
||
|
|
||
|
Writing to this proxy will make sure that the text appears above the
|
||
|
prompt, and that it doesn't destroy the output from the renderer. If no
|
||
|
application is curring, the behavior should be identical to writing to
|
||
|
`sys.stdout` directly.
|
||
|
|
||
|
Warning: If a new event loop is installed using `asyncio.set_event_loop()`,
|
||
|
then make sure that the context manager is applied after the event loop
|
||
|
is changed. Printing to stdout will be scheduled in the event loop
|
||
|
that's active when the context manager is created.
|
||
|
|
||
|
:param raw: (`bool`) When True, vt100 terminal escape sequences are not
|
||
|
removed/escaped.
|
||
|
"""
|
||
|
with StdoutProxy(raw=raw) as proxy:
|
||
|
original_stdout = sys.stdout
|
||
|
original_stderr = sys.stderr
|
||
|
|
||
|
# Enter.
|
||
|
sys.stdout = cast(TextIO, proxy)
|
||
|
sys.stderr = cast(TextIO, proxy)
|
||
|
|
||
|
try:
|
||
|
yield
|
||
|
finally:
|
||
|
sys.stdout = original_stdout
|
||
|
sys.stderr = original_stderr
|
||
|
|
||
|
|
||
|
class _Done:
|
||
|
"Sentinel value for stopping the stdout proxy."
|
||
|
|
||
|
|
||
|
class StdoutProxy:
|
||
|
"""
|
||
|
File-like object, which prints everything written to it, output above the
|
||
|
current application/prompt. This class is compatible with other file
|
||
|
objects and can be used as a drop-in replacement for `sys.stdout` or can
|
||
|
for instance be passed to `logging.StreamHandler`.
|
||
|
|
||
|
The current application, above which we print, is determined by looking
|
||
|
what application currently runs in the `AppSession` that is active during
|
||
|
the creation of this instance.
|
||
|
|
||
|
This class can be used as a context manager.
|
||
|
|
||
|
In order to avoid having to repaint the prompt continuously for every
|
||
|
little write, a short delay of `sleep_between_writes` seconds will be added
|
||
|
between writes in order to bundle many smaller writes in a short timespan.
|
||
|
"""
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
sleep_between_writes: float = 0.2,
|
||
|
raw: bool = False,
|
||
|
) -> None:
|
||
|
self.sleep_between_writes = sleep_between_writes
|
||
|
self.raw = raw
|
||
|
|
||
|
self._lock = threading.RLock()
|
||
|
self._buffer: list[str] = []
|
||
|
|
||
|
# Keep track of the curret app session.
|
||
|
self.app_session = get_app_session()
|
||
|
|
||
|
# See what output is active *right now*. We should do it at this point,
|
||
|
# before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
|
||
|
# Otherwise, if `patch_stdout` is used, and no `Output` instance has
|
||
|
# been created, then the default output creation code will see this
|
||
|
# proxy object as `sys.stdout`, and get in a recursive loop trying to
|
||
|
# access `StdoutProxy.isatty()` which will again retrieve the output.
|
||
|
self._output: Output = self.app_session.output
|
||
|
|
||
|
# Flush thread
|
||
|
self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
|
||
|
self._flush_thread = self._start_write_thread()
|
||
|
self.closed = False
|
||
|
|
||
|
def __enter__(self) -> StdoutProxy:
|
||
|
return self
|
||
|
|
||
|
def __exit__(self, *args: object) -> None:
|
||
|
self.close()
|
||
|
|
||
|
def close(self) -> None:
|
||
|
"""
|
||
|
Stop `StdoutProxy` proxy.
|
||
|
|
||
|
This will terminate the write thread, make sure everything is flushed
|
||
|
and wait for the write thread to finish.
|
||
|
"""
|
||
|
if not self.closed:
|
||
|
self._flush_queue.put(_Done())
|
||
|
self._flush_thread.join()
|
||
|
self.closed = True
|
||
|
|
||
|
def _start_write_thread(self) -> threading.Thread:
|
||
|
thread = threading.Thread(
|
||
|
target=self._write_thread,
|
||
|
name="patch-stdout-flush-thread",
|
||
|
daemon=True,
|
||
|
)
|
||
|
thread.start()
|
||
|
return thread
|
||
|
|
||
|
def _write_thread(self) -> None:
|
||
|
done = False
|
||
|
|
||
|
while not done:
|
||
|
item = self._flush_queue.get()
|
||
|
|
||
|
if isinstance(item, _Done):
|
||
|
break
|
||
|
|
||
|
# Don't bother calling when we got an empty string.
|
||
|
if not item:
|
||
|
continue
|
||
|
|
||
|
text = []
|
||
|
text.append(item)
|
||
|
|
||
|
# Read the rest of the queue if more data was queued up.
|
||
|
while True:
|
||
|
try:
|
||
|
item = self._flush_queue.get_nowait()
|
||
|
except queue.Empty:
|
||
|
break
|
||
|
else:
|
||
|
if isinstance(item, _Done):
|
||
|
done = True
|
||
|
else:
|
||
|
text.append(item)
|
||
|
|
||
|
app_loop = self._get_app_loop()
|
||
|
self._write_and_flush(app_loop, "".join(text))
|
||
|
|
||
|
# If an application was running that requires repainting, then wait
|
||
|
# for a very short time, in order to bundle actual writes and avoid
|
||
|
# having to repaint to often.
|
||
|
if app_loop is not None:
|
||
|
time.sleep(self.sleep_between_writes)
|
||
|
|
||
|
def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
|
||
|
"""
|
||
|
Return the event loop for the application currently running in our
|
||
|
`AppSession`.
|
||
|
"""
|
||
|
app = self.app_session.app
|
||
|
|
||
|
if app is None:
|
||
|
return None
|
||
|
|
||
|
return app.loop
|
||
|
|
||
|
def _write_and_flush(
|
||
|
self, loop: asyncio.AbstractEventLoop | None, text: str
|
||
|
) -> None:
|
||
|
"""
|
||
|
Write the given text to stdout and flush.
|
||
|
If an application is running, use `run_in_terminal`.
|
||
|
"""
|
||
|
|
||
|
def write_and_flush() -> None:
|
||
|
# Ensure that autowrap is enabled before calling `write`.
|
||
|
# XXX: On Windows, the `Windows10_Output` enables/disables VT
|
||
|
# terminal processing for every flush. It turns out that this
|
||
|
# causes autowrap to be reset (disabled) after each flush. So,
|
||
|
# we have to enable it again before writing text.
|
||
|
self._output.enable_autowrap()
|
||
|
|
||
|
if self.raw:
|
||
|
self._output.write_raw(text)
|
||
|
else:
|
||
|
self._output.write(text)
|
||
|
|
||
|
self._output.flush()
|
||
|
|
||
|
def write_and_flush_in_loop() -> None:
|
||
|
# If an application is running, use `run_in_terminal`, otherwise
|
||
|
# call it directly.
|
||
|
run_in_terminal(write_and_flush, in_executor=False)
|
||
|
|
||
|
if loop is None:
|
||
|
# No loop, write immediately.
|
||
|
write_and_flush()
|
||
|
else:
|
||
|
# Make sure `write_and_flush` is executed *in* the event loop, not
|
||
|
# in another thread.
|
||
|
loop.call_soon_threadsafe(write_and_flush_in_loop)
|
||
|
|
||
|
def _write(self, data: str) -> None:
|
||
|
"""
|
||
|
Note: print()-statements cause to multiple write calls.
|
||
|
(write('line') and write('\n')). Of course we don't want to call
|
||
|
`run_in_terminal` for every individual call, because that's too
|
||
|
expensive, and as long as the newline hasn't been written, the
|
||
|
text itself is again overwritten by the rendering of the input
|
||
|
command line. Therefor, we have a little buffer which holds the
|
||
|
text until a newline is written to stdout.
|
||
|
"""
|
||
|
if "\n" in data:
|
||
|
# When there is a newline in the data, write everything before the
|
||
|
# newline, including the newline itself.
|
||
|
before, after = data.rsplit("\n", 1)
|
||
|
to_write = self._buffer + [before, "\n"]
|
||
|
self._buffer = [after]
|
||
|
|
||
|
text = "".join(to_write)
|
||
|
self._flush_queue.put(text)
|
||
|
else:
|
||
|
# Otherwise, cache in buffer.
|
||
|
self._buffer.append(data)
|
||
|
|
||
|
def _flush(self) -> None:
|
||
|
text = "".join(self._buffer)
|
||
|
self._buffer = []
|
||
|
self._flush_queue.put(text)
|
||
|
|
||
|
def write(self, data: str) -> int:
|
||
|
with self._lock:
|
||
|
self._write(data)
|
||
|
|
||
|
return len(data) # Pretend everything was written.
|
||
|
|
||
|
def flush(self) -> None:
|
||
|
"""
|
||
|
Flush buffered output.
|
||
|
"""
|
||
|
with self._lock:
|
||
|
self._flush()
|
||
|
|
||
|
@property
|
||
|
def original_stdout(self) -> TextIO:
|
||
|
return self._output.stdout or sys.__stdout__
|
||
|
|
||
|
# Attributes for compatibility with sys.__stdout__:
|
||
|
|
||
|
def fileno(self) -> int:
|
||
|
return self._output.fileno()
|
||
|
|
||
|
def isatty(self) -> bool:
|
||
|
stdout = self._output.stdout
|
||
|
if stdout is None:
|
||
|
return False
|
||
|
|
||
|
return stdout.isatty()
|
||
|
|
||
|
@property
|
||
|
def encoding(self) -> str:
|
||
|
return self._output.encoding()
|
||
|
|
||
|
@property
|
||
|
def errors(self) -> str:
|
||
|
return "strict"
|