Skip to content

Commit

Permalink
Defer most signal handling code until after the end of the current tr…
Browse files Browse the repository at this point in the history
…ansaction

Non-database actions like deleting directories, sending webhooks, scheduling
reports should only be done after the current transaction is committed. If
we do it immediately, and the transaction is later aborted, then we will
have (for example) sent a webhook about an event that didn't actually
happen.

There's also a secondary benefit to moving this action outside of the
transaction; the less time we spend inside a transaction, the better, since
a transaction may lock out other clients from working on the affected DB
rows.

In addition, prevent the affected actions from crashing the view handler
with an exception (using the `robust=True` option). I don't think it's
reasonable to (for example) return a 500 response to a `PATCH` request just
because we failed to send the corresponding webhook.

There is one more type of action that should be modified in this way
(sending events), but it would be easier to do that after a refactoring
that I did in another patch, so I'll do it later.
  • Loading branch information
SpecLad committed Feb 9, 2024
1 parent b4eed78 commit 636c4d8
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 17 deletions.
6 changes: 6 additions & 0 deletions changelog.d/20240209_171518_roman_handle_after_transaction.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Fixed

- Side effects of data changes, such as the sending of webhooks,
are no longer triggered until after the changes have been committed
to the database
(<https://github.com/opencv/cvat/pull/7460>)
18 changes: 14 additions & 4 deletions cvat/apps/analytics_report/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT

from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

Expand All @@ -19,12 +20,21 @@
)
def __save_job__update_analytics_report(instance, created, **kwargs):
if isinstance(instance, Project):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(project=instance)
kwargs = {"project": instance}
elif isinstance(instance, Task):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(task=instance)
kwargs = {"task": instance}
elif isinstance(instance, Job):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance)
kwargs = {"job": instance}
elif isinstance(instance, Annotation):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance.job)
kwargs = {"job": instance.job}
else:
assert False

def schedule_autoupdate_job():
if any(v.id is None for v in kwargs.values()):
# The object may have been deleted after the on_commit call.
return

AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(**kwargs)

transaction.on_commit(schedule_autoupdate_job, robust=True)
21 changes: 15 additions & 6 deletions cvat/apps/engine/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
import functools
import shutil

from django.contrib.auth.models import User
from django.db import transaction
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver

Expand Down Expand Up @@ -45,17 +47,21 @@ def __save_user_handler(instance, **kwargs):
@receiver(post_delete, sender=Project,
dispatch_uid=__name__ + ".delete_project_handler")
def __delete_project_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

@receiver(post_delete, sender=Asset,
dispatch_uid=__name__ + ".__delete_asset_handler")
def __delete_asset_handler(instance, **kwargs):
shutil.rmtree(instance.get_asset_dir(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_asset_dir(), ignore_errors=True))

@receiver(post_delete, sender=Task,
dispatch_uid=__name__ + ".delete_task_handler")
def __delete_task_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

if instance.data and not instance.data.tasks.exists():
instance.data.delete()

Expand All @@ -69,14 +75,17 @@ def __delete_task_handler(instance, **kwargs):
@receiver(post_delete, sender=Job,
dispatch_uid=__name__ + ".delete_job_handler")
def __delete_job_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

@receiver(post_delete, sender=Data,
dispatch_uid=__name__ + ".delete_data_handler")
def __delete_data_handler(instance, **kwargs):
shutil.rmtree(instance.get_data_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_data_dirname(), ignore_errors=True))

@receiver(post_delete, sender=CloudStorage,
dispatch_uid=__name__ + ".delete_cloudstorage_handler")
def __delete_cloudstorage_handler(instance, **kwargs):
shutil.rmtree(instance.get_storage_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_storage_dirname(), ignore_errors=True))
8 changes: 6 additions & 2 deletions cvat/apps/engine/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@ def test_api_v2_projects_delete_project_data_after_delete_project(self):
task_dir = task.get_dirname()
self.assertTrue(os.path.exists(task_dir))

self._check_api_v2_projects_id(self.admin)
with self.captureOnCommitCallbacks(execute=True):
self._check_api_v2_projects_id(self.admin)

for project in self.projects:
project_dir = project.get_dirname()
Expand Down Expand Up @@ -2019,7 +2020,10 @@ def test_api_v2_tasks_delete_task_data_after_delete_task(self):
for task in self.tasks:
task_dir = task.get_dirname()
self.assertTrue(os.path.exists(task_dir))
self._check_api_v2_tasks_id(self.admin)

with self.captureOnCommitCallbacks(execute=True):
self._check_api_v2_tasks_id(self.admin)

for task in self.tasks:
task_dir = task.get_dirname()
self.assertFalse(os.path.exists(task_dir))
Expand Down
12 changes: 10 additions & 2 deletions cvat/apps/quality_control/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT

from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

Expand Down Expand Up @@ -37,8 +38,15 @@ def __save_job__update_quality_metrics(instance, created, **kwargs):
else:
assert False

for task in tasks:
qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task)
def schedule_autoupdate_jobs():
for task in tasks:
if task.id is None:
# The task may have been deleted after the on_commit call.
continue

qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task)

transaction.on_commit(schedule_autoupdate_jobs, robust=True)


@receiver(post_save, sender=Task, dispatch_uid=__name__ + ".save_task-initialize_quality_settings")
Expand Down
13 changes: 10 additions & 3 deletions cvat/apps/webhooks/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import requests
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models.signals import (post_delete, post_save, pre_delete,
pre_save)
from django.dispatch import Signal, receiver
Expand Down Expand Up @@ -186,7 +187,10 @@ def post_save_resource_event(sender, instance, created, **kwargs):
else:
return

batch_add_to_queue(filtered_webhooks, data)
transaction.on_commit(
lambda: batch_add_to_queue(filtered_webhooks, data),
robust=True,
)


@receiver(pre_delete, sender=Project, dispatch_uid=__name__ + ":project:pre_delete")
Expand Down Expand Up @@ -232,9 +236,12 @@ def post_delete_resource_event(sender, instance, **kwargs):
"sender": get_sender(instance),
}

batch_add_to_queue(filtered_webhooks, data)
related_webhooks = [webhook for webhook in getattr(instance, "_related_webhooks", []) if webhook.id not in map(lambda a: a.id, filtered_webhooks)]
batch_add_to_queue(related_webhooks, data)

transaction.on_commit(
lambda: batch_add_to_queue(filtered_webhooks + related_webhooks, data),
robust=True,
)


@receiver(signal_redelivery)
Expand Down

0 comments on commit 636c4d8

Please sign in to comment.