Skip to content

Commit

Permalink
Add main_thread_only execmodel
Browse files Browse the repository at this point in the history
In order to prevent tasks from running in a non-main thread,
wait for the previous task inside _try_send_to_primary_thread,
then schedule the next task. Add a main_thread_only execmodel
to distinguish this new behavior from the existing thread
execmodel, since users of the thread execmodel expect that
tasks can run in multiple threads concurrently.
  • Loading branch information
zmedico committed Feb 17, 2024
1 parent 372168e commit 180d3ea
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 16 deletions.
10 changes: 5 additions & 5 deletions doc/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
yourself and specify a larger or not timeout.


threading models: gevent, eventlet, thread
===========================================
threading models: gevent, eventlet, thread, main_thread_only
====================================================================

.. versionadded:: 1.2 (status: experimental!)

execnet supports "thread", "eventlet" and "gevent" as thread models
on each of the two sides. You need to decide which model to use
before you create any gateways::
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
as thread models on each of the two sides. You need to decide which
model to use before you create any gateways::

# content of threadmodel.py
import execnet
Expand Down
31 changes: 27 additions & 4 deletions src/execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def Event(self):
return threading.Event()


class MainThreadOnlyExecModel(ThreadExecModel):
backend = "main_thread_only"


class EventletExecModel(ExecModel):
backend = "eventlet"

Expand Down Expand Up @@ -254,6 +258,8 @@ def get_execmodel(backend):
return backend
if backend == "thread":
return ThreadExecModel()
elif backend == "main_thread_only":
return MainThreadOnlyExecModel()
elif backend == "eventlet":
return EventletExecModel()
elif backend == "gevent":
Expand Down Expand Up @@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False):
self._shuttingdown = False
self._waitall_events = []
if hasprimary:
if self.execmodel.backend != "thread":
if self.execmodel.backend not in ("thread", "main_thread_only"):
raise ValueError("hasprimary=True requires thread model")
self._primary_thread_task_ready = self.execmodel.Event()
else:
Expand All @@ -332,7 +338,7 @@ def integrate_as_primary_thread(self):
"""integrate the thread with which we are called as a primary
thread for executing functions triggered with spawn().
"""
assert self.execmodel.backend == "thread", self.execmodel
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
primary_thread_task_ready = self._primary_thread_task_ready
# interacts with code at REF1
while 1:
Expand All @@ -345,7 +351,11 @@ def integrate_as_primary_thread(self):
with self._running_lock:
if self._shuttingdown:
break
primary_thread_task_ready.clear()
# Only clear if _try_send_to_primary_thread has not
# yet set the next self._primary_thread_task reply
# after waiting for this one to complete.
if reply is self._primary_thread_task:
primary_thread_task_ready.clear()

def trigger_shutdown(self):
with self._running_lock:
Expand Down Expand Up @@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply):
# wake up primary thread
primary_thread_task_ready.set()
return True
elif (
self.execmodel.backend == "main_thread_only"
and self._primary_thread_task is not None
):
self._primary_thread_task.waitfinish()
self._primary_thread_task = reply
# wake up primary thread (it's okay if this is already set
# because we waited for the previous task to finish above
# and integrate_as_primary_thread will not clear it when
# it enters self._running_lock if it detects that a new
# task is available)
primary_thread_task_ready.set()
return True
return False

def spawn(self, func, *args, **kwargs):
Expand Down Expand Up @@ -1132,7 +1155,7 @@ def serve(self):
def trace(msg):
self._trace("[serve] " + msg)

hasprimary = self.execmodel.backend == "thread"
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
trace("spawning receiver thread")
self._initreceive()
Expand Down
2 changes: 1 addition & 1 deletion src/execnet/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def makegateway(self, spec=None):
id=<string> specifies the gateway id
python=<path> specifies which python interpreter to execute
execmodel=model 'thread', 'eventlet', 'gevent' model for execution
execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution
chdir=<path> specifies to which directory to change
nice=<path> specifies process priority of new process
env:NAME=value specifies a remote environment variable setting.
Expand Down
8 changes: 5 additions & 3 deletions testing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def anypython(request):
pytest.skip(f"no {name} found")
if "execmodel" in request.fixturenames and name != "sys.executable":
backend = request.getfixturevalue("execmodel").backend
if backend != "thread":
if backend not in ("thread", "main_thread_only"):
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
return executable

Expand Down Expand Up @@ -173,9 +173,11 @@ def gw(request, execmodel, group):
return gw


@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
@pytest.fixture(
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
)
def execmodel(request):
if request.param != "thread":
if request.param not in ("thread", "main_thread_only"):
pytest.importorskip(request.param)
if request.param in ("eventlet", "gevent") and sys.platform == "win32":
pytest.xfail(request.param + " does not work on win32")
Expand Down
30 changes: 30 additions & 0 deletions testing/test_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,33 @@ def sendback(channel):
if interleave_getstatus:
print(gw.remote_status())
assert ch.receive(timeout=0.5) == 1234


def test_assert_main_thread_only(execmodel, makegateway):
if execmodel.backend != "main_thread_only":
pytest.skip("can only run with main_thread_only")

gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")

# Create multiple channels at once and assert that all tasks
# execute in the main thread.
channels = []
for i in range(10):
channels.append(
gw.remote_exec(
"""
import time, threading
time.sleep(0.02)
channel.send(threading.current_thread() is threading.main_thread())
"""
)
)

for ch in channels:
res = ch.receive()
ch.close()
if res != True:
pytest.fail("remote raised\n%s" % res)

gw.exit()
gw.join()
2 changes: 1 addition & 1 deletion testing/test_termination.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def doit():


def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
pytest.xfail("test and execnet not compatible to greenlets yet")
gw = makegateway("popen")
q = execmodel.queue.Queue()
Expand Down
4 changes: 2 additions & 2 deletions testing/test_threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def wait_then_put():


def test_primary_thread_integration(execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
with pytest.raises(ValueError):
WorkerPool(execmodel=execmodel, hasprimary=True)
return
Expand All @@ -188,7 +188,7 @@ def func():


def test_primary_thread_integration_shutdown(execmodel):
if execmodel.backend != "thread":
if execmodel.backend not in ("thread", "main_thread_only"):
pytest.skip("can only run with threading")
pool = WorkerPool(execmodel=execmodel, hasprimary=True)
queue = execmodel.queue.Queue()
Expand Down

0 comments on commit 180d3ea

Please sign in to comment.