@anchan828/nest-cloud-run-queue

Create a Queue/Worker for NestJS application in Cloud Run.

Overview

I wanted a way to implement a queue for an application running on Cloud Run. @nestjs/bull is a very good library, but it is a poor fit here: Cloud Run is serverless, so the server is not always running and cannot process jobs in the background. Enabling "CPU always allocated" works around this, but then there is little point in using Cloud Run.

Therefore, I used Cloud Pub/Sub and Cloud Tasks to implement a queue that is driven by HTTP requests. This package supports both, so you can choose whichever you prefer.

Choose Cloud Tasks or Pub/Sub

Because workers handle tasks via HTTP requests, these packages also work when the workers do not run on Cloud Run.
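
To make the HTTP-based flow more concrete, here is a minimal sketch of decoding a Pub/Sub push request body, which carries the published bytes as a base64 string in `message.data`. It assumes the publisher serialized a JSON object with `name` and `data` fields (the payload shape shown in the Cloud Scheduler section). The `PushEnvelope` and `QueuePayload` types and the `decodePushEnvelope` helper are hypothetical names for illustration, not part of these packages.

```typescript
import { Buffer } from "node:buffer";

// Shape of a Pub/Sub push request body (only the fields used here).
interface PushEnvelope {
  message: { data: string; messageId?: string; attributes?: Record<string, string> };
  subscription?: string;
}

// Assumed payload shape: a worker name plus arbitrary data.
interface QueuePayload {
  name: string;
  data?: unknown;
}

// Hypothetical helper: decode the base64 `message.data` field into a payload.
function decodePushEnvelope(envelope: PushEnvelope): QueuePayload {
  const json = Buffer.from(envelope.message.data, "base64").toString("utf8");
  const parsed: unknown = JSON.parse(json);
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    typeof (parsed as { name?: unknown }).name !== "string"
  ) {
    throw new Error("Payload must be a JSON object with a string `name`");
  }
  return parsed as QueuePayload;
}

// Simulate what a publisher would send and what the worker endpoint would receive.
const envelope: PushEnvelope = {
  message: {
    data: Buffer.from(JSON.stringify({ name: "Worker name", data: "str" })).toString("base64"),
  },
};
const payload = decodePushEnvelope(envelope);
console.log(payload.name, payload.data);
```

In a real worker you would not normally need this, since the worker package receives and decodes the message for you; the sketch only shows what travels over HTTP.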

Demo

See https://github.com/anchan828/nest-cloud-run-queue/tree/master/packages/demo

This demo uses emulators that run Pub/Sub and Cloud Tasks locally. Please see [docker-compose.yml](https://github.com/anchan828/nest-cloud-run-queue/blob/master/docker-compose.yml) if you are interested.

Packages

There are two types of packages: publishers and workers.

Publisher

| Package | Description |
| --- | --- |
| @anchan828/nest-cloud-run-queue-pubsub-publisher | Library for sending messages using Cloud Pub/Sub. |
| @anchan828/nest-cloud-run-queue-tasks-publisher | Library for sending messages using Cloud Tasks. |

Worker

| Package | Description |
| --- | --- |
| @anchan828/nest-cloud-run-queue-worker | Library for creating applications that receive and process messages. |

Getting started

1. Create publisher application

Using Cloud Pub/Sub

See: @anchan828/nest-cloud-run-queue-pubsub-publisher - README.md

Using Cloud Tasks

See: @anchan828/nest-cloud-run-queue-tasks-publisher - README.md

2. Create worker application

Import worker module

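import { Module } from "@nestjs/common";
import { QueueWorkerModule } from "@anchan828/nest-cloud-run-queue-worker";

@Module({
  imports: [QueueWorkerModule.register()],
})
export class WorkerAppModule {}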

Create worker provider

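import { QueueWorker, QueueWorkerProcess, QueueWorkerRawMessage } from "@anchan828/nest-cloud-run-queue-worker";

// The worker name is what incoming messages use to select this worker.
@QueueWorker("Worker name")
// @QueueWorker({ name: "Worker name" })
class Worker {
  // Called for each message addressed to this worker.
  @QueueWorkerProcess()
  public async process(message: string | object, raw: QueueWorkerRawMessage): Promise<void> {
    console.log("Message:", message);
    console.log("Raw message:", raw);
  }
}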

Add as provider

@Module({
  imports: [QueueWorkerModule.register()],
  providers: [Worker],
})
export class WorkerAppModule {}

Customize worker controller

The controller that receives messages is defined automatically. You can customize its HTTP method and path.

@Module({
  imports: [
    QueueWorkerModule.register({
      workerController: {
        method: RequestMethod.GET,
        path: "/worker",
      },

      // Default
      // workerController: {
      //   method: RequestMethod.POST,
      //   path: "/",
      // },
    }),
  ],
  providers: [Worker],
})
export class WorkerAppModule {}

You can also define your own Controller. In that case, set workerController to null.

@Controller("/worker")
class WorkerController {
  constructor(private readonly service: QueueWorkerService) {}

  @Post()
  public async execute(@Body() body: QueueWorkerReceivedMessage): Promise<void> {
    const results = await this.service.execute(body.message);

    for (const result of results) {
      console.log(result.success);
    }

    // or

    // const decodedMessage = decodeMessage(body.message);
    // const results = await this.service.execute(decodedMessage);
  }
}

@Module({
  controllers: [WorkerController],
  imports: [
    QueueWorkerModule.register({
      workerController: null,
    }),
  ],
  providers: [Worker],
})
export class WorkerAppModule {}
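
When you write your own controller, you may also want the HTTP status to reflect the results, since both Pub/Sub push subscriptions and Cloud Tasks treat a non-2xx response as a failed delivery and retry it. The following is a minimal sketch of that decision. `ResultLike` only mirrors the `success` and `error` fields used in this README, and `statusForResults` is a hypothetical helper, not part of the worker package.

```typescript
// Mirrors only the result fields used in this README (an assumption about the shape).
interface ResultLike {
  success: boolean;
  error?: Error;
}

// Hypothetical helper: pick an HTTP status and collect error messages.
function statusForResults(results: ResultLike[]): { status: number; errors: string[] } {
  const errors = results
    .filter((result) => !result.success)
    .map((result) => result.error?.message ?? "unknown error");

  // 204 acknowledges the message; 500 asks Pub/Sub or Cloud Tasks to retry it.
  return { status: errors.length === 0 ? 204 : 500, errors };
}

const allSucceeded = statusForResults([{ success: true }, { success: true }]);
const oneFailed = statusForResults([{ success: true }, { success: false, error: new Error("boom") }]);
console.log(allSucceeded.status, oneFailed.status, oneFailed.errors);
```

In a controller you could throw an exception or set the response status based on the returned `status`. Whether retrying is desirable depends on whether your processors are idempotent.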

Create bootstrap function

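import { NestFactory } from "@nestjs/core";

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(WorkerAppModule);
  // Cloud Run provides the port to listen on via the PORT environment variable.
  await app.listen(process.env.PORT || 8080);
}

bootstrap();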

Disable worker/provider

You can disable a whole worker, or an individual process method, by setting enabled to false. For example, if you reuse the same application for several deployments, you can disable a process method on the Cloud Run service and enable it only when the application runs as a Cloud Run job.

@QueueWorker({ name: "Worker name", enabled: config.isEnabledWorker })
class Worker {
  @QueueWorkerProcess({ enabled: config.isEnabledProcess })
  public async process(message: string | object, raw: QueueWorkerRawMessage): Promise<void> {}

  @QueueWorkerProcess()
  public async process2(message: string | object, raw: QueueWorkerRawMessage): Promise<void> {
    console.log("Message:", message);
    console.log("Raw message:", raw);
  }
}
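
The `config` object above is not defined in this README. One way to build it, as a rough sketch, is to derive the flags from environment variables. This assumes that Cloud Run sets `CLOUD_RUN_JOB` inside job executions (please verify this for your environment); `WORKER_ENABLED` and the `loadConfig` helper are hypothetical names chosen for illustration.

```typescript
type Env = Record<string, string | undefined>;

interface WorkerConfig {
  isEnabledWorker: boolean;
  isEnabledProcess: boolean;
}

// Hypothetical helper: derive the enabled flags from environment variables.
function loadConfig(env: Env): WorkerConfig {
  // Assumption: CLOUD_RUN_JOB is only present when running as a Cloud Run job.
  const isJob = env["CLOUD_RUN_JOB"] !== undefined;

  return {
    // Hypothetical opt-out switch: the worker stays enabled unless WORKER_ENABLED=false.
    isEnabledWorker: env["WORKER_ENABLED"] !== "false",
    // Run this process method only inside a job, not on the service.
    isEnabledProcess: isJob,
  };
}

const onService = loadConfig({ K_SERVICE: "api" });
const onJob = loadConfig({ CLOUD_RUN_JOB: "nightly" });
console.log(onService, onJob);
```

In an application you would call `loadConfig(process.env)` once and pass the resulting flags to the decorators.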

Execute worker/processor manually

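You can also look up the workers for a message and execute each processor yourself, which lets you handle the result of every processor individually.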

@Controller("/worker")
class WorkerController {
  constructor(private readonly service: QueueWorkerService) {}

  @Post()
  public async execute(@Body() body: QueueWorkerReceivedMessage): Promise<void> {
    const workers = await this.service.getWorkers(body.message);

    for (const worker of workers) {
      const processors = worker.getProcessors();

      for (const processor of processors) {
        const result = await processor.execute();

        if (result.success) {
          console.log("Success");
        } else {
          console.log("Failed:" + result.error.message);
        }
      }
    }
  }
}

Using Cloud Scheduler

You can use Cloud Scheduler as a trigger.

The payload is a JSON string such as {"name": "worker name", "data": "str"}.
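
If you create scheduler jobs from code, it can help to build the body with a small helper rather than writing the JSON by hand. This is only a sketch; `buildSchedulerBody` is a hypothetical function name, and the `name`/`data` shape is taken from the payload shown above.

```typescript
// Hypothetical helper: build the JSON string used as the Cloud Scheduler job body.
function buildSchedulerBody<T>(name: string, data?: T): string {
  if (name.trim() === "") {
    throw new Error("name must not be empty");
  }
  // JSON.stringify omits `data` when it is undefined.
  return JSON.stringify({ name, data });
}

const body = buildSchedulerBody("worker name", "str");
console.log(body); // {"name":"worker name","data":"str"}
```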

Using as standalone

Sometimes you want to run a worker once without starting an HTTP server, for example in a Cloud Run job.

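import { NestFactory } from "@nestjs/core";
import { QueueWorkerService } from "@anchan828/nest-cloud-run-queue-worker";

async function bootstrap(): Promise<void> {
  // Create an application context only; no HTTP server is started.
  const app = await NestFactory.createApplicationContext(WorkerAppModule);
  await app.get(QueueWorkerService).execute({
    name: "worker name",
    data: "str",
  });
  // Close the context so the process can exit once the work is done.
  await app.close();
}

bootstrap();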
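
In a job you may not want to hard-code the worker name. As a rough sketch, the message could be read from command-line arguments instead. The `--name=` and `--data=` flags and the `parseJobArgs` helper are hypothetical and not something this package defines.

```typescript
interface JobMessage {
  name: string;
  data: string | undefined;
}

// Hypothetical helper: read `--name=<worker>` and `--data=<value>` from the arguments.
function parseJobArgs(argv: string[]): JobMessage {
  const options = new Map<string, string>();

  for (const arg of argv) {
    if (!arg.startsWith("--")) continue;
    const separator = arg.indexOf("=");
    if (separator === -1) continue;
    options.set(arg.slice(2, separator), arg.slice(separator + 1));
  }

  const name = options.get("name");
  if (!name) {
    throw new Error("Missing required --name=<worker name> argument");
  }
  return { name, data: options.get("data") };
}

const jobMessage = parseJobArgs(["--name=worker name", "--data=str"]);
console.log(jobMessage);
```

You could then pass the result of `parseJobArgs(process.argv.slice(2))` to `execute` in the bootstrap function.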