From c7962587e7f1e619b5dcf9a5e901eb7042520b00 Mon Sep 17 00:00:00 2001 From: Geoffrey Poore Date: Tue, 30 Jun 2020 23:55:43 -0500 Subject: [PATCH] added live_output option to display stdout and stderr live in terminal during code execution (#21) --- CHANGELOG.md | 3 + README.md | 18 ++ codebraid/codeprocessors/base.py | 280 ++++++++++++++++++++++++------ codebraid/converters/base.py | 3 +- codebraid/converters/pandoc.py | 1 + codebraid/languages/python.bespon | 2 +- codebraid/version.py | 2 +- 7 files changed, 249 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d71f60d..8ba7747 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## v0.5.0 (dev) +* Added `live_output` option for the first code chunk in a session. This + shows code output (stdout and stderr) live in the terminal during code + execution (#21). * "Includes" are now skipped during internal, intermediate file transformations, which prevents duplicated "includes" and associated errors (#20). This applies to `header-includes`, `include-before`, diff --git a/README.md b/README.md index 96ba92f..b1699ff 100644 --- a/README.md +++ b/README.md @@ -327,6 +327,24 @@ session is in use). * `jupyter_timeout`={int} — Jupyter kernel timeout per code chunk in seconds. The default is 60. +* `live_output`={`true`, `false`} — Show code output (stdout and stderr) live + in the terminal during code execution. Output still appears in the document + as normal. + + All output is written to stderr, so stdout only contains the document when + `--output` is not specified. Output is interspersed with delimiters marking + the start of each session and the start of each code chunk. The delimiters + for the start of each code chunk include source names and line numbers. + + The output for a code chunk may be delayed until all code in the chunk has + finished executing, unless code output is line buffered or code manually + flushes stdout and stderr. For example, with Python you may want to use + print functions like `print("text", flush=True)`. Another option is to use + Python in line-buffered mode by setting `executable="python -u"` or + `executable="python3 -u"` in the first code chunk of a session. + + This option currently has no effect for Jupyter kernels. + #### Execution diff --git a/codebraid/codeprocessors/base.py b/codebraid/codeprocessors/base.py index 8364fb7..cf883f5 100644 --- a/codebraid/codeprocessors/base.py +++ b/codebraid/codeprocessors/base.py @@ -9,6 +9,7 @@ import atexit +from sys import stderr, stdout import bespon import collections import hashlib @@ -17,12 +18,15 @@ import locale import pathlib import pkgutil +import queue import re import subprocess import shlex import shutil import sys +import threading import tempfile +import textwrap import time import zipfile from .. import err @@ -617,8 +621,7 @@ def _load_cache(self, session): pass - def _subproc(self, cmd, tmpdir_path, hash, - pipes=True, stderr_is_stdout=False): + def _subproc_default(self, *, session, stage, stage_num, stage_tot_num, cmd, encoding, stderr_is_stdout=False): ''' Wrapper around `subprocess.run()` that provides a single location for customizing handling. @@ -627,54 +630,195 @@ def _subproc(self, cmd, tmpdir_path, hash, # it is ever necessary to switch to non-posix paths under Windows, the # backslashes will require extra escaping. 
args = shlex.split(cmd) - failed_proc_stderr = 'COMMAND FAILED (missing program or file): {0}'.format(cmd).encode('utf8') - if pipes: - if stderr_is_stdout: - try: - proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - except FileNotFoundError: - proc = FailedProcess(args, stdout=failed_proc_stderr) + failed_proc_stderr = 'COMMAND FAILED (missing program or file):\n {0}'.format(cmd).encode('utf8') + if stderr_is_stdout: + try: + proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + except FileNotFoundError: + proc = FailedProcess(args, stdout=failed_proc_stderr) + else: + try: + proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + proc = FailedProcess(args, stdout=b'', stderr=failed_proc_stderr) + return proc + + + def _subproc_live_output(self, *, session, stage, stage_num, stage_tot_num, cmd, encoding, stderr_is_stdout=False): + ''' + Drop-in replacement for `_subproc_default()` for when stdout and + stderr need to be both recorded and passed through (for example, for a + progress bar). + ''' + args = shlex.split(cmd) + failed_proc_stderr = 'COMMAND FAILED (missing program or file):\n {0}'.format(cmd).encode('utf8') + # Queue of bytes from stdout and stderr, plus string delims + print_queue = queue.Queue() + hash_bytes = session.hash[:64].encode('utf8') + delim_border_n_chars = 60 + if stage == 'run': + delim_text = 'run: {lang}, session {session}\n' + delim_text = delim_text.format(lang=session.lang, + session='"{0}"'.format(session.name) if session.name is not None else '') + chunk_delim_text = '''\ + run: {lang}, session {session}, chunk {{chunk}}/{total_chunks} + "{{source}}", line {{line}} + ''' + chunk_delim_text = textwrap.dedent(chunk_delim_text) + chunk_delim_text = chunk_delim_text.format(lang=session.lang, + session='"{0}"'.format(session.name) if session.name is not None else '', + total_chunks=len(session.code_chunks)) + chunk_start_delim = '\n' + '='*delim_border_n_chars + '\n' + chunk_delim_text + '-'*delim_border_n_chars + '\n' + else: + if stage_num == stage_tot_num: + delim_text = '{stage}: {lang}, session {session}\n' else: - try: - proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except FileNotFoundError: - proc = FailedProcess(args, stdout=b'', stderr=failed_proc_stderr) + delim_text = '{stage} ({num}/{tot}): {lang}, session {session}\n' + delim_text = delim_text.format(stage=stage, num=stage_num, tot=stage_tot_num, + lang=session.lang, + session='"{0}"'.format(session.name) if session.name is not None else '') + stage_start_delim = '\n' + '#'*delim_border_n_chars + '\n' + delim_text + '#'*delim_border_n_chars + '\n' + print(stage_start_delim, end='', file=sys.stderr, flush=True) + + def stream_reader(stream, buffer, is_stdout, is_stderr, stdout_lock=None, stderr_lock=None): + ''' + Read bytes from a stream (stdout or stderr) and pass them on to a + buffer (list) and print queue (queue.Queue). Bytes are accumulated in + a local buffer, and only passed on a line at a time. This allows + Codebraid delims to be filtered out. 
+ ''' + local_buffer = [] + while True: + output = stream.read(1) + if not output: + if local_buffer: + line = b''.join(local_buffer) + buffer.append(line) + print_queue.put(line) + break + if output == b'\n': + if not local_buffer: + # Could be leading `\n` from Codebraid delim + local_buffer.append(output) + continue + local_buffer.append(output) + line = b''.join(local_buffer) + buffer.append(line) + local_buffer = [] + # Codebraid delim starts with `\n`, but that will be used + # by whatever was printed just before it when a trailing + # newline has been omitted. + if (stage == 'run' and + (line.startswith(b'CodebraidStd') or line.startswith(b'\nCodebraidStd')) and + hash_bytes in line): + # When stdout_is_stderr == False, locks are used to + # ensure that stdout and stderr stay in sync. Lock + # usage is based on the stdout delim always being + # printed immediately before the stderr delim. + if not is_stderr: + stdout_lock.acquire() + stderr_lock.release() + elif not is_stdout: + stderr_lock.acquire() + stdout_lock.release() + if is_stderr: + chunk_number = int(line.split(b'chunk=', 1)[1].split(b',', 1)[0]) + cc = session.code_chunks[chunk_number] + print_queue.put(chunk_start_delim.format(source=cc.source_name, + line=cc.source_start_line_number, + chunk=cc.session_index+1)) + else: + print_queue.put(line) + continue + if output == b'\r': + local_buffer.append(output) + line = b''.join(local_buffer) + buffer.append(line) + print_queue.put(line) + local_buffer = [] + continue + local_buffer.append(output) + + if stderr_is_stdout: + std_buffer = [] + try: + popen = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + std_thread = threading.Thread(target=stream_reader, + args=(popen.stdout, std_buffer, True, True)) + std_thread.daemon = True + std_thread.start() + while popen.poll() is None: + try: + line = print_queue.get(block=True, timeout=0.1) + except queue.Empty: + continue + if isinstance(line, bytes): + line = line.decode(encoding=encoding, errors='backslashreplace') + print(line, end='', file=sys.stderr, flush=True) + std_thread.join() + while True: + try: + line = print_queue.get(block=False) + except queue.Empty: + break + if isinstance(line, bytes): + line = line.decode(encoding=encoding, errors='backslashreplace') + print(line, end='', file=sys.stderr, flush=True) + print('\n', end='', file=sys.stderr, flush=True) + proc = subprocess.CompletedProcess(popen.args, popen.returncode, b''.join(std_buffer), b'') + except FileNotFoundError: + proc = FailedProcess(args, stdout=b'', stderr=failed_proc_stderr) else: - # When stdout and stderr are stored in files rather than accessed - # through pipes, the files are named using a session-derived hash - # as a precaution against code accessing them and against - # collisions. 
- stdout_path = tmpdir_path / '{0}.stdout'.format(hash) - stderr_path = tmpdir_path / '{0}.stderr'.format(hash) - if stderr_is_stdout: - with open(str(stdout_path), 'wb') as fout: + stdout_lock = threading.Lock() + stderr_lock = threading.Lock() + stdout_buffer = [] + stderr_buffer = [] + try: + popen = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout_thread = threading.Thread(target=stream_reader, + args=(popen.stdout, stdout_buffer, True, False, stdout_lock, stderr_lock)) + stdout_thread.daemon = True + stderr_thread = threading.Thread(target=stream_reader, + args=(popen.stderr, stderr_buffer, False, True, stdout_lock, stderr_lock)) + stderr_thread.daemon = True + stdout_lock.acquire() + stdout_thread.start() + stderr_thread.start() + while popen.poll() is None: try: - proc = subprocess.run(args, stdout=fout, stderr=subprocess.STDOUT) - except FileNotFoundError: - proc = FailedProcess(args, stdout=failed_proc_stderr) - if not isinstance(proc, FailedProcess): - proc.stdout = stdout_path.read_bytes() - else: - with open(str(stdout_path), 'wb') as fout: - with open(str(stderr_path), 'wb') as ferr: - try: - proc = subprocess.run(args, stdout=fout, stderr=ferr) - except FileNotFoundError: - proc = FailedProcess(args, stdout=b'', stderr=failed_proc_stderr) - if not isinstance(proc, FailedProcess): - proc.stdout = stdout_path.read_bytes() - proc.stderr = stderr_path.read_bytes() + line = print_queue.get(block=True, timeout=0.1) + except queue.Empty: + continue + if isinstance(line, bytes): + line = line.decode(encoding=encoding, errors='backslashreplace') + print(line, end='', file=sys.stderr, flush=True) + stdout_thread.join() + stderr_thread.join() + while True: + try: + line = print_queue.get(block=False) + except queue.Empty: + break + if isinstance(line, bytes): + line = line.decode(encoding=encoding, errors='backslashreplace') + print(line, end='', file=sys.stderr, flush=True) + print('\n', end='', file=sys.stderr, flush=True) + proc = subprocess.CompletedProcess(popen.args, popen.returncode, b''.join(stdout_buffer), b''.join(stderr_buffer)) + except FileNotFoundError: + proc = FailedProcess(args, stdout=b'', stderr=failed_proc_stderr) + return proc def _run(self, session): stdstream_delim_start = 'CodebraidStd' - stdstream_delim = r'{0}(hash="{1}", chunk={{0}})'.format(stdstream_delim_start, session.hash[:64]) + stdstream_delim = r'{0}(hash="{1}", chunk={{chunk}}, output_chunk={{output_chunk}},)'.format(stdstream_delim_start, session.hash[:64]) stdstream_delim_escaped = stdstream_delim.replace('"', '\\"') stdstream_delim_start_hash = stdstream_delim.split(',', 1)[0] expression_delim_start = 'CodebraidExpr' - expression_delim = r'{0}(hash="{1}")'.format(expression_delim_start, session.hash[64:]) + expression_delim = r'{0}(hash="{1}", chunk={{chunk}}, output_chunk={{output_chunk}},)'.format(expression_delim_start, session.hash[64:]) expression_delim_escaped = expression_delim.replace('"', '\\"') + expression_delim_start_hash = expression_delim.split(',', 1)[0] run_code_list = [] run_code_line_number = 1 user_code_line_number = 1 @@ -698,7 +842,7 @@ def _run(self, session): last_cc = None expected_stdstream_delims = [] # Track expected chunk numbers for cc in session.code_chunks: - delim = stdstream_delim_escaped.format(cc.session_output_index) + delim = stdstream_delim_escaped.format(chunk=cc.session_index, output_chunk=cc.session_output_index) if last_cc is None: if not cc.options['outside_main']: 
run_code_list.append(chunk_wrapper_before.format(stdout_delim=delim, stderr_delim=delim)) @@ -719,8 +863,9 @@ def _run(self, session): # check expr compatibility with `complete`, etc.; that's # handled in creating sessions. if cc.is_expr: - expr_code = session.lang_def.inline_expression_formatter.format(stdout_delim=expression_delim_escaped, - stderr_delim=expression_delim_escaped, + expr_delim = expression_delim_escaped.format(chunk=cc.session_index, output_chunk=cc.session_output_index) + expr_code = session.lang_def.inline_expression_formatter.format(stdout_delim=expr_delim, + stderr_delim=expr_delim, temp_suffix=session.temp_suffix, code=cc.code) run_code_list.append(expr_code) @@ -760,36 +905,53 @@ def _run(self, session): 'source_dir': source_dir_path.as_posix(), 'source_without_extension': (source_dir_path / source_name).as_posix()} - for cmd_template in session.lang_def.pre_run_commands: + live_output = session.code_chunks[0].options['first_chunk_options'].get('live_output', False) + if live_output: + subproc = self._subproc_live_output + else: + subproc = self._subproc_default + + for n, cmd_template in enumerate(session.lang_def.pre_run_commands): if error: break - pre_proc = self._subproc(cmd_template.format(**template_dict), source_dir_path, session.hash_root, stderr_is_stdout=True) + encoding = session.lang_def.pre_run_encoding or locale.getpreferredencoding(False) + pre_proc = subproc(session=session, + stage='pre-run', stage_num=n+1, stage_tot_num=len(session.lang_def.pre_run_commands), + cmd=cmd_template.format(**template_dict), + encoding=encoding, + stderr_is_stdout=True) if pre_proc.returncode != 0: error = True session.pre_run_errors = True - encoding = session.lang_def.pre_run_encoding or locale.getpreferredencoding(False) stdout_str = io.TextIOWrapper(io.BytesIO(pre_proc.stdout), encoding=encoding, errors='backslashreplace').read() session.pre_run_error_lines = util.splitlines_lf(stdout_str) - for cmd_template in session.lang_def.compile_commands: + for n, cmd_template in enumerate(session.lang_def.compile_commands): if error: break - comp_proc = self._subproc(cmd_template.format(**template_dict), source_dir_path, session.hash_root, stderr_is_stdout=True) + encoding = session.lang_def.compile_encoding or locale.getpreferredencoding(False) + comp_proc = subproc(session=session, + stage='compile', stage_num=n+1, stage_tot_num=len(session.lang_def.compile_commands), + cmd=cmd_template.format(**template_dict), + encoding=encoding, + stderr_is_stdout=True) if comp_proc.returncode != 0: error = True session.compile_errors = True - encoding = session.lang_def.compile_encoding or locale.getpreferredencoding(False) stdout_lines = [] stdout_str = io.TextIOWrapper(io.BytesIO(comp_proc.stdout), encoding=encoding, errors='backslashreplace').read() stderr_lines = util.splitlines_lf(stdout_str) if not error: cmd_template = session.lang_def.run_command - run_proc = self._subproc(cmd_template.format(**template_dict), source_dir_path, session.hash_root) + encoding = session.lang_def.run_encoding or locale.getpreferredencoding(False) + run_proc = subproc(session=session, + stage='run', stage_num=1, stage_tot_num=1, + cmd=cmd_template.format(**template_dict), + encoding=encoding) if run_proc.returncode != 0: error = True session.run_errors = True - encoding = session.lang_def.run_encoding or locale.getpreferredencoding(False) try: stdout_str = io.TextIOWrapper(io.BytesIO(run_proc.stdout), encoding=encoding).read() stderr_str = io.TextIOWrapper(io.BytesIO(run_proc.stderr), 
encoding=encoding).read() @@ -800,14 +962,18 @@ def _run(self, session): stdout_lines = util.splitlines_lf(stdout_str) stderr_lines = util.splitlines_lf(stderr_str) - for cmd_template in session.lang_def.post_run_commands: + for n, cmd_template in enumerate(session.lang_def.post_run_commands): if error: break - post_proc = self._subproc(cmd_template.format(**template_dict), source_dir_path, session.hash_root, stderr_is_stdout=True) + encoding = session.lang_def.post_run_encoding or locale.getpreferredencoding(False) + post_proc = subproc(session=session, + stage='post-run', stage_num=n+1, stage_tot_num=len(session.lang_def.post_run_commands), + cmd=cmd_template.format(**template_dict), + encoding=encoding, + stderr_is_stdout=True) if post_proc.returncode != 0: error = True session.post_run_errors = True - encoding = session.lang_def.post_run_encoding or locale.getpreferredencoding(False) stdout_str = io.TextIOWrapper(io.BytesIO(post_proc.stdout), encoding=encoding, errors='backslashreplace').read() session.post_run_error_lines = util.splitlines_lf(stdout_str) @@ -831,7 +997,7 @@ def _run(self, session): # Ensure that there's at least one delimiter to serve as a # sentinel, even if the code never ran due to something like a # syntax error or compilation error - sentinel_delim = stdstream_delim.format(-1) + sentinel_delim = stdstream_delim.format(chunk=-1,output_chunk=-1) stdout_lines.append('') stdout_lines.append(sentinel_delim) stderr_lines.append('') @@ -860,7 +1026,7 @@ def _run(self, session): chunk_end_index = 0 for index, line in enumerate(stdout_lines): if line.startswith(stdstream_delim_start) and line.startswith(stdstream_delim_start_hash): - next_session_output_index = int(line.split('chunk=', 1)[1].split(')', 1)[0]) + next_session_output_index = int(line.split('output_chunk=', 1)[1].split(',', 1)[0]) if index > 0: chunk_end_index = index - 1 if stdout_lines[chunk_end_index]: @@ -869,7 +1035,7 @@ def _run(self, session): if session_output_index >= 0 and session.code_chunks[session_output_index].is_expr: combined_lines = stdout_lines[chunk_start_index:chunk_end_index] for combined_index, combined_line in enumerate(combined_lines): - if combined_line.startswith(expression_delim_start) and combined_line.startswith(expression_delim): + if combined_line.startswith(expression_delim_start) and combined_line.startswith(expression_delim_start_hash): if combined_index + 1 < len(combined_lines): chunk_expr_dict[session_output_index] = combined_lines[combined_index+1:] if not combined_lines[combined_index-1]: @@ -900,7 +1066,7 @@ def _run(self, session): expected_stdstream_delims_iter = iter(expected_stdstream_delims) for index, line in enumerate(stderr_lines): if line.startswith(stdstream_delim_start) and line.startswith(stdstream_delim_start_hash): - next_session_output_index = int(line.split('chunk=', 1)[1].split(')', 1)[0]) + next_session_output_index = int(line.split('output_chunk=', 1)[1].split(',', 1)[0]) if next_session_output_index == session_output_index and next_session_output_index >= 0: # A code chunk that is not actually complete was run # with the default `complete=true`, and this resulted @@ -950,7 +1116,7 @@ def _run(self, session): cc_stderr_lines = stderr_lines[chunk_start_index:chunk_end_index] if session_output_index >= 0 and session.code_chunks[session_output_index].is_expr: for combined_index, combined_line in enumerate(cc_stderr_lines): - if combined_line.startswith(expression_delim_start) and combined_line.startswith(expression_delim): + if 
combined_line.startswith(expression_delim_start) and combined_line.startswith(expression_delim_start_hash): if cc_stderr_lines[combined_index-1]: del cc_stderr_lines[combined_index] else: diff --git a/codebraid/converters/base.py b/codebraid/converters/base.py index 249f410..dd8bf30 100644 --- a/codebraid/converters/base.py +++ b/codebraid/converters/base.py @@ -319,7 +319,7 @@ def finalize_after_copy(self): for kw in ('first_number', 'line_numbers', 'rewrap_lines', 'rewrap_width', 'expand_tabs', 'tab_size')]) _first_chunk_execute_keywords = set(['executable', 'jupyter_kernel']) _first_chunk_save_keywords = set(['save', 'save_as']) - _first_chunk_other_keywords = set(['jupyter_timeout']) + _first_chunk_other_keywords = set(['jupyter_timeout', 'live_output']) _first_chunk_keywords = _first_chunk_execute_keywords | _first_chunk_save_keywords | _first_chunk_other_keywords keywords = _base_keywords | _layout_keywords | _first_chunk_keywords @@ -432,6 +432,7 @@ def _option_first_chunk_int_warning(self, key, value): _option_jupyter_timeout = _option_first_chunk_int_warning _option_save = _option_first_chunk_bool_error _option_save_as = _option_first_chunk_string_error + _option_live_output = _option_first_chunk_bool_error _option_example = _option_bool_warning _option_lang = _option_str_error diff --git a/codebraid/converters/pandoc.py b/codebraid/converters/pandoc.py index 7dd2426..debd928 100644 --- a/codebraid/converters/pandoc.py +++ b/codebraid/converters/pandoc.py @@ -187,6 +187,7 @@ def keyval_namespace(code_chunk, options, key, value): 'jupyter_timeout': keyval_int, **line_anchors, **line_numbers, + 'live_output': keyval_bool, 'name': keyval_generic, 'outside_main': keyval_bool, **rewrap_lines, diff --git a/codebraid/languages/python.bespon b/codebraid/languages/python.bespon index 6b23f91..588469b 100644 --- a/codebraid/languages/python.bespon +++ b/codebraid/languages/python.bespon @@ -18,7 +18,7 @@ source_template = chunk_wrapper = |``` print('\n{stdout_delim}', flush=True) - sys.stderr.write('\n{stderr_delim}\n') + print('\n{stderr_delim}', file=sys.stderr, flush=True) {code} |```/ diff --git a/codebraid/version.py b/codebraid/version.py index ad73495..2901fb5 100644 --- a/codebraid/version.py +++ b/codebraid/version.py @@ -1,4 +1,4 @@ # -*- coding: utf-8 -*- from .fmtversion import get_version_plus_info -__version__, __version_info__ = get_version_plus_info(0, 5, 0, 'dev', 7) +__version__, __version_info__ = get_version_plus_info(0, 5, 0, 'dev', 8)
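
A minimal usage sketch of the option this patch adds, following the README text above. The session name `demo` and the loop body are placeholders chosen for illustration; `executable="python3 -u"` is the README's own suggestion for making Python flush output promptly so that `live_output` shows lines as they are printed rather than after the chunk finishes.

```{.python .cb.run session=demo live_output=true executable="python3 -u"}
# Placeholder example: the session name "demo" and this loop are for
# illustration only. Under `-u` (or with print(..., flush=True)), each
# line reaches the terminal as soon as it is printed instead of after
# the whole chunk has finished executing.
import time
for i in range(3):
    print('step', i)
    time.sleep(1)
```

Because the live output is written to stderr, stdout still carries only the converted document when `--output` is not given, and the stderr stream is interspersed with the session and chunk delimiters described in the README changes above.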