三十的博客

GVM 脚手架探险记07:跟着 GVM 学习 Cron

发布时间
阅读量 加载中...

首先推荐阅读 Cron 的基本知识,了解 Cron 表达式的语法和功能。

Cron 官方文档

前置说明

timer.NewTimerTask() 会在程序启动时,main 函数执行之前被调用。具体来说:

  1. 程序启动
  2. 初始化所有导入的包(按依赖顺序)
  3. 初始化当前包的全局变量(按声明顺序)
  4. 执行 main 函数

关键特点:

  1. 线程安全: 全局变量初始化在单线程环境下完成,不存在并发问题
  2. 只执行一次: timer.NewTimerTask() 只会执行这一次初始化
  3. 顺序保证: 如果其他全局变量依赖 GVA_Timer,只要声明在后面就能保证可用

代码示例

corn-demo/
├── ctrl/
 └── ctrl.go
├── view/
 └── view.go
├── go.mod
└── main.go
go
// corn-demo/main.go

package main

import (
	"corn-demo/view"
	"fmt"
)

func main() {
	fmt.Println("main function")
	fmt.Println(view.A)
}

// 执行结果
// ctrl package mound...
// init view a
// view init 01
// view init 02
// ctrl say...
// main function
// 1
go
// corn-demo/view/view.go

package view

import (
	"corn-demo/ctrl"
	"fmt"
)

func init() {
	fmt.Println("view init 01 ")
}

func init() {
	fmt.Println("view init 02 ")
	ctrl.Say()
}

var (
	A = initA()
)

func initA() int {
	fmt.Println("init view a")
	return 1
}
go
// corn-demo/ctrl/ctrl.go

package ctrl

import "fmt"

func init() {
	fmt.Println("ctrl package mound...")
}

func Say() {
	fmt.Println("ctrl say...")
}

初始化定时任务

通过全局变量 GVA_Timer 来初始化定时任务。

go
// global/global.go

var (
	// ...
	GVA_LOG                 *zap.Logger
	GVA_Timer               timer.Timer = timer.NewTimerTask()
  // ...
)
go
// utils/timer/timed_task.go

// NewTimerTask 创建一个新的定时任务管理器实例
func NewTimerTask() Timer {
	return &timer{cronList: make(map[string]*taskManager)}
}

定时任务相关结构体解析

在文件 utils/timer/timed_task.go 中:

go
// task 存储单个任务的基本信息
type task struct {
	EntryID  cron.EntryID // 任务的唯一ID
	Spec     string       // cron 表达式,定义任务执行时间
	TaskName string       // 任务名称
}
go
// taskManager 管理一组任务
// 管理一组相关的定时任务,每个 taskManager 对应一个 cron 实例和一组任务
type taskManager struct {
	corn  *cron.Cron             // cron 实例
	tasks map[cron.EntryID]*task // 任务映射表
}
go
// timer 主定时任务管理器实现
// 主定时任务管理器实现,使用互斥锁保证并发安全
type timer struct {
	cronList   map[string]*taskManager // 按名称组织的任务管理器集合
	sync.Mutex                         // 互斥锁,保证线程安全
}

添加定时任务方法

通过函数的方法

通过函数的方法添加任务

go
// AddTaskByFunc 通过函数的方法添加任务
func (t *timer) AddTaskByFunc(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error) {
	// 1. 加锁保证线程安全
	t.Lock()
	defer t.Unlock()

	// 2. 检查是否已存在对应名称的 cron 实例,不存在则创建
	if _, ok := t.cronList[cronName]; !ok {
		tasks := make(map[cron.EntryID]*task)
		t.cronList[cronName] = &taskManager{
			corn:  cron.New(option...),
			tasks: tasks,
		}
	}
	// 3. 添加定时任务函数
	id, err := t.cronList[cronName].corn.AddFunc(spec, fun)
	// 4. 启动 cron
	t.cronList[cronName].corn.Start()
	// 5. 记录任务信息
	t.cronList[cronName].tasks[id] = &task{
		EntryID:  id,
		Spec:     spec,
		TaskName: taskName,
	}
	// 6. 返回任务 ID 和可能的错误
	return id, err
}

通过函数的方法添加秒级精度任务

go
// AddTaskByFuncWithSecond 支持秒级精度的定时任务
func (t *timer) AddTaskByFuncWithSecond(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error) {
	// 1. 加锁保证线程安全
	t.Lock()
	defer t.Unlock()
	option = append(option, cron.WithSeconds())
	// 2. 检查是否已存在对应名称的 cron 实例,不存在则创建
	if _, ok := t.cronList[cronName]; !ok {
		tasks := make(map[cron.EntryID]*task)
		t.cronList[cronName] = &taskManager{
			corn:  cron.New(option...),
			tasks: tasks,
		}
	}
	// 3. 添加定时任务函数
	id, err := t.cronList[cronName].corn.AddFunc(spec, fun)
	// 4. 启动 cron
	t.cronList[cronName].corn.Start()
	// 5. 记录任务信息
	t.cronList[cronName].tasks[id] = &task{
		EntryID:  id,
		Spec:     spec,
		TaskName: taskName,
	}
	// 6. 返回任务 ID 和可能的错误
	return id, err
}

