Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add main_thread_only execmodel #243

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
2.1.0 (UNRELEASED)
------------------

* `#243 <https://github.com/pytest-dev/execnet/pull/243>`__: Added ``main_thread_only``
execmodel which is derived from the thread execmodel and only executes ``remote_exec``
calls in the main thread.

Callers of ``remote_exec`` must use the returned channel to wait for a task to complete
before they call ``remote_exec`` again, otherwise the ``remote_exec`` call will fail with a
``concurrent remote_exec would cause deadlock`` error. The ``main_thread_only`` execmodel
provides solutions for `#96 <https://github.com/pytest-dev/execnet/issues/96>`__ and
`pytest-dev/pytest-xdist#620 <https://github.com/pytest-dev/pytest-xdist/issues/620>`__
(pending a new ``pytest-xdist`` release).

Also fixed ``init_popen_io`` to use ``closefd=False`` for shared stdin and stdout file
zmedico marked this conversation as resolved.
Show resolved Hide resolved
descriptors, preventing ``Bad file descriptor`` errors triggered by test_stdouterrin_setnull.
* Removed support for Python 3.7.
* Added official support for Python 3.12.


2.0.2 (2023-07-09)
------------------

Expand Down
10 changes: 5 additions & 5 deletions doc/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
yourself and specify a larger timeout or no timeout at all.


threading models: gevent, eventlet, thread
===========================================
threading models: gevent, eventlet, thread, main_thread_only
====================================================================

.. versionadded:: 1.2 (status: experimental!)

execnet supports "thread", "eventlet" and "gevent" as thread models
on each of the two sides. You need to decide which model to use
before you create any gateways::
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
as thread models on each of the two sides. You need to decide which
model to use before you create any gateways::

# content of threadmodel.py
import execnet
Expand Down
79 changes: 66 additions & 13 deletions src/execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def sleep(self, delay):
raise NotImplementedError()

@abc.abstractmethod
def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
raise NotImplementedError()

@abc.abstractmethod
Expand Down Expand Up @@ -113,10 +113,10 @@ def start(self, func, args=()):

return _thread.start_new_thread(func, args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
import os

return os.fdopen(fd, mode, bufsize, encoding="utf-8")
return os.fdopen(fd, mode, bufsize, encoding="utf-8", closefd=closefd)

def Lock(self):
import threading
Expand All @@ -134,6 +134,10 @@ def Event(self):
return threading.Event()


class MainThreadOnlyExecModel(ThreadExecModel):
backend = "main_thread_only"


class EventletExecModel(ExecModel):
backend = "eventlet"

Expand Down Expand Up @@ -170,10 +174,10 @@ def start(self, func, args=()):

return eventlet.spawn_n(func, *args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
import eventlet.green.os

return eventlet.green.os.fdopen(fd, mode, bufsize)
return eventlet.green.os.fdopen(fd, mode, bufsize, closefd=closefd)

def Lock(self):
import eventlet.green.threading
Expand Down Expand Up @@ -227,11 +231,11 @@ def start(self, func, args=()):

return gevent.spawn(func, *args)

def fdopen(self, fd, mode, bufsize=1):
def fdopen(self, fd, mode, bufsize=1, closefd=True):
# XXX
import gevent.fileobject

return gevent.fileobject.FileObjectThread(fd, mode, bufsize)
return gevent.fileobject.FileObjectThread(fd, mode, bufsize, closefd=closefd)

def Lock(self):
import gevent.lock
Expand All @@ -254,6 +258,8 @@ def get_execmodel(backend):
return backend
if backend == "thread":
return ThreadExecModel()
elif backend == "main_thread_only":
return MainThreadOnlyExecModel()
elif backend == "eventlet":
return EventletExecModel()
elif backend == "gevent":
Expand Down Expand Up @@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False):
self._shuttingdown = False
self._waitall_events = []
if hasprimary:
if self.execmodel.backend != "thread":
if self.execmodel.backend not in ("thread", "main_thread_only"):
raise ValueError("hasprimary=True requires thread model")
self._primary_thread_task_ready = self.execmodel.Event()
else:
Expand All @@ -332,7 +338,7 @@ def integrate_as_primary_thread(self):
"""integrate the thread with which we are called as a primary
thread for executing functions triggered with spawn().
"""
assert self.execmodel.backend == "thread", self.execmodel
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
primary_thread_task_ready = self._primary_thread_task_ready
# interacts with code at REF1
while 1:
Expand All @@ -345,7 +351,11 @@ def integrate_as_primary_thread(self):
with self._running_lock:
if self._shuttingdown:
break
primary_thread_task_ready.clear()
# Only clear if _try_send_to_primary_thread has not
# yet set the next self._primary_thread_task reply
# after waiting for this one to complete.
if reply is self._primary_thread_task:
primary_thread_task_ready.clear()

def trigger_shutdown(self):
with self._running_lock:
Expand Down Expand Up @@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply):
# wake up primary thread
primary_thread_task_ready.set()
return True
elif (
self.execmodel.backend == "main_thread_only"
and self._primary_thread_task is not None
):
self._primary_thread_task.waitfinish()
self._primary_thread_task = reply
# wake up primary thread (it's okay if this is already set
# because we waited for the previous task to finish above
# and integrate_as_primary_thread will not clear it when
# it enters self._running_lock if it detects that a new
# task is available)
primary_thread_task_ready.set()
return True
zmedico marked this conversation as resolved.
Show resolved Hide resolved
return False

