Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to interrupt processing in a specific thread/context #803

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions capi/geos_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ extern "C" {
geos::util::Interrupt::request();
}

void
GEOS_interruptThread()
{
geos::util::Interrupt::requestForCurrentThread();
}

void
GEOS_interruptCancel()
{
Expand Down
47 changes: 42 additions & 5 deletions capi/geos_c.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ typedef int (*GEOSTransformXYCallback)(
/* ========== Interruption ========== */

/**
* Callback function for use in interruption. The callback will be invoked _before_ checking for
* interruption, so can be used to request it.
* Callback function for use in interruption. The callback will be invoked at each
* possible interruption point and can be used to request interruption.
*
* \see GEOS_interruptRegisterCallback
* \see GEOS_interruptRequest
Expand All @@ -306,19 +306,56 @@ typedef int (*GEOSTransformXYCallback)(
typedef void (GEOSInterruptCallback)(void);

/**
* Register a function to be called when processing is interrupted.
* Callback function for use in interruption. The callback will be invoked at each
* possible interruption point and can be used to request interruption.
*
* \see GEOS_interruptRegisterThreadCallback
* \see GEOS_interruptThread
*/
typedef void (GEOSInterruptThreadCallback)(void*);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid the word thread in these new APIs and associated documentation. Maybe this one could be named GEOSContextInterruptCallback and similar for other new symbols.


/**
* Register a function to be called when a possible interruption point is reached
* on any thread. The function may be used to request interruption.
*
* \param cb Callback function to invoke
* \return the previously configured callback
* \return the previously registered callback, or NULL
* \see GEOSInterruptCallback
* \see GEOSContext_setInterruptCallback_r
*/
extern GEOSInterruptCallback GEOS_DLL *GEOS_interruptRegisterCallback(
GEOSInterruptCallback* cb);

/**
* Request safe interruption of operations
* Register a function to be called when a possible interruption point is reached
* in code executed in the specified context. The function can interrupt the
* thread if desired by calling GEOS_interruptThread.
*
* \param extHandle the context returned by \ref GEOS_init_r.
* \param cb Callback function to invoke
* \param userData optional data to be pe provided as argument to callback
* \return the previously registered callback, or NULL
* \see GEOSInterruptThreadCallback
*/
Comment on lines +330 to +339
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we say that you should not start other GEOS operations inside of the callback? I ccould imagine folks wanting to make the decision based on the result of some geos operation, but that would risk clobbering the thread_local state used to store the interrupt state.

extern GEOSInterruptThreadCallback GEOS_DLL *GEOSContext_setInterruptCallback_r(
GEOSContextHandle_t extHandle,
GEOSInterruptThreadCallback* cb,
void* userData);
Comment on lines +340 to +343
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbaston what do you think about making this callback return a value that indicates whether an interrupt is desired and eliminating GEOS_interruptThread()?


/**
* Request safe interruption of operations. The next thread to check for an
* interrupt will be interrupted. To request interruption of a specific thread,
* instead call GEOS_interruptThread() from a callback executed by that thread.
*/
extern void GEOS_DLL GEOS_interruptRequest(void);

/**
* Request safe interruption of operations in the current thread. This function
* should be called from a callback registered by GEOS_interruptRegisterThreadCallback()
* or GEOS_interruptRegisterCallback().
*/
extern void GEOS_DLL GEOS_interruptThread(void);

/**
* Cancel a pending interruption request
*/
Expand Down
94 changes: 76 additions & 18 deletions capi/geos_ts_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ typedef struct GEOSContextHandle_HS {
void* noticeData;
GEOSMessageHandler errorMessageOld;
GEOSMessageHandler_r errorMessageNew;
GEOSInterruptThreadCallback* interrupt_cb;
void* interrupt_cb_data;
void* errorData;
uint8_t WKBOutputDims;
int WKBByteOrder;
Expand All @@ -218,6 +220,8 @@ typedef struct GEOSContextHandle_HS {
noticeData(nullptr),
errorMessageOld(nullptr),
errorMessageNew(nullptr),
interrupt_cb(nullptr),
interrupt_cb_data(nullptr),
errorData(nullptr),
point2d(nullptr)
{
Expand Down Expand Up @@ -275,6 +279,15 @@ typedef struct GEOSContextHandle_HS {
return f;
}

GEOSInterruptThreadCallback*
setInterruptHandler(GEOSInterruptThreadCallback* cb, void* userData)
{
auto old = interrupt_cb;
interrupt_cb = cb;
interrupt_cb_data = userData;
return old;
}

void
NOTICE_MESSAGE(const char *fmt, ...)
{
Expand Down Expand Up @@ -375,12 +388,37 @@ gstrdup(std::string const& str)
return gstrdup_s(str.c_str(), str.size());
}

struct InterruptManager {
InterruptManager(GEOSContextHandle_t handle) :
cb(handle->interrupt_cb),
cb_data(handle->interrupt_cb_data) {
if (cb) {
geos::util::Interrupt::registerThreadCallback(cb, cb_data);
}
}

~InterruptManager() {
if (cb != nullptr) {
geos::util::Interrupt::registerThreadCallback(nullptr, nullptr);
}
}

GEOSInterruptThreadCallback* cb;
void* cb_data;
};

struct NotInterruptible {
NotInterruptible(GEOSContextHandle_t handle) {
(void) handle;
}
};

} // namespace anonymous

// Execute a lambda, using the given context handle to process errors.
// Return errval on error.
// Errval should be of the type returned by f, unless f returns a bool in which case we promote to char.
template<typename F>
template<typename InterruptManagerType=InterruptManager, typename F>
inline auto execute(
GEOSContextHandle_t extHandle,
typename std::conditional<std::is_same<decltype(std::declval<F>()()),bool>::value,
Expand All @@ -396,6 +434,8 @@ inline auto execute(
return errval;
}

InterruptManagerType ic(handle);

try {
return f();
} catch (const std::exception& e) {
Expand All @@ -409,7 +449,7 @@ inline auto execute(

// Execute a lambda, using the given context handle to process errors.
// Return nullptr on error.
template<typename F, typename std::enable_if<!std::is_void<decltype(std::declval<F>()())>::value, std::nullptr_t>::type = nullptr>
template<typename InterruptManagerType=InterruptManager, typename F, typename std::enable_if<!std::is_void<decltype(std::declval<F>()())>::value, std::nullptr_t>::type = nullptr>
inline auto execute(GEOSContextHandle_t extHandle, F&& f) -> decltype(f()) {
if (extHandle == nullptr) {
return nullptr;
Expand All @@ -420,6 +460,8 @@ inline auto execute(GEOSContextHandle_t extHandle, F&& f) -> decltype(f()) {
return nullptr;
}

InterruptManagerType ic(handle);

try {
return f();
} catch (const std::exception& e) {
Expand All @@ -433,9 +475,14 @@ inline auto execute(GEOSContextHandle_t extHandle, F&& f) -> decltype(f()) {

// Execute a lambda, using the given context handle to process errors.
// No return value.
template<typename F, typename std::enable_if<std::is_void<decltype(std::declval<F>()())>::value, std::nullptr_t>::type = nullptr>
template<typename InterruptManagerType=InterruptManager, typename F, typename std::enable_if<std::is_void<decltype(std::declval<F>()())>::value, std::nullptr_t>::type = nullptr>
inline void execute(GEOSContextHandle_t extHandle, F&& f) {
GEOSContextHandleInternal_t* handle = reinterpret_cast<GEOSContextHandleInternal_t*>(extHandle);

if (handle != nullptr) {
InterruptManagerType ic(handle);
}

try {
f();
} catch (const std::exception& e) {
Expand Down Expand Up @@ -514,6 +561,17 @@ extern "C" {
return handle->setErrorHandler(ef, userData);
}

GEOSInterruptThreadCallback*
GEOSContext_setInterruptCallback_r(GEOSContextHandle_t extHandle, GEOSInterruptThreadCallback* cb, void* userData)
{
GEOSContextHandleInternal_t* handle = reinterpret_cast<GEOSContextHandleInternal_t*>(extHandle);
if(0 == handle->initialized) {
return nullptr;
}

return handle->setInterruptHandler(cb, userData);
}

void
finishGEOS_r(GEOSContextHandle_t extHandle)
{
Expand Down Expand Up @@ -879,7 +937,7 @@ extern "C" {
int
GEOSArea_r(GEOSContextHandle_t extHandle, const Geometry* g, double* area)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
*area = g->getArea();
return 1;
});
Expand All @@ -888,7 +946,7 @@ extern "C" {
int
GEOSLength_r(GEOSContextHandle_t extHandle, const Geometry* g, double* length)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
*length = g->getLength();
return 1;
});
Expand Down Expand Up @@ -1640,7 +1698,7 @@ extern "C" {
int
GEOSGetNumInteriorRings_r(GEOSContextHandle_t extHandle, const Geometry* g1)
{
return execute(extHandle, -1, [&]() {
return execute<NotInterruptible>(extHandle, -1, [&]() {
const Polygon* p = dynamic_cast<const Polygon*>(g1);
if(!p) {
throw IllegalArgumentException("Argument is not a Polygon");
Expand All @@ -1654,7 +1712,7 @@ extern "C" {
int
GEOSGetNumGeometries_r(GEOSContextHandle_t extHandle, const Geometry* g1)
{
return execute(extHandle, -1, [&]() {
return execute<NotInterruptible>(extHandle, -1, [&]() {
return static_cast<int>(g1->getNumGeometries());
});
}
Expand All @@ -1667,7 +1725,7 @@ extern "C" {
const Geometry*
GEOSGetGeometryN_r(GEOSContextHandle_t extHandle, const Geometry* g1, int n)
{
return execute(extHandle, [&]() {
return execute<NotInterruptible>(extHandle, [&]() {
if(n < 0) {
throw IllegalArgumentException("Index must be non-negative.");
}
Expand Down Expand Up @@ -1856,7 +1914,7 @@ extern "C" {
const Geometry*
GEOSGetExteriorRing_r(GEOSContextHandle_t extHandle, const Geometry* g1)
{
return execute(extHandle, [&]() {
return execute<NotInterruptible>(extHandle, [&]() {
const Polygon* p = dynamic_cast<const Polygon*>(g1);
if(!p) {
throw IllegalArgumentException("Invalid argument (must be a Polygon)");
Expand All @@ -1872,7 +1930,7 @@ extern "C" {
const Geometry*
GEOSGetInteriorRingN_r(GEOSContextHandle_t extHandle, const Geometry* g1, int n)
{
return execute(extHandle, [&]() {
return execute<NotInterruptible>(extHandle, [&]() {
const Polygon* p = dynamic_cast<const Polygon*>(g1);
if(!p) {
throw IllegalArgumentException("Invalid argument (must be a Polygon)");
Expand Down Expand Up @@ -2573,7 +2631,7 @@ extern "C" {
GEOSCoordSeq_setOrdinate_r(GEOSContextHandle_t extHandle, CoordinateSequence* cs,
unsigned int idx, unsigned int dim, double val)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
cs->setOrdinate(idx, dim, val);
return 1;
});
Expand All @@ -2600,7 +2658,7 @@ extern "C" {
int
GEOSCoordSeq_setXY_r(GEOSContextHandle_t extHandle, CoordinateSequence* cs, unsigned int idx, double x, double y)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
cs->setAt(CoordinateXY{x, y}, idx);
return 1;
});
Expand All @@ -2609,7 +2667,7 @@ extern "C" {
int
GEOSCoordSeq_setXYZ_r(GEOSContextHandle_t extHandle, CoordinateSequence* cs, unsigned int idx, double x, double y, double z)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
cs->setAt(Coordinate{x, y, z}, idx);
return 1;
});
Expand All @@ -2627,7 +2685,7 @@ extern "C" {
GEOSCoordSeq_getOrdinate_r(GEOSContextHandle_t extHandle, const CoordinateSequence* cs,
unsigned int idx, unsigned int dim, double* val)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
*val = cs->getOrdinate(idx, dim);
return 1;
});
Expand All @@ -2654,7 +2712,7 @@ extern "C" {
int
GEOSCoordSeq_getXY_r(GEOSContextHandle_t extHandle, const CoordinateSequence* cs, unsigned int idx, double* x, double* y)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
auto& c = cs->getAt(idx);
*x = c.x;
*y = c.y;
Expand All @@ -2665,7 +2723,7 @@ extern "C" {
int
GEOSCoordSeq_getXYZ_r(GEOSContextHandle_t extHandle, const CoordinateSequence* cs, unsigned int idx, double* x, double* y, double* z)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
auto& c = cs->getAt(idx);
*x = c.x;
*y = c.y;
Expand All @@ -2677,7 +2735,7 @@ extern "C" {
int
GEOSCoordSeq_getSize_r(GEOSContextHandle_t extHandle, const CoordinateSequence* cs, unsigned int* size)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
const std::size_t sz = cs->getSize();
*size = static_cast<unsigned int>(sz);
return 1;
Expand All @@ -2687,7 +2745,7 @@ extern "C" {
int
GEOSCoordSeq_getDimensions_r(GEOSContextHandle_t extHandle, const CoordinateSequence* cs, unsigned int* dims)
{
return execute(extHandle, 0, [&]() {
return execute<NotInterruptible>(extHandle, 0, [&]() {
const std::size_t dim = cs->getDimension();
*dims = static_cast<unsigned int>(dim);

Expand Down
Loading