通过接口的方法

通过接口添加任务(需要实现 Run 方法)

go
// AddTaskByJob 通过接口添加任务(需要实现 Run 方法)
func (t *timer) AddTaskByJob(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error) {
	// 1. 加锁保证线程安全
	t.Lock()
	defer t.Unlock()
	// 2. 检查是否已存在对应名称的 cron 实例,不存在则创建
	if _, ok := t.cronList[cronName]; !ok {
		tasks := make(map[cron.EntryID]*task)
		t.cronList[cronName] = &taskManager{
			corn:  cron.New(option...),
			tasks: tasks,
		}
	}
	// 3. 添加定时任务函数
	id, err := t.cronList[cronName].corn.AddJob(spec, job)
	// 4. 启动 cron
	t.cronList[cronName].corn.Start()
	// 5. 记录任务信息
	t.cronList[cronName].tasks[id] = &task{
		EntryID:  id,
		Spec:     spec,
		TaskName: taskName,
	}
	// 6. 返回任务 ID 和可能的错误
	return id, err
}

通过接口添加秒级精度任务(需要实现 Run 方法)

go
// AddTaskByJobWithSeconds 通过接口添加秒级精度任务
func (t *timer) AddTaskByJobWithSeconds(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error) {
	// 1. 加锁保证线程安全
	t.Lock()
	defer t.Unlock()
	option = append(option, cron.WithSeconds())
	// 2. 检查是否已存在对应名称的 cron 实例,不存在则创建
	if _, ok := t.cronList[cronName]; !ok {
		tasks := make(map[cron.EntryID]*task)
		t.cronList[cronName] = &taskManager{
			corn:  cron.New(option...),
			tasks: tasks,
		}
	}
	// 3. 添加定时任务函数
	id, err := t.cronList[cronName].corn.AddJob(spec, job)
	// 4. 启动 cron
	t.cronList[cronName].corn.Start()
	// 5. 记录任务信息
	t.cronList[cronName].tasks[id] = &task{
		EntryID:  id,
		Spec:     spec,
		TaskName: taskName,
	}
	// 6. 返回任务 ID 和可能的错误
	return id, err
}

查找任务

查找单个任务

go
// FindCron 获取对应 cronName 的 cron 可能会为空
func (t *timer) FindCron(cronName string) (*taskManager, bool) {
	t.Lock()
	defer t.Unlock()
	v, ok := t.cronList[cronName]
	return v, ok
}

// FindTask 查找任务
func (t *timer) FindTask(cronName string, taskName string) (*task, bool) {
	t.Lock()
	defer t.Unlock()
	v, ok := t.cronList[cronName]
	if !ok {
		return nil, ok
	}
	for _, t2 := range v.tasks {
		if t2.TaskName == taskName {
			return t2, true
		}
	}
	return nil, false
}

查找任务组中的所有任务

go
// FindCronList 获取所有的任务列表
func (t *timer) FindCronList() map[string]*taskManager {
	t.Lock()
	defer t.Unlock()
	return t.cronList
}

启动/删除/停止/清理任务

启动任务

go
// StartCron 启动任务
func (t *timer) StartCron(cronName string) {
	t.Lock()
	defer t.Unlock()
	if v, ok := t.cronList[cronName]; ok {
		v.corn.Start()
	}
}

停止任务

go
// StopCron 停止任务
func (t *timer) StopCron(cronName string) {
	t.Lock()
	defer t.Unlock()
	if v, ok := t.cronList[cronName]; ok {
		v.corn.Stop()
	}
}

删除任务

go
// RemoveTask 删除任务
func (t *timer) RemoveTask(cronName string, id int) {
	t.Lock()
	defer t.Unlock()
	if v, ok := t.cronList[cronName]; ok {
		v.corn.Remove(cron.EntryID(id))
		delete(v.tasks, cron.EntryID(id))
	}
}

// RemoveTaskByName 从 cronName 使用 taskName 删除指定任务
func (t *timer) RemoveTaskByName(cronName string, taskName string) {
	fTask, ok := t.FindTask(cronName, taskName)
	if !ok {
		return
	}
	t.RemoveTask(cronName, int(fTask.EntryID))
}

清除指定任务

go
// Clear 清除任务
func (t *timer) Clear(cronName string) {
	t.Lock()
	defer t.Unlock()
	if v, ok := t.cronList[cronName]; ok {
		v.corn.Stop()
		delete(t.cronList, cronName)
	}
}

关闭所有任务

go
// Close 释放资源
func (t *timer) Close() {
	t.Lock()
	defer t.Unlock()
	for _, v := range t.cronList {
		v.corn.Stop()
	}
}

GVM 任务管理接口参考