def spawn(self, func, *args, **kwargs):
Expand Down Expand Up @@ -857,6 +880,9 @@ def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):

ENDMARKER = object()
INTERRUPT_TEXT = "keyboard-interrupted"
MAIN_THREAD_ONLY_DEADLOCK_TEXT = (
"concurrent remote_exec would cause deadlock for main_thread_only execmodel"
)


class ChannelFactory:
Expand Down Expand Up @@ -1105,6 +1131,20 @@ def join(self, timeout=None):

class WorkerGateway(BaseGateway):
def _local_schedulexec(self, channel, sourcetask):
if self._execpool.execmodel.backend == "main_thread_only":
# It's necessary to wait for a short time in order to ensure
# that we do not report a false-positive deadlock error, since
# channel close does not elicit a response that would provide
# a guarantee to remote_exec callers that the previous task
# has released the main thread. If the timeout expires then it
# should be practically impossible to report a false-positive.
if not self._executetask_complete.wait(timeout=1):
channel.close(MAIN_THREAD_ONLY_DEADLOCK_TEXT)
return
# It's only safe to clear here because the above wait proves
# that there is not a previous task about to set it again.
self._executetask_complete.clear()

sourcetask = loads_internal(sourcetask)
self._execpool.spawn(self.executetask, (channel, sourcetask))

Expand Down Expand Up @@ -1132,8 +1172,14 @@ def serve(self):
def trace(msg):
self._trace("[serve] " + msg)

hasprimary = self.execmodel.backend == "thread"
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
self._executetask_complete = None
if self.execmodel.backend == "main_thread_only":
self._executetask_complete = self.execmodel.Event()
# Initialize state to indicate that there is no previous task
# executing so that we don't need a separate flag to track this.
self._executetask_complete.set()
trace("spawning receiver thread")
self._initreceive()
try:
Expand Down Expand Up @@ -1176,6 +1222,11 @@ def executetask(self, item):
return
self._trace("ignoring EOFError because receiving finished")
channel.close()
if self._executetask_complete is not None:
# Indicate that this task has finished executing, meaning
# that there is no possibility of it triggering a deadlock
# for the next spawn call.
self._executetask_complete.set()


#
Expand Down Expand Up @@ -1631,8 +1682,10 @@ def init_popen_io(execmodel):
os.dup2(fd, 2)
os.close(fd)
io = Popen2IO(stdout, stdin, execmodel)
sys.stdin = execmodel.fdopen(0, "r", 1)
sys.stdout = execmodel.fdopen(1, "w", 1)
# Use closefd=False since 0 and 1 are shared with
# sys.__stdin__ and sys.__stdout__.
sys.stdin = execmodel.fdopen(0, "r", 1, closefd=False)
sys.stdout = execmodel.fdopen(1, "w", 1, closefd=False)
return io
zmedico marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
2 changes: 1 addition & 1 deletion src/execnet/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def makegateway(self, spec=None):

id=<string> specifies the gateway id
python=<path> specifies which python interpreter to execute
execmodel=model 'thread', 'eventlet', 'gevent' model for execution
execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution
chdir=<path> specifies to which directory to change
nice=<path> specifies process priority of new process
env:NAME=value specifies a remote environment variable setting.
Expand Down
8 changes: 5 additions & 3 deletions testing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def anypython(request):
pytest.skip(f"no {name} found")
if "execmodel" in request.fixturenames and name != "sys.executable":
backend = request.getfixturevalue("execmodel").backend
if backend != "thread":
if backend not in ("thread", "main_thread_only"):
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
return executable

