Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock and DELETE/UPDATE unindexed datasets #3620

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 70 additions & 36 deletions lib/pbench/server/api/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
MetadataBadKey,
MetadataError,
OperationName,
OperationState,
)
from pbench.server.database.models.server_settings import ServerSetting
from pbench.server.database.models.users import User
from pbench.server.sync import Sync


class APIAbort(Exception):
Expand Down Expand Up @@ -1419,6 +1421,51 @@ class UriBase:
host_value: str


@dataclass
class AuditContext:
    """Mutable audit state carried through the lifetime of one API call.

    The dispatcher creates one of these per request; API handlers may
    accumulate attributes or flag an error, and the dispatcher finalizes
    the Audit record (unless `finalize` has been disabled by the handler).
    """

    # Root Audit row for this operation, if auditing is active
    audit: Optional[Audit] = None
    # Whether the dispatcher should emit the final Audit record
    finalize: bool = True
    # Outcome to record; defaults to success until an error is flagged
    status: AuditStatus = AuditStatus.SUCCESS
    # Failure classification, set only on error
    reason: Optional[AuditReason] = None
    # Accumulated JSON attributes to attach to the audit record
    attributes: Optional[JSONOBJECT] = None

    def add_attribute(self, key: str, value: Any):
        """Record a single audit attribute.

        Args:
            key: key name
            value: key value
        """
        if self.attributes is None:
            self.attributes = {key: value}
            return
        self.attributes[key] = value

    def add_attributes(self, attr: JSONOBJECT):
        """Merge a JSON dict of attributes into the audit context.

        Args:
            attr: a JSON dict
        """
        if self.attributes is not None:
            self.attributes.update(attr)
        else:
            # No attributes yet: adopt the caller's dict directly
            self.attributes = attr

    def set_error(self, error: str, reason: Optional[AuditReason] = None):
        """Flag the audit as failed, recording the error text.

        Args:
            error: error string
            reason: audit failure reason
        """
        self.add_attribute("error", error)
        self.status = AuditStatus.FAILURE
        self.reason = reason


