# 内置任务

内置定时任务能力来着与midwayjs (opens new window)

定时任务一般存放在 schedule 目录下,可以配置定时任务的属性和要执行的方法。例如:

// xxx/schedule/hello.ts
import { Provide, Inject, Schedule, CommonSchedule } from '@midwayjs/decorator';
import { Context } from 'egg';

@Provide()
@Schedule({
  interval: 2333, // 2.333s 间隔
  type: 'worker', // 指定某一个 worker 执行
  //cron: '0 0 */3 * * *',
})
export class HelloCron implements CommonSchedule {
  
  @Inject()
  ctx: Context;

  // 定时执行的具体任务
  async exec() {
    this.ctx.logger.info(process.pid, 'hello');
  }
}

# 类型type

框架提供的定时任务默认支持两种类型,worker 和 all。worker 和 all 都支持上面的两种定时方式,只是当到执行时机时,会执行定时任务的 worker 不同:

  • worker 类型:每台机器上只有一个 worker 会执行这个定时任务,每次执行定时任务的 worker 的选择是随机的。
  • all 类型:每台机器上的每个 worker 都会执行这个定时任务。

# 定时规则cron

*    *    *    *    *    *
┬    ┬    ┬    ┬    ┬    ┬
│    │    │    │    │    |
│    │    │    │    │    └ day of week (0 - 7) (0 or 7 is Sun)
│    │    │    │    └───── month (1 - 12)
│    │    │    └────────── day of month (1 - 31)
│    │    └─────────────── hour (0 - 23)
│    └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)

WARNING

注意:该方式在多实例部署的情况下无法做到任务之前的协同,任务存在重复执行的可能

# 分布式任务

midwayjs (opens new window)自带的定时任务在多个服务实例的情况下,无法做到协同,任务有可能重复执行。为了解决该问题,cool-admin提供了一个分布式任务的方案,该方案利用redis作为协同。因此要开启该功能需要安装两个插件,midwayjs-cool-redismidwayjs-cool-queue

# 安装插件

// 安装redis插件
yarn add midwayjs-cool-redis

// 安装任务与队列插件
yarn add midwayjs-cool-queue

# 启用插件

import { App, Configuration } from '@midwayjs/decorator';
import { ILifeCycle, IMidwayContainer } from '@midwayjs/core';
import { Application } from 'egg';
import * as orm from '@midwayjs/orm';
import * as cool from 'midwayjs-cool-core';
import * as wxpay from 'midwayjs-cool-wxpay';
import * as oss from 'midwayjs-cool-oss';
import * as redis from 'midwayjs-cool-redis';
import * as queue from 'midwayjs-cool-queue';
import * as alipay from 'midwayjs-cool-alipay';
//import * as socket from 'midwayjs-cool-socket';

@Configuration({
  // 注意组件顺序 cool 有依赖orm组件, 所以必须放在,orm组件之后 cool的其他组件必须放在cool 核心组件之后
  imports: [
    // 必须,不可移除, https://typeorm.io  打不开? https://typeorm.biunav.com/zh/
    orm,
    // 必须,不可移除, cool-admin 官方组件 https://www.cool-js.com
    cool,
    // oss插件,需要到后台配置之后才有用,默认是本地上传
    oss,
    // 将缓存替换成redis
    redis,
    // 队列
    queue,
    // 微信支付
    wxpay,
    // 支付宝支付
    alipay,
    // socket
    //socket
  ],
})
export class ContainerLifeCycle implements ILifeCycle {
  @App()
  app: Application;
  // 应用启动完成
  async onReady(container?: IMidwayContainer) {}
  // 应用停止
  async onStop() {}
}

# 配置插件

登录后台 插件管理/插件列表 配置redis插件, 配置完最好重新启动下服务使其生效,如果插件列表

# 配置多个redis

[
	{
		"host": "192.168.0.103",
		"port": 7000,
		"password": "",
		"db": 0
	},
	{
		"host": "192.168.0.103",
		"port": 7001,
		"password": "",
		"db": 0
	},
	{
		"host": "192.168.0.103",
		"port": 7002,
		"password": "",
		"db": 0
	},
	{
		"host": "192.168.0.103",
		"port": 7003,
		"password": "",
		"db": 0
	},
	{
		"host": "192.168.0.103",
		"port": 7004,
		"password": "",
		"db": 0
	},
	{
		"host": "192.168.0.103",
		"port": 7005,
		"password": "",
		"db": 0
	}
]