go
type Timer interface {
	// FindCronList 寻找所有 Cron
	FindCronList() map[string]*taskManager
	// AddTaskByFuncWithSecond 添加 Task 方法形式以秒的形式加入
	AddTaskByFuncWithSecond(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error)
	// AddTaskByJobWithSeconds 添加 Task 接口形式以秒的形式加入 要实现一个带有 Run 方法的接口触发
	AddTaskByJobWithSeconds(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error)
	// AddTaskByFunc 通过函数的方法添加任务
	AddTaskByFunc(cronName string, spec string, task func(), taskName string, option ...cron.Option) (cron.EntryID, error)
	// AddTaskByJob 通过接口的方法添加任务 要实现一个带有 Run 方法的接口触发
	AddTaskByJob(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error)
	// FindCron 获取对应 taskName 的 cron 可能会为空
	FindCron(cronName string) (*taskManager, bool)
	// StartCron 指定 cron 开始执行
	StartCron(cronName string)
	// StopCron 指定 cron 停止执行
	StopCron(cronName string)
	// FindTask 查找指定 cron 下的指定 task
	FindTask(cronName string, taskName string) (*task, bool)
	// RemoveTask 根据 id 删除指定 cron 下的指定 task
	RemoveTask(cronName string, id int)
	// RemoveTaskByName 根据 taskName 删除指定 cron 下的指定 task
	RemoveTaskByName(cronName string, taskName string)
	// Clear 清理掉指定 cronName
	Clear(cronName string)
	// Close 停止所有的 cron
	Close()
}

配套的测试文件赏析

代码阅读起来还是比较清晰的,主要是测试了一些基本的功能,比如添加任务、查找任务、删除任务。

go
package timer

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

var job = mockJob{}

type mockJob struct{}

func (job mockJob) Run() {
	mockFunc()
}

func mockFunc() {
	time.Sleep(time.Second)
	fmt.Println("1s...")
}

func TestNewTimerTask(t *testing.T) {
	tm := NewTimerTask()
	_tm := tm.(*timer)

	{
		_, err := tm.AddTaskByFunc("func", "@every 1s", mockFunc, "测试mockfunc")
		assert.Nil(t, err)
		_, ok := _tm.cronList["func"]
		if !ok {
			t.Error("no find func")
		}
	}

	{
		_, err := tm.AddTaskByJob("job", "@every 1s", job, "测试job mockfunc")
		assert.Nil(t, err)
		_, ok := _tm.cronList["job"]
		if !ok {
			t.Error("no find job")
		}
	}

	{
		_, ok := tm.FindCron("func")
		if !ok {
			t.Error("no find func")
		}
		_, ok = tm.FindCron("job")
		if !ok {
			t.Error("no find job")
		}
		_, ok = tm.FindCron("none")
		if ok {
			t.Error("find none")
		}
	}
	{
		tm.Clear("func")
		_, ok := tm.FindCron("func")
		if ok {
			t.Error("find func")
		}
	}
	{
		a := tm.FindCronList()
		b, c := tm.FindCron("job")
		fmt.Println(a, b, c)
	}
}

清理数据库表数据任务赏析

入口

go
// main.go

func initializeSystem() {
  // ...
	// 初始化定时任务
	initialize.Timer()

	// 初始化 db-list 中数据库
  // ...
}

定时任务配置

go
// initialize/timer.go

func Timer() {
	go func() {
		var option []cron.Option
		option = append(option, cron.WithSeconds())
		// 清理 DB 定时任务
		_, err := global.GVA_Timer.AddTaskByFunc("ClearDB", "@daily", func() {
			err := task.ClearTable(global.GVA_DB) // 定时任务方法定在 task 文件包中
			if err != nil {
				fmt.Println("timer error:", err)
			}
		}, "定时清理数据库【日志,黑名单】内容", option...)
		if err != nil {
			fmt.Println("add timer error:", err)
		}
	}()
}

数据模型

go
// model/common/clearDB.go

package common

type ClearDB struct {
	TableName    string
	CompareField string
	Interval     string
}

具体实现

go
// task/clearTable.go

func ClearTable(db *gorm.DB) error {
	var ClearTableDetail []common.ClearDB

	ClearTableDetail = append(ClearTableDetail, common.ClearDB{
		TableName:    "sys_operation_records",
		CompareField: "created_at",
		Interval:     "2160h",
	})

	ClearTableDetail = append(ClearTableDetail, common.ClearDB{
		TableName:    "jwt_blacklists",
		CompareField: "created_at",
		Interval:     "168h",
	})

	if db == nil {
		return errors.New("db Cannot be empty")
	}

	for _, detail := range ClearTableDetail {
		duration, err := time.ParseDuration(detail.Interval)
		if err != nil {
			return err
		}
		if duration < 0 {
			return errors.New("parse duration < 0")
		}
    sql := fmt.Sprintf("DELETE FROM %s WHERE %s < ?", detail.TableName, detail.CompareField)
		err = db.Debug().Exec(sql, time.Now().Add(-duration)).Error
		if err != nil {
			return err
		}
	}
	return nil
}
#Gvm #Cron #Golang