Skip to content

Commit

Permalink
Add PersHelper class to handle persistence with different serializer …
Browse files Browse the repository at this point in the history
…backends.
  • Loading branch information
user committed Feb 23, 2024
1 parent 946e22f commit 7b856fa
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 99 deletions.
77 changes: 28 additions & 49 deletions src/scorep_jupyter/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@
import os
import subprocess
import re
import json
from scorep_jupyter.userpersistence import extract_definitions, extract_variables_names
from scorep_jupyter.userpersistence import PersHelper, scorep_script_name

PYTHON_EXECUTABLE = sys.executable
READ_CHUNK_SIZE = 8
userpersistence_token = "scorep_jupyter.userpersistence"
scorep_script_name = "scorep_script.py"
jupyter_dump = "jupyter_dump.pkl"
subprocess_dump = "subprocess_dump.pkl"


class ScorepPythonKernel(IPythonKernel):
implementation = 'Python and Score-P'
Expand Down Expand Up @@ -47,6 +41,8 @@ def __init__(self, **kwargs):
self.bash_script = None
self.python_script = None

self.pershelper = PersHelper('dill')

def cell_output(self, string, stream='stdout'):
"""
Display string as cell output.
Expand All @@ -62,13 +58,17 @@ def standard_reply(self):
'user_expressions': {},
}

def comm_files_cleanup(self):
def switch_serializer(self, code):
"""
Clean up files used for transmitting persistence and running subprocess.
Switch serializer backend used for persistence in kernel.
"""
for aux_file in [scorep_script_name, jupyter_dump, subprocess_dump]:
if os.path.exists(aux_file):
os.remove(aux_file)
serializer = code.split('\n')[1]
if serializer == 'dill':
self.pershelper = PersHelper('dill')
elif serializer == 'cloudpickle':
self.pershelper = PersHelper('cloudpickle')
self.cell_output(f'Serializer backend switched to {serializer}, persistence was reset.')
return self.standard_reply()

def set_scorep_env(self, code):
"""
Expand Down Expand Up @@ -217,50 +217,28 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
"""
# Ghost cell - dump current Jupyter session for subprocess
# Run in a "silent" way to not increase cells counter
dump_jupyter = "import dill\n" + f"dill.dump_session('{jupyter_dump}')"
reply_status_dump = await super().do_execute(dump_jupyter, silent, store_history=False,
reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

if reply_status_dump['status'] != 'ok':
self.shell.execution_count += 1
reply_status_dump['execution_count'] = self.shell.execution_count - 1
self.comm_files_cleanup()
self.pershelper.pers_cleanup()
self.cell_output("KernelError: Failed to pickle previous notebook's persistence and variables.",
'stderr')
return reply_status_dump

# Prepare code for the Score-P instrumented execution as subprocess
# Transmit user persistence and updated sys.path from Jupyter notebook to subprocess
# After running code, transmit subprocess persistence back to Jupyter notebook

try:
user_variables = extract_variables_names(code)
except SyntaxError as e:
self.cell_output(f"SyntaxError: {e}", 'stderr')
return self.standard_reply()

sys_path_updated = json.dumps(sys.path)
scorep_code = "import scorep\n" + \
"with scorep.instrumenter.disable():\n" + \
f" from {userpersistence_token} import save_variables_values \n" + \
" import dill\n" + \
f" globals().update(dill.load_module_asdict('{jupyter_dump}'))\n" + \
" import sys\n" + \
" sys.path.clear()\n" + \
f" sys.path.extend({sys_path_updated})\n" + \
code + "\n" + \
"with scorep.instrumenter.disable():\n" + \
f" save_variables_values(globals(), {str(user_variables)}, '{subprocess_dump}')"

with open(scorep_script_name, 'w+') as file:
file.write(scorep_code)
file.write(self.pershelper.subprocess_wrapper(code))

# Launch subprocess with Jupyter notebook environment
cmd = [PYTHON_EXECUTABLE, "-m", "scorep"] + \
self.scorep_binding_args + [scorep_script_name]
proc_env = os.environ.copy()
proc_env.update(self.scorep_env)
proc_env.update({'PYTHONUNBUFFERED': 'x'}) # subprocess observation
proc_env = self.scorep_env.copy()
proc_env.update({'PATH': os.environ['PATH'], 'PYTHONUNBUFFERED': 'x'}) # scorep path, subprocess observation

incomplete_line = ''
endline_pattern = re.compile(r'(.*?[\r\n]|.+$)')
Expand Down Expand Up @@ -293,29 +271,26 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
proc.wait()

if proc.returncode:
self.comm_files_cleanup()
self.pershelper.pers_cleanup()
self.cell_output(
'KernelError: Cell execution failed, cell persistence and variables are not recorded.',
'stderr')
return self.standard_reply()

# Ghost cell - load subprocess definitions and persistence back to Jupyter notebook
# Run in a "silent" way to not increase cells counter
load_jupyter = extract_definitions(code) + "\n" + \
f"with open('{subprocess_dump}', 'rb') as file:\n" + \
" globals().update(dill.load(file))\n"
reply_status_load = await super().do_execute(load_jupyter, silent, store_history=False,
reply_status_update = await super().do_execute(self.pershelper.jupyter_update(code), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

if reply_status_load['status'] != 'ok':
if reply_status_update['status'] != 'ok':
self.shell.execution_count += 1
reply_status_load['execution_count'] = self.shell.execution_count - 1
self.comm_files_cleanup()
reply_status_update['execution_count'] = self.shell.execution_count - 1
self.pershelper.pers_cleanup()
self.cell_output("KernelError: Failed to load cell's persistence and variables to the notebook.",
'stderr')
return reply_status_load
return reply_status_update

self.comm_files_cleanup()
self.pershelper.pers_cleanup()
if 'SCOREP_EXPERIMENT_DIRECTORY' in self.scorep_env:
scorep_folder = self.scorep_env['SCOREP_EXPERIMENT_DIRECTORY']
self.cell_output(
Expand Down Expand Up @@ -363,6 +338,7 @@ async def do_execute(self, code, silent, store_history=False, user_expressions=N
try:
reply_status = await self.scorep_execute(self.multicell_code, silent, store_history, user_expressions, allow_stdin, cell_id=cell_id)
except:
self.cell_output("KernelError: Multicell mode failed.",'stderr')
return self.standard_reply()
self.multicell_code = ""
self.multicellmode_cellcount = 0
Expand All @@ -379,6 +355,8 @@ async def do_execute(self, code, silent, store_history=False, user_expressions=N
elif code.startswith('%%enable_multicellmode'):
return self.enable_multicellmode()

elif code.startswith('%%switch_serializer'):
return self.switch_serializer(code)
elif code.startswith('%%scorep_env'):
return self.set_scorep_env(code)
elif code.startswith('%%scorep_python_binding_arguments'):
Expand All @@ -388,6 +366,7 @@ async def do_execute(self, code, silent, store_history=False, user_expressions=N
elif code.startswith('%%execute_with_scorep'):
return await self.scorep_execute(code.split("\n", 1)[1], silent, store_history, user_expressions, allow_stdin, cell_id=cell_id)
else:
self.pershelper.parse(code, 'jupyter')
return await super().do_execute(code, silent, store_history, user_expressions, allow_stdin, cell_id=cell_id)


Expand Down
187 changes: 173 additions & 14 deletions src/scorep_jupyter/userpersistence.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,185 @@
import os
import shutil
import ast
import astunparse
from textwrap import dedent

def save_variables_values(globs, variables, filename):
"""
Dump values of given variables into the file.
"""
import dill
user_variables = {k: v for k, v in globs.items() if k in variables}
scorep_script_name = "scorep_script.py"
jupyter_dump_dir = "jupyter_dump_"
subprocess_dump_dir = "subprocess_dump_"
full_dump = "full_dump.pkl"
os_env_dump = "os_env_dump.pkl"
sys_path_dump = "sys_path_dump.pkl"
var_dump = "var_dump.pkl"

class PersHelper:
    """
    Handle transfer of notebook persistence (user variables, definitions,
    os.environ and sys.path) between the Jupyter kernel and the Score-P
    instrumented subprocess, using a pluggable serializer backend
    ('dill' or 'cloudpickle').

    The *_dump/*_update methods do not perform I/O themselves: they return
    Python source strings that the kernel executes as "ghost cells" (for the
    notebook side) or writes into the subprocess script (for the subprocess
    side).
    """

    def __init__(self, serializer: str = 'dill'):
        # Aggregated imports/class/function definitions seen so far in the
        # notebook (only maintained for cloudpickle; dill uses dump_session).
        self.jupyter_definitions = ""
        # Aggregated user variable names seen so far in the notebook.
        self.jupyter_variables = []
        # Name of the serializer module: 'dill' or 'cloudpickle'.
        self.serializer = serializer
        # Definitions/variables parsed from the most recent subprocess cell.
        self.subprocess_definitions = ""
        self.subprocess_variables = []

    # FIXME
    def pers_cleanup(self):
        """
        Clean up files used for transmitting persistence and running subprocess.

        Removes the subprocess script and every dump artifact produced under
        the jupyter/subprocess dump prefixes, whether file or directory.
        """
        for pers_path in [scorep_script_name,
                          *[dirname + filename for dirname in [jupyter_dump_dir, subprocess_dump_dir]
                            for filename in [full_dump, os_env_dump, sys_path_dump, var_dump]]]:
            if os.path.exists(pers_path):
                if os.path.isfile(pers_path):
                    os.remove(pers_path)
                elif os.path.isdir(pers_path):
                    shutil.rmtree(pers_path)

    def jupyter_dump(self) -> str:
        """
        Generate code for a kernel ghost cell that dumps the current notebook
        persistence for the subprocess to pick up.

        For dill, the whole session is dumped with dill.dump_session; for
        cloudpickle, only the tracked user variables are pickled (definitions
        are replayed from source instead). Runtime state (os.environ,
        sys.path) is dumped in both cases.
        """
        jupyter_dump_ = dedent(f"""\
            import sys
            import os
            import {self.serializer}
            from scorep_jupyter.userpersistence import pickle_runtime, pickle_variables
            pickle_runtime(os.environ, sys.path, '{jupyter_dump_dir}', {self.serializer})
            """)
        if self.serializer == 'dill':
            return jupyter_dump_ + f"dill.dump_session('{jupyter_dump_dir + full_dump}')"
        elif self.serializer == 'cloudpickle':
            return jupyter_dump_ + f"pickle_variables({str(self.jupyter_variables)}, globals(), '{jupyter_dump_dir}', {self.serializer})"

    def subprocess_wrapper(self, code: str) -> str:
        """
        Wrap the cell code for execution in the Score-P subprocess.

        The generated script loads the notebook persistence dumped by
        jupyter_dump(), runs the user code, then dumps the subprocess
        runtime state and user variables back for jupyter_update() to load.

        Parameters
        ----------
        code : str
            User cell source to be executed under Score-P.
        """
        # Record this cell's definitions/variables so the dump line below and
        # jupyter_update() know what to transfer back.
        self.parse(code, 'subprocess')

        subprocess_update = dedent(f"""\
            import sys
            import os
            import {self.serializer}
            from scorep_jupyter.userpersistence import pickle_runtime, pickle_variables, load_runtime, load_variables
            load_runtime(os.environ, sys.path, '{jupyter_dump_dir}', {self.serializer})
            """)
        if self.serializer == 'dill':
            subprocess_update += f"globals().update(dill.load_module_asdict('{jupyter_dump_dir + full_dump}'))"
        elif self.serializer == 'cloudpickle':
            # cloudpickle cannot dump a whole session: replay the aggregated
            # notebook definitions, then load the pickled variables.
            subprocess_update += (self.jupyter_definitions + f"load_variables(globals(), '{jupyter_dump_dir}', {self.serializer})")
        return subprocess_update + "\n" + code + \
            dedent(f"""
            pickle_runtime(os.environ, sys.path, '{subprocess_dump_dir}', {self.serializer})
            pickle_variables({str(self.subprocess_variables)}, globals(), '{subprocess_dump_dir}', {self.serializer})
            """)

    def jupyter_update(self, code: str) -> str:
        """
        Generate code for a kernel ghost cell that loads the subprocess
        persistence (runtime state, definitions and user variables) back
        into the notebook after the Score-P run finished.

        Also updates the aggregated notebook-side storage via parse().
        """
        self.parse(code, 'jupyter')

        return dedent(f"""\
            import sys
            import os
            from scorep_jupyter.userpersistence import load_runtime, load_variables
            load_runtime(os.environ, sys.path, '{subprocess_dump_dir}', {self.serializer})
            {self.subprocess_definitions}
            load_variables(globals(), '{subprocess_dump_dir}', {self.serializer})
            """)

    def parse(self, code: str, mode: str):
        """
        Extract user variables names and definitions from the code.

        Parameters
        ----------
        code : str
            Cell source to analyze.
        mode : str
            'subprocess' — replace the per-cell subprocess records;
            'jupyter' — append to the aggregated notebook records
            (cloudpickle only; dill relies on dump_session instead).

        Raises
        ------
        SyntaxError
            If the (magic-stripped) code fails to parse.
        """
        # Code with magics and shell commands is ignored,
        # unless magics are from "white list" which execute code
        # in "persistent" manner.
        whitelist_prefixes_cell = ['%%prun', '%%capture']
        whitelist_prefixes_line = ['%prun', '%time']

        nomagic_code = ''  # Code to be parsed for user variables
        if not code.startswith(tuple(['%', '!'])):  # No IPython magics and shell commands
            nomagic_code = code
        elif code.startswith(tuple(whitelist_prefixes_cell)):  # Cell magic & executed cell, remove first line
            nomagic_code = code.split("\n", 1)[1]
        elif code.startswith(tuple(whitelist_prefixes_line)):  # Line magic & executed cell, remove first word
            nomagic_code = code.split(" ", 1)[1]
        try:
            user_definitions = extract_definitions(nomagic_code)
            user_variables = extract_variables_names(nomagic_code)
        except SyntaxError as e:
            # Propagate to the caller; the kernel reports it to the user.
            raise

        if mode == 'subprocess':
            # Parse definitions and user variables from subprocess code before running it.
            self.subprocess_definitions = ""
            self.subprocess_variables.clear()
            self.subprocess_definitions += user_definitions
            self.subprocess_variables.extend(user_variables)
        elif mode == "jupyter" and self.serializer == "cloudpickle":
            # Update aggregated storage of definitions and user variables for entire notebook.
            # Not relevant for dill because of dump_session.
            self.jupyter_definitions += user_definitions
            self.jupyter_variables.extend(user_variables)

def pickle_runtime(os_environ_, sys_path_, dump_dir, serializer):
    """
    Serialize the runtime state — environment variables and sys.path — into
    the dump files under the given dump-directory prefix.

    Parameters
    ----------
    os_environ_ : mapping
        Environment variables to snapshot (typically os.environ).
    sys_path_ : list
        Module search path to snapshot (typically sys.path).
    dump_dir : str
        Prefix prepended to the per-artifact dump filenames.
    serializer : module
        Pickle-compatible module exposing dump() (dill or cloudpickle).
    """
    # Don't dump environment variables set by Score-P bindings.
    # Will force it to re-initialize instead of calling reset_preload()
    env_snapshot = {
        name: value
        for name, value in os_environ_.items()
        if not name.startswith('SCOREP_PYTHON_BINDINGS_')
    }
    with open(dump_dir + os_env_dump, 'wb+') as env_file:
        serializer.dump(env_snapshot, env_file)
    with open(dump_dir + sys_path_dump, 'wb+') as path_file:
        serializer.dump(sys_path_, path_file)

def pickle_variables(variables_names, globals_, dump_dir, serializer):
    """
    Dump the values of the named user variables into the variables dump file
    under the given dump-directory prefix.

    Parameters
    ----------
    variables_names : iterable of str
        Names of user variables to persist; names absent from globals_
        (e.g. non-variable assignment targets) are silently skipped.
    globals_ : dict
        Namespace to read the variable values from.
    dump_dir : str
        Prefix prepended to the variables dump filename.
    serializer : module
        Pickle-compatible module exposing dump() (dill or cloudpickle).
    """
    var_dump_ = dump_dir + var_dump
    user_variables = {k: v for k, v in globals_.items() if k in variables_names}

    for el in user_variables.keys():
        # if possible, exchange class of the object here with the class that is stored for persistence. This is
        # valid since the classes should be the same and this does not affect the objects attribute dictionary
        non_persistent_class = user_variables[el].__class__.__name__
        if non_persistent_class in globals().keys():
            user_variables[el].__class__ = globals()[non_persistent_class]

    # NOTE(review): removed two leftover lines from the pre-refactor version
    # that wrote via undefined names `filename` and `dill`; the dump below is
    # the intended (serializer-agnostic) write.
    with open(var_dump_, 'wb+') as file:
        serializer.dump(user_variables, file)

def load_runtime(os_environ_, sys_path_, dump_dir, serializer):
    """
    Restore the runtime state pickled by pickle_runtime() into the given
    environment mapping and module search path.

    Empty dump files are treated as "nothing to restore". The targets are
    merged into (update/extend), deliberately NOT cleared first, so state
    already present in the receiving process is preserved.

    Parameters
    ----------
    os_environ_ : mapping
        Target environment mapping to update (typically os.environ).
    sys_path_ : list
        Target module search path to extend (typically sys.path).
    dump_dir : str
        Prefix under which pickle_runtime() wrote its dump files.
    serializer : module
        Pickle-compatible module exposing load() (dill or cloudpickle).
    """
    env_dump_path = dump_dir + os_env_dump
    path_dump_path = dump_dir + sys_path_dump

    restored_environ = {}
    restored_sys_path = []

    if os.path.getsize(env_dump_path) > 0:
        with open(env_dump_path, 'rb') as dump_file:
            restored_environ = serializer.load(dump_file)
    if os.path.getsize(path_dump_path) > 0:
        with open(path_dump_path, 'rb') as dump_file:
            restored_sys_path = serializer.load(dump_file)

    # Merge, don't replace (no .clear() on purpose).
    os_environ_.update(restored_environ)
    sys_path_.extend(restored_sys_path)

def load_variables(globals_, dump_dir, serializer):
    """
    Merge the user variables pickled by pickle_variables() into the given
    namespace. A zero-size dump file means nothing was recorded; no-op.

    Parameters
    ----------
    globals_ : dict
        Target namespace to update (typically globals() of the notebook).
    dump_dir : str
        Prefix under which pickle_variables() wrote the variables dump.
    serializer : module
        Pickle-compatible module exposing load() (dill or cloudpickle).
    """
    dump_path = dump_dir + var_dump
    if os.path.getsize(dump_path) == 0:
        return
    with open(dump_path, 'rb') as dump_file:
        restored_variables = serializer.load(dump_file)
    globals_.update(restored_variables)

def extract_definitions(code):
"""
Extract imported modules and definitions of classes and functions from the code block.
"""
# can't use in kernel as import from userpersistence:
# can't use in kernel as import from scorep_jupyter.userpersistence:
# self-reference error during dill dump of notebook
root = ast.parse(code)
definitions = []
Expand All @@ -36,17 +195,17 @@ def extract_definitions(code):
ast.ImportFrom)):
definitions.append(top_node)

pers_string = ""
definitions_string = ""
for node in definitions:
pers_string += astunparse.unparse(node)
definitions_string += astunparse.unparse(node)

return pers_string
return definitions_string

def extract_variables_names(code):
"""
Extract user-assigned variables from code.
Unlike dir(), nothing coming from the imported modules is included.
Might contain non-variables as well from assignments, which are later filtered out in save_variables_values.
Might contain non-variables as well from assignments, which are later filtered out when dumping variables.
"""
root = ast.parse(code)

Expand All @@ -63,4 +222,4 @@ def extract_variables_names(code):
if isinstance(target_node, ast.Name):
variables.add(target_node.id)

return variables
return variables
Loading

0 comments on commit 7b856fa

Please sign in to comment.