WARNING

如果插件列表找不到该插件,尝试重启你的服务重新刷新

# 创建执行任务的service

import { Provide } from '@midwayjs/decorator';
import { BaseService } from 'midwayjs-cool-core';
/**
 * 任务执行的demo示例
 */
@Provide()
export class DemoTaskService extends BaseService {
  /**
   * 测试任务执行
   * @param params 接收的参数 数组 [] 可不传
   */
  async test(params?: []) {
    // 需要登录后台任务管理配置任务
    console.log('任务执行了', params);
  }
}

# 配置定时任务

登录后台 任务管理/任务列表

WARNING

截图中的demoTaskService为上一步执行任务的service的实例ID,midwayjs默认为类名首字母小写!!!

任务调度基于redis,所有的任务都需要通过代码去维护任务的创建,启动,暂停。 所以直接改变数据库的任务状态是无效的,redis中的信息还未清空, 任务将继续执行。

# 队列

上文说到分布式任务调度,其实是利用了bull (opens new window)的重复队列机制。

在项目开发过程中特别是较大型、数据量较大、业务较复杂的场景下往往需要用到队列。 如:抢购、批量发送消息、分布式事务、订单2小时后失效等。

# 创建队列

一般放在名称为queue文件夹下

import { App, Provide } from '@midwayjs/decorator';
import { IMidwayWebApplication } from '@midwayjs/web';
import { ICoolQueue, Queue } from 'midwayjs-cool-queue';

/**
 * 任务
 */
@Queue()
@Provide()
export abstract class DemoQueue implements ICoolQueue {
  @App()
  app: IMidwayWebApplication;

  async data(job: any, done: any): Promise<void> {
    // 这边可以执行定时任务具体的业务或队列的业务
    console.log('数据', job);
    done();
  }
}

# 发送数据

import { Get, Inject, Post, Provide } from '@midwayjs/decorator';
import { CoolController, BaseController } from 'midwayjs-cool-core';
import { IQueue } from 'midwayjs-cool-queue';
import { DemoAppGoodsEntity } from '../../entity/goods';
import { DemoGoodsService } from '../../service/goods';

/**
 * 商品
 */
@Provide()
@CoolController({
  api: ['add', 'delete', 'update', 'info', 'list', 'page'],
  entity: DemoAppGoodsEntity,
  listQueryOp: {
    keyWordLikeFields: ['title'],
  },
})
export class DemoAppGoodsController extends BaseController {
  @Inject()
  demoGoodsService: DemoGoodsService;

  // 队列
  @Inject()
  demoQueue: IQueue;

  /**
   * 请求所有数据
   * @returns
   */
  @Get('/all')
  async all() {
    return this.ok(await this.demoGoodsService.all());
  }

  /**
   * 发送数据到队列
   */
  @Post('/queue')
  async queue(){
	// {a: 1} 为发送到队列的数据, 第二个参数是队列的配置
	// this.demoQueue.queue 获得的就是bull实例
    this.demoQueue.queue.add({ a: 1 }, {
      removeOnComplete: true,
      removeOnFail: true,
    });
  }
}

队列配置

interface JobOpts {
  priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT  (lowest priority). Note that
  // using priorities has a slight impact on performance, so do not use it if not required.

  delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
  // server and clients should have their clocks synchronized. [optional].

  attempts: number; // The total number of attempts to try the job until it completes.

  repeat: RepeatOpts; // Repeat job according to a cron specification.

  backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails, default strategy: `fixed`

  lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
  timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]

  jobId: number | string; // Override the job ID - by default, the job ID is a unique
  // integer, but you can use this setting to override it.
  // If you use this option, it is up to you to ensure the
  // jobId is unique. If you attempt to add a job with an id that
  // already exists, it will not be added.

  removeOnComplete: boolean | number; // If true, removes the job when it successfully
  // completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.

  removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
  // Default behavior is to keep the job in the failed set.
  stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}

TIP

this.demoQueue.queue 获得的就是bull实例,更多bull的高级用户可以查看bull文档 (opens new window)