Expand Down Expand Up @@ -173,9 +173,11 @@ def gw(request, execmodel, group):
return gw


@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
@pytest.fixture(
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
)
def execmodel(request):
if request.param != "thread":
if request.param not in ("thread", "main_thread_only"):
pytest.importorskip(request.param)
if request.param in ("eventlet", "gevent") and sys.platform == "win32":
pytest.xfail(request.param + " does not work on win32")
Expand Down
25 changes: 19 additions & 6 deletions testing/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,25 @@ class Arg:

@pytest.mark.skipif("not hasattr(os, 'dup')")
def test_stdouterrin_setnull(execmodel, capfd):
gateway_base.init_popen_io(execmodel)
os.write(1, b"hello")
os.read(0, 1)
out, err = capfd.readouterr()
assert not out
assert not err
# Backup and restore stdin state, and rely on capfd to handle
# this for stdout and stderr.
orig_stdin = sys.stdin
orig_stdin_fd = os.dup(0)
try:
# The returned Popen2IO instance can be garbage collected
# prematurely since we don't hold a reference here, but we
# tolerate this because it is intended to leave behind a
# sane state afterwards.
gateway_base.init_popen_io(execmodel)
os.write(1, b"hello")
os.read(0, 1)
out, err = capfd.readouterr()
assert not out
assert not err
finally:
sys.stdin = orig_stdin
os.dup2(orig_stdin_fd, 0)
os.close(orig_stdin_fd)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I run the full test suite in a local venv including eventlet and gevent, for some reason without this stdin restoration test_stdouterrin_setnull tends to trigger the "Bad file descriptor" error just for gevent, which runs last. I don't understand why that happens even with closefd=False in init_popen_io, but anyway this stdin restoration appears to prevent it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are some occasional issues with coordination of gevent - the patching of stdio with fdopen of gevent is trouble in some cases

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in particular in the cases where it collides with pytest stdio capture



class PseudoChannel:
Expand Down
79 changes: 79 additions & 0 deletions testing/test_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,82 @@ def sendback(channel):
if interleave_getstatus:
print(gw.remote_status())
assert ch.receive(timeout=0.5) == 1234


def test_assert_main_thread_only(execmodel, makegateway):
if execmodel.backend != "main_thread_only":
pytest.skip("can only run with main_thread_only")

gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")

try:
# Submit multiple remote_exec requests in quick succession and
# assert that all tasks execute in the main thread. It is
# necessary to call receive on each channel before the next
# remote_exec call, since the channel will raise an error if
# concurrent remote_exec requests are submitted as in
# test_main_thread_only_concurrent_remote_exec_deadlock.
for i in range(10):
ch = gw.remote_exec(
"""
import time, threading
time.sleep(0.02)
channel.send(threading.current_thread() is threading.main_thread())
"""
)

try:
res = ch.receive()
finally:
ch.close()
# This doesn't actually block because we closed
# the channel already, but it does check for remote
# errors and raise them.
ch.waitclose()
zmedico marked this conversation as resolved.
Show resolved Hide resolved
if res is not True:
pytest.fail("remote raised\n%s" % res)
finally:
gw.exit()
gw.join()


def test_main_thread_only_concurrent_remote_exec_deadlock(execmodel, makegateway):
if execmodel.backend != "main_thread_only":
pytest.skip("can only run with main_thread_only")

gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")
channels = []
try:
# Submit multiple remote_exec requests in quick succession and
# assert that MAIN_THREAD_ONLY_DEADLOCK_TEXT is raised if
# concurrent remote_exec requests are submitted for the
# main_thread_only execmodel (as compensation for the lack of
# back pressure in remote_exec calls which do not attempt to
# block until the remote main thread is idle).
for i in range(2):
channels.append(
gw.remote_exec(
"""
import threading
channel.send(threading.current_thread() is threading.main_thread())
# Wait forever, ensuring that the deadlock case triggers.
channel.gateway.execmodel.Event().wait()
"""
)
)

expected_results = (
True,
execnet.gateway_base.MAIN_THREAD_ONLY_DEADLOCK_TEXT,
)
for expected, ch in zip(expected_results, channels):
try:
res = ch.receive()
except execnet.RemoteError as e:
res = e.formatted
assert res == expected
finally:
for ch in channels:
ch.close()
gw.exit()
gw.join()
Loading
Loading