Skip to content

Commit

Permalink
Merge pull request #1687 from rmartin16/yield-streaming
Browse files Browse the repository at this point in the history
Capture emulator `stdout` during startup
  • Loading branch information
freakboy3742 authored Mar 14, 2024
2 parents 65d425a + ebc42b8 commit 622d9b8
Show file tree
Hide file tree
Showing 7 changed files with 637 additions and 295 deletions.
1 change: 1 addition & 0 deletions changes/1687.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
On Windows, the Android emulator will always open without needing to press CTRL-C.
32 changes: 19 additions & 13 deletions src/briefcase/integrations/android_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,7 @@ def start_emulator(
if extra_args is None:
extra_args = []

# Start the emulator
emulator_popen = self.tools.subprocess.Popen(
[self.emulator_path, f"@{avd}", "-dns-server", "8.8.8.8"] + extra_args,
env=self.env,
Expand All @@ -1290,6 +1291,17 @@ def start_emulator(
start_new_session=True,
)

# Start capturing the emulator's output
# On Windows, the emulator can block until stdout is read and the emulator will
# not actually run until the user sends CTRL+C to Briefcase (#1573). This
# avoids that scenario while also ensuring emulator output is always available
# to print in the console if other issues occur.
emulator_streamer = self.tools.subprocess.stream_output_non_blocking(
label="Android emulator",
popen_process=emulator_popen,
capture_output=True,
)

# wrap AVD name in quotes since '@' is a special char in PowerShell
emulator_command = " ".join(
f'"{arg}"' if arg.startswith("@") else arg
Expand Down Expand Up @@ -1317,12 +1329,11 @@ def start_emulator(
failed_startup_error_msg = f"{{prologue}}\n{general_error_msg}"

# The boot process happens in 2 phases.
# First, the emulator appears in the device list. However, it's
# not ready until the boot process has finished. To determine
# the boot status, we need the device ID, and an ADB connection.
# First, the emulator appears in the device list. However, it's not ready until
# the boot process has finished. To determine the boot status, we need the
# device ID, and an ADB connection.

# Phase 1: Wait for the device to appear so we can get an
# ADB instance for the new device.
# Phase 1: Wait for the device to appear so we can get an ADB instance for it.
try:
with self.tools.input.wait_bar("Starting emulator...") as keep_alive:
adb = None
Expand Down Expand Up @@ -1375,14 +1386,7 @@ def start_emulator(
"Emulator output log for startup failure",
prefix=self.name,
)
try:
# if the emulator exited, this should return its output immediately
self.tools.logger.info(emulator_popen.communicate(timeout=1)[0])
except subprocess.TimeoutExpired:
self.tools.logger.info(
"Briefcase failed to retrieve emulator output "
"(this is expected if the emulator is running)"
)
self.tools.logger.info(emulator_streamer.captured_output)

# Provide troubleshooting steps if user gives up on the emulator starting
if isinstance(e, KeyboardInterrupt):
Expand All @@ -1399,6 +1403,8 @@ def start_emulator(
self.tools.logger.info(general_error_msg)

raise
finally:
emulator_streamer.request_stop()

# Return the device ID and full name.
return device, full_name
Expand Down
211 changes: 157 additions & 54 deletions src/briefcase/integrations/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import operator
import os
import queue
import shlex
import subprocess
import sys
Expand Down Expand Up @@ -56,7 +57,7 @@ def is_process_dead(pid: int) -> bool:
monitoring of a PID to identify when the process goes from existing to not existing.
:param pid: integer value to be checked if assigned as a PID.
:return: True if PID does not exist; False otherwise.
:returns: True if PID does not exist; False otherwise.
"""
return not psutil.pid_exists(pid)

Expand All @@ -75,7 +76,7 @@ def get_process_id_by_command(
takes a filepath to a directory for an application; therefore, the actual
running process will be running a command within that directory.
:param logger: optional Log to show messages about process matching to users
:return: PID if found else None
:returns: PID if found else None
"""
matching_procs = []
# retrieve command line, creation time, and ID for all running processes.
Expand Down Expand Up @@ -116,7 +117,7 @@ def inner(sub: Subprocess, args: SubprocessArgsT, *wrapped_args, **wrapped_kwarg
:param sub: Bound Subprocess object
:param args: command line to run in subprocess
:return: the return value for the Subprocess method
:returns: the return value for the Subprocess method
"""
# Just run the command if no dynamic elements are active
if not sub.tools.input.is_console_controlled:
Expand Down Expand Up @@ -145,6 +146,121 @@ def inner(sub: Subprocess, args: SubprocessArgsT, *wrapped_args, **wrapped_kwarg
return inner


class PopenOutputStreamer(threading.Thread):
def __init__(
self,
label: str,
popen_process: subprocess.Popen,
logger: Log,
capture_output: bool = False,
filter_func: Callable[[str], Iterator[str]] | None = None,
):
"""Thread for streaming stdout for a Popen process.
:param label: Descriptive name for process being streamed
:param popen_process: Popen process with stdout to stream
:param logger: logger for printing to console
:param capture_output: Retain process output in ``output_queue`` via a
``queue.Queue`` instead of printing to console
:param filter_func: a callable that will be invoked on every line of output
that is streamed; see ``Subprocess.stream_output`` for details
"""
super().__init__(name=f"{label} output streamer", daemon=True)

self.popen_process = popen_process
self.logger = logger
self.capture_output = capture_output
self.filter_func = filter_func

# arbitrarily large maxsize to prevent unbounded memory use if things go south
self.output_queue = queue.Queue(maxsize=10_000_000)
self.stop_flag = threading.Event()

def run(self):
"""Stream output for a Popen process."""
try:
while output_line := self._readline():
# The stop_flag is intentionally checked both at the top and bottom of
# this loop; if the flag was set during the call to readline(), then
# processing the output is skipped altogether. And if the flag is set
# as a consequence of filter_func(), the streamer still exits before
# calling readline() again and potentially blocking indefinitely.
if not self.stop_flag.is_set():
filtered_output, stop_streaming = self._filter(output_line)

for filtered_line in filtered_output:
if self.capture_output:
self.output_queue.put_nowait(filtered_line)
else:
self.logger.info(filtered_line)

if stop_streaming:
self.stop_flag.set()

if self.stop_flag.is_set():
break
except Exception as e:
self.logger.error(f"Error while streaming output: {type(e).__name__}: {e}")
self.logger.capture_stacktrace("Output thread")

def request_stop(self):
"""Set the stop flag to cause the streamer to exit.
If the streamer is currently blocking on `readline()` because the process'
stdout buffer is empty, then the streamer will not exit until `readline()`
returns or until Briefcase exits.
"""
self.stop_flag.set()

@property
def captured_output(self) -> str:
"""The captured output from the process."""
output = []
while not self.output_queue.empty():
with contextlib.suppress(queue.Empty):
output.append(self.output_queue.get_nowait())
self.output_queue.task_done()
return "".join(output)

def _readline(self) -> str:
"""Read a line of output from the process while blocking.
Calling readline() for stdout always returns at least a newline, i.e. "\n",
UNLESS the process is exiting or already exited; in that case, an empty string
is returned.
:returns: one line of output or "" if nothing more can be read from stdout
"""
try:
return ensure_str(self.popen_process.stdout.readline())
except ValueError as e:
# Catch ValueError if stdout is unexpectedly closed; this can
# happen, for instance, if the user starts spamming CTRL+C.
if "I/O operation on closed file" in str(e):
self.logger.warning(
"WARNING: stdout was unexpectedly closed while streaming output"
)
return ""
else:
raise

def _filter(self, line: str) -> tuple[list[str], bool]:
"""Run filter function over output from process."""
filtered_output = []
stop_streaming = False

if self.filter_func is not None:
try:
for filtered_line in self.filter_func(line.strip("\n")):
filtered_output.append(filtered_line)
except StopStreaming:
stop_streaming = True
else:
filtered_output.append(line)

return filtered_output, stop_streaming


class NativeAppContext(Tool):
"""A wrapper around subprocess for use as an app-bound tool."""

Expand Down Expand Up @@ -409,7 +525,7 @@ def run(
user. Can raise StopStreaming to terminate the output stream.
:param kwargs: keyword args for ``subprocess.run()``
:raises ValueError: if a filter function is provided when in non-streaming mode.
:return: ``CompletedProcess`` for invoked process
:returns: ``CompletedProcess`` for invoked process
"""

# Stream the output unless the caller explicitly disables it. When a
Expand Down Expand Up @@ -617,19 +733,19 @@ def stream_output(
:param label: A description of the content being streamed; used for to provide
context in logging messages.
:param popen_process: a running Popen process with output to print
:param stop_func: a Callable that returns True when output streaming should stop
:param stop_func: A Callable that returns True when output streaming should stop
and the popen_process should be terminated.
:param filter_func: a callable that will be invoked on every line of output that
:param filter_func: A callable that will be invoked on every line of output that
is streamed. The function accepts the "raw" line of input (stripped of any
trailing newline); it returns a generator that yields the filtered output
that should be displayed to the user. Can raise StopStreaming to terminate
the output stream.
"""
output_streamer = threading.Thread(
name=f"{label} output streamer",
target=self._stream_output_thread,
args=(popen_process, filter_func),
daemon=True,
output_streamer = PopenOutputStreamer(
label=label,
popen_process=popen_process,
logger=self.tools.logger,
filter_func=filter_func,
)
try:
output_streamer.start()
Expand All @@ -653,52 +769,39 @@ def stream_output(
"Log stream hasn't terminated; log output may be corrupted."
)

def _stream_output_thread(
def stream_output_non_blocking(
self,
popen_process: subprocess.Popen,
filter_func: Callable[[str], Iterator[str]],
):
"""Stream output for a Popen process in a Thread.
label: str,
popen_process: Popen,
capture_output: bool = False,
filter_func: Callable[[str], Iterator[str]] | None = None,
) -> PopenOutputStreamer:
"""Stream the output of a Popen process without blocking.
This is useful for streaming or capturing the output of a process in the
background. In this way, the process' output can be shown to users while the
main thread monitors other activities; alternatively, the output of the process
can be captured to be retrieved later in the event of an error, for instance.
:param popen_process: popen process to stream stdout
:param filter_func: a callable that will be invoked on every line
of output that is streamed; see ``stream_output`` for details.
:param label: A description of the content being streamed; used for to provide
context in logging messages.
:param popen_process: A running Popen process with output to print
:param capture_output: Retain process output instead of printing to the console
:param filter_func: A callable that will be invoked on every line of output that
is streamed. The function accepts the "raw" line of input (stripped of any
trailing newline); it returns a generator that yields the filtered output
that should be displayed to the user. Can raise StopStreaming to terminate
the output stream.
"""
try:
while True:
try:
output_line = ensure_str(popen_process.stdout.readline())
except ValueError as e:
# Catch ValueError if stdout is unexpectedly closed; this can
# happen, for instance, if the user starts spamming CTRL+C.
if "I/O operation on closed file" in str(e):
self.tools.logger.warning(
"WARNING: stdout was unexpectedly closed while streaming output"
)
return
else:
raise

# readline should always return at least a newline (ie \n) UNLESS
# the underlying process is exiting/gone; then "" is returned.
if output_line:
if filter_func is not None:
try:
for filtered_output in filter_func(
output_line.rstrip("\n")
):
self.tools.logger.info(filtered_output)
except StopStreaming:
return
else:
self.tools.logger.info(output_line)
else:
return
except Exception as e:
self.tools.logger.error(
f"Error while streaming output: {e.__class__.__name__}: {e}"
)
self.tools.logger.capture_stacktrace("Output thread")
output_streamer = PopenOutputStreamer(
label=label,
popen_process=popen_process,
logger=self.tools.logger,
capture_output=capture_output,
filter_func=filter_func,
)
output_streamer.start()
return output_streamer

def cleanup(self, label: str, popen_process: subprocess.Popen):
"""Clean up after a Popen process, gracefully terminating if possible; forcibly
Expand Down
Loading

0 comments on commit 622d9b8

Please # to comment.