I cannot queue Task #1908

Closed
alejandro-celada opened this issue Aug 13, 2021 · 5 comments · Fixed by #1921
Labels
bug Something isn't working

Comments

@alejandro-celada

alejandro-celada commented Aug 13, 2021

I cannot enqueue tasks when we use:

await task.enqueue("sayHello", {
  message: "Hello!",
})

If I use enqueueIn or enqueueAt, it works correctly:

await task.enqueueAt(100, "sayHello", {
  message: "Hello!",
})

await task.enqueueIn(100, "sayHello", {
  message: "Hello!",
})
  • Actionhero Version: 26.1.1
  • Node.js Version: 14.16.0
  • Operating System: (OSX)

Please, I need your help. Thanks.

@alejandro-celada alejandro-celada added the bug Something isn't working label Aug 13, 2021
@evantahler
Member

Can you share any errors you may be seeing?

@alejandro-celada
Author

No error appears when we use task.enqueue, but the task is never executed either. With task.enqueueAt or task.enqueueIn, on the other hand, the logs appear and the task runs successfully.

Any idea where the error may be?

Below you can see our config for src/config/task.ts:

export const DEFAULT = {
  tasks: (config) => {
    console.log(`Started as ${!!parseInt(process.env.SCHEDULER) ? 'SCHEDULER' : 'WORKER'}`)
    return {
      // Should this node run a scheduler to promote delayed tasks?
      scheduler: !!parseInt(process.env.SCHEDULER),
      // Should this node schedule cron jobs?
      cronScheduler: !!parseInt(process.env.CRON_SCHEDULER),
      // what queues should the taskProcessors work?
      queues: process.env.QUEUES ? process.env.QUEUES.split(',') : ['high', 'classify', 'high-electron', 'electron', 'default', 'low-electron'],
      // Logging levels of task workers
      workerLogging: {
        failure: 'error', // task failure
        success: 'info',  // task success
        start: 'info',
        end: 'info',
        cleaning_worker: 'info',
        poll: 'debug',
        job: 'info',
        pause: 'debug',
        internalError: 'error',
        multiWorkerAction: 'debug'
      },
      // Logging levels of the task scheduler
      schedulerLogging: {
        start: 'info',
        end: 'info',
        poll: 'debug',
        enqueue: 'info',
        reEnqueue: 'debug',
        working_timestamp: 'debug',
        transferred_job: 'debug'
      },
      // how long to sleep between jobs / scheduler checks
      timeout: 5000,
      // at minimum, how many parallel taskProcessors should this node spawn?
      // (have number > 0 to enable, and < 1 to disable)
      minTaskProcessors: 0,
      // at maximum, how many parallel taskProcessors should this node spawn?
      maxTaskProcessors: 1,
      // how often should we check the event loop to spawn more taskProcessors?
      checkTimeout: 500,
      // how many ms would constitute an event loop delay to halt taskProcessors spawning?
      maxEventLoopDelay: 5,
      // When we kill off a taskProcessor, should we disconnect that local redis connection?
      toDisconnectProcessors: true,
      // What redis server should we connect to for tasks / delayed jobs?
      redis: config.redis,
      //Elastic Beanstalk app names that are running for a particular cluster
      ebNames: process.env.EB_ENV_NAMES ? process.env.EB_ENV_NAMES.split(',') : []
    };
  },
};

export const test = {
  tasks: (config) => {
    return {
      timeout: 100,
      checkTimeout: 50,
    };
  },
};

@evantahler
Member

Your config looks fine, other than some extra settings that I'll bet are for your application (ebNames, cronScheduler, etc.).

Can you share a way to reproduce the problem where enqueuing a task directly doesn't work?

Here are some debugging ideas:

  • Confirm that the queue used in both cases is the same. Is the queue listed in config.tasks.queues?
  • Check with redis-cli that the enqueued jobs really reach redis each time.
  • Does the problem occur in all environments? Is there something specific about "production" that causes the problem?
  • Are all of your instances (the workers, the scheduler, and the application servers enqueuing the job) pointing to the same redis server and the same database?
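
The first check above can be sketched in a few lines. This is a minimal illustration, not part of Actionhero: `resolveWorkerQueues` and `isQueueWorked` are hypothetical helper names, and the default list is copied from the config posted earlier in this thread. Unlike that config, the sketch also trims whitespace around each queue name, which is an assumption about how `QUEUES` might be written.

```typescript
// Hypothetical helpers for illustration only; not an Actionhero API.
// Default queue list copied from the config posted earlier in the thread.
const DEFAULT_QUEUES: string[] = [
  "high",
  "classify",
  "high-electron",
  "electron",
  "default",
  "low-electron",
];

// Same pattern as the posted config (split QUEUES on commas, else use defaults),
// with extra trimming of whitespace and removal of empty entries (an assumption).
function resolveWorkerQueues(queuesEnv: string | undefined): string[] {
  if (!queuesEnv) {
    return DEFAULT_QUEUES;
  }
  return queuesEnv
    .split(",")
    .map((q) => q.trim())
    .filter((q) => q.length > 0);
}

// True when a worker configured with `queuesEnv` would poll `taskQueue`.
function isQueueWorked(taskQueue: string, queuesEnv: string | undefined): boolean {
  return resolveWorkerQueues(queuesEnv).indexOf(taskQueue) !== -1;
}
```

Calling `isQueueWorked("default", process.env.QUEUES)` on each worker node would show whether a task defined on the `default` queue can ever be picked up there.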

@alejandro-celada
Author

alejandro-celada commented Aug 16, 2021

Thank you very much for your support and prompt response.

We have finally found the cause of the error. It happens when we add middleware to the tasks through an initializer.

src/initializers/XXXX.ts:

import _ = require("lodash");
import { log, Initializer, task, api } from "actionhero";

export class taskMiddleware extends Initializer {
  constructor() {
    super();
    this.name = "customMiddleware";
  }

  async initialize() {
    const middlewareTask = {
      name: 'duration',
      global: true,
      priority: 90,
      preProcessor: async function () {
        const worker = this.worker
        worker.startTime = process.hrtime()
      },
      postProcessor: async function () {
        const worker = this.worker
        const elapsed = process.hrtime(worker.startTime)
        const seconds = elapsed[0]
        const millis = elapsed[1] / 1000000
        log(worker.job.class + ' done in ' + seconds + ' s and ' + millis + ' ms.', 'info')
      },
      preEnqueue: async function () {
        const arg = this.args[0]
        return (arg === 'ok') // returning `false` will prevent the task from enqueueing
      },
      postEnqueue: async function () {
        log("Task successfully enqueued!")
      }
    }

    task.addMiddleware(middlewareTask);
    log("I initialized", "debug", this.name);
  }
}

We have removed the initializer and everything works correctly. However, I do not understand what we are doing wrong when adding the middleware, since our code is the same as the official example:

https://github.com/actionhero/actionhero/blob/main/src/modules/task.ts

@evantahler
Member

Thank you for this detail!

The example was confusing, and is fixed in #1921. The arguments in preEnqueue were the inputs to the task, so comparing them with 'ok' returned false and blocked the enqueue. It's clearer now just to return true.
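
To make the failure mode concrete, here is a minimal, self-contained sketch of a preEnqueue gate. It does not use Actionhero and does not claim to match its internals: `enqueueWithMiddleware`, `EnqueueContext`, and the in-memory `queue` array are hypothetical stand-ins. The only behavior taken from this thread is that `this.args[0]` holds the task inputs and that returning `false` prevents the enqueue without raising an error.

```typescript
// Hypothetical stand-in for an enqueue path with one preEnqueue hook.
// Not Actionhero's real implementation; names here are illustrative only.
type Inputs = Record<string, unknown>;

interface EnqueueContext {
  args: Inputs[]; // args[0] carries the task inputs, as described in the thread
}

type PreEnqueue = (this: EnqueueContext) => Promise<boolean>;

const queue: Array<{ name: string; inputs: Inputs }> = [];

async function enqueueWithMiddleware(
  name: string,
  inputs: Inputs,
  preEnqueue: PreEnqueue
): Promise<boolean> {
  const ctx: EnqueueContext = { args: [inputs] };
  const allowed = await preEnqueue.call(ctx);
  if (!allowed) {
    return false; // skipped silently: no error is thrown and no job is stored
  }
  queue.push({ name, inputs });
  return true;
}

// Same check as the middleware posted above: compares the inputs object to 'ok'.
const fromOldExample: PreEnqueue = async function () {
  const arg: unknown = this.args[0];
  return arg === "ok"; // an object is never strictly equal to a string
};

// The simpler form: always allow the enqueue.
const alwaysAllow: PreEnqueue = async function () {
  return true;
};

async function demo(): Promise<{ oldExample: boolean; alwaysTrue: boolean; queued: number }> {
  const inputs = { message: "Hello!" };
  const oldExample = await enqueueWithMiddleware("sayHello", inputs, fromOldExample);
  const alwaysTrue = await enqueueWithMiddleware("sayHello", inputs, alwaysAllow);
  return { oldExample, alwaysTrue, queued: queue.length };
}
```

In this sketch, the hook copied from the old example rejects every call whose inputs are an object, which matches the symptom reported here: no error, and no job.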

More interesting is the difference in behavior you were seeing between task.enqueue and task.enqueueIn/At: all of these methods should run the preEnqueue middleware at some point in their lifecycle. I've opened a new bug, #1922, to investigate this specifically.
