This issue was moved to a discussion.

Bullmq Worker not reusing configured active redis connection #4545

Closed
Stancobridge opened this issue Apr 25, 2024 · 0 comments

Package version

^6.6.0

Describe the bug

I am trying to use BullMQ with AdonisJS but have run into some issues, which I have resolved.
First off, there is no documentation on how to set up BullMQ in AdonisJS, so I created a provider and added the code to the provider's ready method, as below.

import type { ApplicationService } from '@adonisjs/core/types';
import mail from '@adonisjs/mail/services/main';
import { Queue, QueueEvents, Worker } from 'bullmq';

export default class EmailQueueProvider {
  constructor(protected app: ApplicationService) {}

  /**
   * Register bindings to the container
   */
  register() {}

  /**
   * The container bindings have booted
   */
  async boot() {}

  /**
   * The application has been booted
   */
  async start() {}

  /**
   * The process has been started
   */
  async ready() {
    const emailsQueue = new Queue('emails');

    mail.setMessenger(mailer => {
      return {
        async queue(mailMessage, config) {
          await emailsQueue.add('send_email', {
            mailMessage,
            config,
            mailerName: mailer.name,
          });
        },
      };
    });

    const worker = new Worker(
      'emails',
      async job => {
        if (job.name === 'send_email') {
          const { mailMessage, config, mailerName } = job.data;

          await mail.use(mailerName).sendCompiled(mailMessage, config);
        }
      },
    );

    worker.on('completed', job => {
      console.log(`${job.id} has completed!`);
    });

    worker.on('failed', (job, err) => {
      console.log(`${job?.id} has failed with ${err.message}`);
    });

    const queueEvents = new QueueEvents('emails');

    queueEvents.on('waiting', ({ jobId }) => {
      console.log(`A job with ID ${jobId} is waiting`);
    });

    queueEvents.on('active', ({ jobId, prev }) => {
      console.log(`Job ${jobId} is now active; previous status was ${prev}`);
    });

    queueEvents.on('completed', ({ jobId, returnvalue }) => {
      console.log(`${jobId} has completed and returned ${returnvalue}`);
    });

    queueEvents.on('failed', ({ jobId, failedReason }) => {
      console.log(`${jobId} has failed with reason ${failedReason}`);
    });
  }

  /**
   * Preparing to shutdown the app
   */
  async shutdown() {}
}

but I am getting this error:

update providers/email_queue_provider.ts
╭─────────────────────────────────────────────────╮
│                                                 │
│    Server address: http://localhost:3333        │
│    File system watcher: enabled                 │
│    Ready in: 701 ms                             │
│                                                 │
╰─────────────────────────────────────────────────╯

   Error: Worker requires a connection

   at EmailQueueProvider.ready providers/email_queue_provider.ts:41
   36|            });
   37|          },
   38|        };
   39|      });
   40|
 ❯ 41|      const worker = new Worker(
   42|        'emails',
   43|        async job => {
   44|          if (job.name === 'send_email') {
   45|            const { mailMessage, config, mailerName } = job.data;
   46|

   ⁃ Worker
     node_modules/.pnpm/bullmq@5.7.5/node_modules/bullmq/src/classes/worker.ts:210
   ⁃ ProvidersManager.ready
     node_modules/.pnpm/@adonisjs+application@8.2.1_@adonisjs+config@5.0.1_@adonisjs+fold@10.1.1/node_modules/@adonisjs/application/src/managers/providers.ts:197
   ⁃ Application.start
     node_modules/.pnpm/@adonisjs+application@8.2.1_@adonisjs+config@5.0.1_@adonisjs+fold@10.1.1/node_modules/@adonisjs/application/src/application.ts:563

[18:42:54.894] INFO (73086): started HTTP server on localhost:3333

If I manually add the connection host and port to the Worker options, it works fine:

import type { ApplicationService } from '@adonisjs/core/types';
import mail from '@adonisjs/mail/services/main';
import { Queue, QueueEvents, Worker } from 'bullmq';

export default class EmailQueueProvider {
  constructor(protected app: ApplicationService) {}

  /**
   * Register bindings to the container
   */
  register() {}

  /**
   * The container bindings have booted
   */
  async boot() {}

  /**
   * The application has been booted
   */
  async start() {}

  /**
   * The process has been started
   */
  async ready() {
    const emailsQueue = new Queue('emails');

    mail.setMessenger(mailer => {
      return {
        async queue(mailMessage, config) {
          await emailsQueue.add('send_email', {
            mailMessage,
            config,
            mailerName: mailer.name,
          });
        },
      };
    });

    const worker = new Worker(
      'emails',
      async job => {
        if (job.name === 'send_email') {
          const { mailMessage, config, mailerName } = job.data;

          await mail.use(mailerName).sendCompiled(mailMessage, config);
        }
      },
      {
        connection: {
          host: '127.0.0.1',
          port: 6379,
        },
      },
    );

    worker.on('completed', job => {
      console.log(`${job.id} has completed!`);
    });

    worker.on('failed', (job, err) => {
      console.log(`${job?.id} has failed with ${err.message}`);
    });

    const queueEvents = new QueueEvents('emails');

    queueEvents.on('waiting', ({ jobId }) => {
      console.log(`A job with ID ${jobId} is waiting`);
    });

    queueEvents.on('active', ({ jobId, prev }) => {
      console.log(`Job ${jobId} is now active; previous status was ${prev}`);
    });

    queueEvents.on('completed', ({ jobId, returnvalue }) => {
      console.log(`${jobId} has completed and returned ${returnvalue}`);
    });

    queueEvents.on('failed', ({ jobId, failedReason }) => {
      console.log(`${jobId} has failed with reason ${failedReason}`);
    });
  }

  /**
   * Preparing to shutdown the app
   */
  async shutdown() {}
}

Isn't the Worker supposed to work with the Redis connection that I have already configured?
Here is the Redis config:

import env from '#start/env'
import { defineConfig } from '@adonisjs/redis'
import { InferConnections } from '@adonisjs/redis/types'

const redisConfig = defineConfig({
  connection: 'main',

  connections: {
    /*
    |--------------------------------------------------------------------------
    | The default connection
    |--------------------------------------------------------------------------
    |
    | The main connection you want to use to execute redis commands. The same
    | connection will be used by the session provider, if you rely on the
    | redis driver.
    |
    */
    main: {
      host: env.get('REDIS_HOST'),
      port: env.get('REDIS_PORT'),
      password: env.get('REDIS_PASSWORD', ''),
      db: 0,
      keyPrefix: '',
      retryStrategy(times) {
        return times > 10 ? null : times * 50
      },
    },
  },
})

export default redisConfig

declare module '@adonisjs/redis/types' {
  export interface RedisConnections extends InferConnections<typeof redisConfig> {}
}

Redis connection is working fine
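
For illustration, here is a minimal sketch of how the connection settings from the config above could be turned into the options object that the Worker constructor expects, instead of hardcoding host and port. The names toBullConnection, AdonisRedisConnection and BullConnectionOptions are hypothetical and not part of @adonisjs/redis or bullmq. The sketch assumes that BullMQ wants maxRetriesPerRequest set to null on worker connections and that the ioredis keyPrefix should not be forwarded, since BullMQ has its own prefix option.

```typescript
// Only the fields of one `connections` entry that this sketch reads.
interface AdonisRedisConnection {
  host?: string;
  port?: number | string;
  password?: string;
  db?: number;
  keyPrefix?: string;
}

// Hypothetical name: the subset of ioredis options handed to BullMQ.
interface BullConnectionOptions {
  host: string;
  port: number;
  password?: string;
  db?: number;
  maxRetriesPerRequest: null;
}

// Hypothetical helper: maps one named connection to BullMQ connection options.
function toBullConnection(conn: AdonisRedisConnection): BullConnectionOptions {
  const port = Number(conn.port ?? 6379);
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid Redis port: ${String(conn.port)}`);
  }

  return {
    host: conn.host ?? '127.0.0.1',
    port,
    // An empty password (the default in the config above) is dropped.
    ...(conn.password ? { password: conn.password } : {}),
    ...(conn.db !== undefined ? { db: conn.db } : {}),
    // keyPrefix is intentionally not copied (assumption: BullMQ uses `prefix` instead).
    maxRetriesPerRequest: null,
  };
}

// Example with the same values as the hardcoded Worker connection.
const connection = toBullConnection({
  host: '127.0.0.1',
  port: '6379',
  password: '',
  db: 0,
  keyPrefix: '',
});
console.log(connection);
```

Inside the provider, the input could presumably be read with something like this.app.config.get('redis.connections.main') (an assumption about how the config is exposed), and the resulting object passed as { connection } to Queue, Worker and QueueEvents so all three use the same settings. Note that this reuses the configuration, not the live connection that @adonisjs/redis has already opened.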

Reproduction repo

No response

@adonisjs adonisjs locked and limited conversation to collaborators May 1, 2024
@thetutlage thetutlage converted this issue into discussion #4554 May 1, 2024