class ApiBase(Resource):
"""A base class for Pbench queries that provides common parameter handling
behavior for specialized subclasses.
Expand Down Expand Up @@ -2031,67 +2078,54 @@ def _dispatch(
# wants to emit a special audit sequence it can disable "finalize"
# in the context. It can also pass "attributes" by setting that
# field.
auditing = {
"audit": audit,
"finalize": bool(audit),
webbnh marked this conversation as resolved.
Show resolved Hide resolved
"status": AuditStatus.SUCCESS,
"reason": None,
"attributes": None,
}

auditing = AuditContext(audit=audit)
context = {
"auditing": auditing,
"attributes": schema.attributes,
"raw_params": raw_params,
"sync": None,
}

response = None
sync_message = None
try:
response = execute(params, request, context)
except APIInternalError as e:
current_app.logger.exception("{} {}", api_name, e.details)
auditing.set_error(str(e), AuditReason.INTERNAL)
sync_message = str(e)
abort(e.http_status, message=str(e))
except APIAbort as e:
current_app.logger.warning(
"{} client error {}: '{}'", api_name, e.http_status, e
)
if auditing["finalize"]:
attr = auditing.get("attributes", {"message": str(e)})
try:
Audit.create(
root=auditing["audit"],
status=AuditStatus.FAILURE,
reason=auditing["reason"],
attributes=attr,
)
except Exception:
current_app.logger.error(
"Unexpected exception on audit: {}", auditing
)
auditing.set_error(str(e))
sync_message = str(e)
abort(e.http_status, message=str(e), **e.kwargs)
except Exception as e:
x = APIInternalError("Unexpected exception")
x.__cause__ = e
current_app.logger.exception(
"Exception {} API error: {}: {!r}", api_name, x, auditing
)
if auditing["finalize"]:
attr = auditing.get("attributes", {})
attr["message"] = str(e)
auditing.set_error(str(e), AuditReason.INTERNAL)
sync_message = str(e)
abort(x.http_status, message=x.message)
webbnh marked this conversation as resolved.
Show resolved Hide resolved
finally:
# If the operation created a Sync object, it will have been updated
# and removed unless the operation failed. This means we're here
# because of an exception, and one of the handlers has set an
# appropriate message to record in the operations table.
sync: Optional[Sync] = context.get("sync")
if sync:
sync.update(dataset, OperationState.FAILED, message=sync_message)
if auditing.audit and auditing.finalize:
Audit.create(
root=auditing["audit"],
status=AuditStatus.FAILURE,
reason=AuditReason.INTERNAL,
attributes=attr,
root=auditing.audit,
status=auditing.status,
reason=auditing.reason,
attributes=auditing.attributes,
)
abort(x.http_status, message=x.message)
if auditing["finalize"]:
Audit.create(
root=auditing["audit"],
status=auditing["status"],
reason=auditing["reason"],
attributes=auditing["attributes"],
)
return response

def _get(self, args: ApiParams, req: Request, context: ApiContext) -> Response:
Expand Down
7 changes: 4 additions & 3 deletions lib/pbench/server/api/resources/api_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ def _post(self, params: ApiParams, req: Request, context: ApiContext) -> Respons
status = HTTPStatus.OK
except Exception as e:
raise APIInternalError(str(e)) from e
context["auditing"]["attributes"] = key.as_json()
response = jsonify(key.as_json())
result = key.as_json()
context["auditing"].add_attributes(result)
response = jsonify(result)
response.status_code = status
return response

Expand Down Expand Up @@ -162,7 +163,7 @@ def _delete(self, params: ApiParams, req: Request, context: ApiContext) -> Respo
raise APIAbort(HTTPStatus.NOT_FOUND, "Requested key not found")
key = keys[0]
try:
context["auditing"]["attributes"] = key.as_json()
context["auditing"].add_attributes(key.as_json())
key.delete()
return "deleted", HTTPStatus.OK
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion lib/pbench/server/api/resources/datasets_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _put(self, params: ApiParams, req: Request, context: ApiContext) -> Response
dataset = params.uri["dataset"]
metadata = params.body["metadata"]

context["auditing"]["attributes"] = {"updated": metadata}
context["auditing"].add_attribute("updated", metadata)
webbnh marked this conversation as resolved.
Show resolved Hide resolved

# Validate the authenticated user's authorization for the combination
# of "owner" and "access".
Expand Down
40 changes: 18 additions & 22 deletions lib/pbench/server/api/resources/query_apis/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ParamType,
)
from pbench.server.api.resources.query_apis import ElasticBase
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.datasets import Dataset
from pbench.server.database.models.index_map import IndexMap
from pbench.server.database.models.templates import Template

Expand Down Expand Up @@ -95,40 +95,36 @@ def get_index(
) -> str:
"""Retrieve ES indices based on a given root_index_name.

Datasets marked "archiveonly" aren't indexed, and can't be referenced
in most APIs that rely on Elasticsearch. Instead, we'll raise a
CONFLICT error.
Datasets without an index can't be referenced in most APIs that rely on
Elasticsearch. Instead, we'll raise a CONFLICT error. However, the
/api/v1/datasets API will specify ok_no_index as they need to operate
on the dataset regardless of whether indexing is enabled.
webbnh marked this conversation as resolved.
Show resolved Hide resolved

All indices are returned if root_index_name is omitted.

Args:
dataset: dataset object
root_index_name: A root index name like "run-data"
ok_no_index: Don't fail on an archiveonly dataset
ok_no_index: Don't fail if dataset has no indices

Raises:
APIAbort(CONFLICT) if indexing was disabled on the target dataset.
APIAbort(NOT_FOUND) if the dataset has no matching index data
APIAbort(NOT_FOUND) if index is required and the dataset has none

Returns:
A string that joins all selected indices with ",", suitable for use
in an Elasticsearch query URI.
"""

archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
if archive_only:
if ok_no_index:
return ""
raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")

index_keys = list(IndexMap.indices(dataset, root_index_name))
index_keys = IndexMap.indices(dataset, root_index_name)
if index_keys:
return ",".join(index_keys)
if ok_no_index:
return ""

if not index_keys:
raise APIAbort(
HTTPStatus.NOT_FOUND,
f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
)

indices = ",".join(index_keys)
return indices
raise APIAbort(
HTTPStatus.NOT_FOUND,
f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
)

def get_aggregatable_fields(
self, mappings: JSON, prefix: AnyStr = "", result: Union[List, None] = None
Expand Down
Loading
Loading