Skip to content

Commit

Permalink
Fix celery configuration not being applied sometimes
Browse files Browse the repository at this point in the history
The module dispatching the celery tasks did not import
our celery worker module, which resulted in the celery configuration not
being applied in some cases, e.g. when using uvicorn's hot reload
functionality.

By referencing the celery tasks directly, we make sure that the celery
config used for defining the tasks is also used for dispatching them.

See https://docs.celeryq.dev/en/stable/userguide/application.html#breaking-the-chain
for more information.
  • Loading branch information
raffomania committed Nov 16, 2023
1 parent 02880dd commit 7fc48f1
Showing 1 changed file with 17 additions and 42 deletions.
59 changes: 17 additions & 42 deletions backend/src/app/celery/background_jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,36 @@
from typing import Any, List

# noinspection PyUnresolvedReferences,PyProtectedMember
from celery.canvas import Signature

from app.celery.background_jobs.tasks import (
execute_audio_preprocessing_pipeline_task,
execute_image_preprocessing_pipeline_task,
execute_text_preprocessing_pipeline_task,
execute_video_preprocessing_pipeline_task,
import_uploaded_archive,
start_crawler_job,
start_export_job,
)
from app.core.data.crawler.crawler_service import CrawlerService
from app.core.data.dto.crawler_job import CrawlerJobParameters, CrawlerJobRead
from app.core.data.dto.export_job import ExportJobParameters, ExportJobRead
from app.core.data.export.export_service import ExportService
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo

import_uploaded_archive_task = (
"app.celery.background_jobs.tasks.import_uploaded_archive"
)
start_export_job_task = "app.celery.background_jobs.tasks.start_export_job"
start_crawler_job_task = "app.celery.background_jobs.tasks.start_crawler_job"

execute_text_preprocessing_pipeline_task = (
"app.celery.background_jobs.tasks.execute_text_preprocessing_pipeline_task"
)
execute_image_preprocessing_pipeline_task = (
"app.celery.background_jobs.tasks.execute_image_preprocessing_pipeline_task"
)
execute_audio_preprocessing_pipeline_task = (
"app.celery.background_jobs.tasks.execute_audio_preprocessing_pipeline_task"
)
execute_video_preprocessing_pipeline_task = (
"app.celery.background_jobs.tasks.execute_video_preprocessing_pipeline_task"
)


def import_uploaded_archive_apply_async(
    archive_file_path: Path, project_id: int
) -> Any:
    """Dispatch a background job that imports an uploaded archive.

    Calling ``apply_async`` directly on the imported task object (instead of a
    string-named ``Signature``) ensures the celery app configuration used when
    defining the task is also used when dispatching it.

    Args:
        archive_file_path: Path to the uploaded archive file on disk.
        project_id: ID of the project the archive will be imported into.

    Returns:
        The celery ``AsyncResult`` returned by ``apply_async``.
    """
    return import_uploaded_archive.apply_async(
        kwargs={"archive_file_path_and_project_id": (archive_file_path, project_id)},
    )


def prepare_and_start_export_job_async(
    export_params: ExportJobParameters,
) -> ExportJobRead:
    """Create an export job and start it as a background celery task.

    Args:
        export_params: Parameters describing what to export.

    Returns:
        The prepared ``ExportJobRead`` so callers can poll the job's status.
    """
    exs: ExportService = ExportService()
    ex_job = exs.prepare_export_job(export_params)
    # Fire-and-forget dispatch. Return the job DTO rather than the result of
    # apply_async: apply_async yields a celery AsyncResult, which would
    # contradict the declared ExportJobRead return type.
    start_export_job.apply_async(kwargs={"export_job": ex_job})
    return ex_job


def prepare_and_start_crawling_job_async(
Expand All @@ -58,11 +41,11 @@ def prepare_and_start_crawling_job_async(
cj = cs.prepare_crawler_job(crawler_params)
start_export_job = (
# crawl the data via scrapy and zip the data
Signature(start_crawler_job_task, kwargs={"crawler_job": cj})
start_crawler_job.signature(kwargs={"crawler_job": cj})
|
# import the zip
# TODO create a PPJ for the import
Signature(import_uploaded_archive_task)
import_uploaded_archive.signature()
)
start_export_job.apply_async()

def execute_text_preprocessing_pipeline_apply_async(
    cargos: List[PipelineCargo],
) -> None:
    """Dispatch one text-preprocessing celery task per cargo.

    Uses the imported task object directly so the celery configuration that
    defined the task is also applied when dispatching it.

    Args:
        cargos: Pipeline cargos to process; an empty list dispatches nothing.
    """
    for cargo in cargos:
        execute_text_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})


def execute_image_preprocessing_pipeline_apply_async(
    cargos: List[PipelineCargo],
) -> None:
    """Dispatch one image-preprocessing celery task per cargo.

    Uses the imported task object directly so the celery configuration that
    defined the task is also applied when dispatching it.

    Args:
        cargos: Pipeline cargos to process; an empty list dispatches nothing.
    """
    for cargo in cargos:
        execute_image_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})


def execute_audio_preprocessing_pipeline_apply_async(
    cargos: List[PipelineCargo],
) -> None:
    """Dispatch one audio-preprocessing celery task per cargo.

    Uses the imported task object directly so the celery configuration that
    defined the task is also applied when dispatching it.

    Args:
        cargos: Pipeline cargos to process; an empty list dispatches nothing.
    """
    for cargo in cargos:
        execute_audio_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})


def execute_video_preprocessing_pipeline_apply_async(
    cargos: List[PipelineCargo],
) -> None:
    """Dispatch one video-preprocessing celery task per cargo.

    Uses the imported task object directly so the celery configuration that
    defined the task is also applied when dispatching it.

    Args:
        cargos: Pipeline cargos to process; an empty list dispatches nothing.
    """
    for cargo in cargos:
        execute_video_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})

0 comments on commit 7fc48f1

Please sign in to comment.