Source: worker/main.js

'use strict';

/**
 * Redis based queue for jobs like tooting.
 * @module worker/main
 * @license MIT
 * @author Kai KRETSCHMANN <kai@kretschmann.consulting>
 */

const Worker = require('node-resque').Worker;
const Scheduler = require('node-resque').Scheduler;
const Queue = require('node-resque').Queue;

const log4js = require('log4js');
const logger = log4js.getLogger();
logger.level = process.env.LOGLEVEL || /* istanbul ignore next */ 'warn';

let queue;

const dotoot = require('./w_toot');

/**
 * Start worker.
 * @function boot
 * @async
 * @public
 */
async function boot () {
  const connectionDetails = {
    pkg: 'ioredis',
    host: process.env.REDIS_HOSTNAME || /* istanbul ignore next */ '127.0.0.1',
    password: process.env.REDIS_PASSWD || null,
    port: process.env.REDIS_PORT || 6379,
    database: process.env.REDIS_DBNUM || 0
  };

  const jobs = {
    addtoot: {
      perform: (t) => {
        queue.length('toot').then(function (l) {
          if (l > 1) {
            t = `${t} and ${l} more`;
          }
          dotoot(t);
          queue.delByFunction('toot', 'addtoot');
        });
      }
    }
  };

  // Start worker
  const worker = new Worker(
    { connection: connectionDetails, queues: ['toot', 'otherQueue'] },
    jobs
  );
  await worker.connect();
  worker.start();

  // Start scheduler
  const scheduler = new Scheduler({ connection: connectionDetails });
  await scheduler.connect();
  scheduler.start();

  // Register for events
  worker.on('start', () => /* istanbul ignore next */ {
    logger.info('worker started');
  });
  worker.on('end', () => /* istanbul ignore next */ {
    logger.info('worker ended');
  });
  worker.on('cleaning_worker', (w, pid) => /* istanbul ignore next */ {
    logger.debug(`cleaning old worker ${w}`);
  });
  worker.on('poll', (q) => {
    logger.debug(`worker polling ${q}`);
    queue.length(q).then(function (l) {
      logger.debug(`Q length=${l}`);
    });
  });
  worker.on('ping', (time) => {
    logger.debug(`worker check in @ ${time}`);
  });
  worker.on('job', (q, job) => {
    logger.debug(`working job ${q} ${JSON.stringify(job)}`);
  });
  worker.on('reEnqueue', (q, job, plugin) => /* istanbul ignore next */ {
    logger.debug(`reEnqueue job (${plugin}) ${q} ${JSON.stringify(job)}`);
  });
  worker.on('success', (q, job, result, duration) => {
    logger.debug(`job success ${q} ${JSON.stringify(job)} >> ${result} (${duration}ms)`);
  });
  worker.on('failure', (q, job, failure, duration) => /* istanbul ignore next */ {
    logger.debug(`job failure ${q} ${JSON.stringify(job)} >> ${failure} (${duration}ms)`);
  });
  worker.on('error', (error, q, job) => /* istanbul ignore next */ {
    logger.debug(`error ${q} ${JSON.stringify(job)}  >> ${error}`);
  });
  worker.on('pause', () => {
    logger.debug('worker paused');
  });

  scheduler.on('start', () => /* istanbul ignore next */ {
    logger.debug('scheduler started');
  });
  scheduler.on('end', () => /* istanbul ignore next */ {
    logger.debug('scheduler ended');
  });
  scheduler.on('poll', () => {
    logger.debug('scheduler polling');
  });
  scheduler.on('leader', () => {
    logger.debug('scheduler became leader');
  });
  scheduler.on('error', (error) => /* istanbul ignore next */ {
    logger.debug(`scheduler error >> ${error}`);
  });
  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => /* istanbul ignore next */ {
    logger.debug(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`);
  });
  scheduler.on('workingTimestamp', (timestamp) => /* istanbul ignore next */ {
    logger.debug(`scheduler working timestamp ${timestamp}`);
  });
  scheduler.on('transferredJob', (timestamp, job) => /* istanbul ignore next */ {
    logger.debug(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`);
  });

  // connect to a queue
  queue = new Queue({ connection: connectionDetails }, jobs);
  queue.on('error', function (error) /* istanbul ignore next */ {
    logger.error(error);
  });
  await queue.connect();
}

/**
 * Start queue.
 * @function doqueue
 * @async
 * @public
 * @param {object} t mastodon toot object
 */
async function doqueue (t) {
  queue.enqueue('toot', 'addtoot', t);
}

boot();

exports.boot = boot;
exports.Worker = Worker;
exports.Scheduler = Scheduler;
exports.queue = queue;
exports.doqueue = doqueue;