鄂州市网站建设_网站建设公司_轮播图_seo优化
2025/12/17 14:22:45 网站建设 项目流程

go语言定时任务工具类,支持crontab(精确到秒)和 timer 两种模式。
本文介绍了一个基于Go语言的定时任务工具类,支持crontab(精确到秒)和timer两种模式。工具类使用github.com/robfig/cron/v3包实现,主要功能包括:

提供NamedCronJobTask接口定义定时任务,支持通过cron表达式或时间间隔两种调度方式
实现了任务添加(AddTask)、更新(UpdateTask)等功能
内部使用map管理任务,支持同名任务替换
对cron和ticker两种调度方式进行了封装,自动处理panic恢复
提供了任务日志记录功能

该工具类设计灵活,可以方便地集成到各种需要定时任务调度的Go应用中。

引入包:github.com/robfig/cron/v3我的版本 v3.0.1

直接见代码:
1、cron_job_conf.go

packagejobimport("xxx/utils"// 自定义工具包,包内容见下"fmt""log""sync""time""github.com/google/uuid""github.com/robfig/cron/v3")// 定时任务接口typeNamedCronJobTaskinterface{// 任务名称Name()string// returns a cron expression and/or a time interval.// Scheduling priority:// - If cronSpec is non-empty, it is used (interval is ignored).// - Else, if interval > 0, a ticker-based scheduler runs every 'interval'.// - Otherwise, the task is invalid.// Example:// return "*/5 * * * * *", 0 // cron every 5 seconds// return "", 2*time.Hour // ticker every 2 hours// return "0 0 * * *", 10*time.Minute // cron used (every day at 00:00)SpecOrInterval()(cronSpecstring,interval time.Duration)// 执行方法Execute()}// taskType 任务类型typetaskTypeintconst(taskTypeCron taskType=iotataskTypeTicker)// cronTask 内部任务元数据typecronTaskstruct{specstring// cron 表达式 或 "[TICKER:...]" 占位符(仅用于日志)fnfunc()id cron.EntryID// cron 用ticker*time.Ticker// ticker 用stopChanchanstruct{}// 用于安全停止 ticker goroutinetaskType taskType}var(c*cron.Cron cronTaskMap=make(map[string]*cronTask)cronTaskMapMutex=&sync.RWMutex{})// init 初始化 cron 调度器(带秒支持)funcinit(){c=cron.New(cron.WithSeconds())c.Start()log.Println("cron scheduler started")}// AddTask 添加定时任务。如果之前已存在同名任务则会被覆盖。funcAddTask(named NamedCronJobTask)(string,error){ifnamed==nil{return"",fmt.Errorf("namedCronJobTask is nil")}name:=named.Name()ifname==""{name="anonymous:"+uuid.New().String()}returnAddTaskWithName(name,named.SpecOrInterval,named.Execute)}// AddTaskWithName 添加命名任务,如果之前已存在同名任务则会被覆盖。// specOrInterval 返回 (cronSpec, interval),规则:// - 若 cronSpec != "" → 使用 cron(忽略 interval)// - 否则若 interval > 0 → 使用 ticker// - 否则返回错误funcAddTaskWithName(namestring,specOrIntervalfunc()(specstring,interval time.Duration),fnfunc(),)(string,error){iffn==nil{returnname,fmt.Errorf("task function cannot be nil")}ifspecOrInterval==nil{returnname,fmt.Errorf("specOrInterval function cannot be nil")}cronSpec,interval:=specOrInterval()ifname==""{name="anonymous:"+uuid.New().String()}ifcronSpec==""&&interval<=0{returnname,fmt.Errorf("invalid scheduling policy for task %q: must return non-empty cronSpec or interval > 0",name,)}wrappedFn:=func(){deferutils.RecoverPanic()fn()}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()// 替换已存在的同名任务ifold,ok:=cronTaskMap[name];ok{removeTask(old)log.Printf("Replaced existing task: %s",name)}ifcronSpec!=""{// 使用 cronentryID,err:=c.AddFunc(cronSpec,wrappedFn)iferr!=nil{returnname,fmt.Errorf("invalid cron spec %q: %w",cronSpec,err)}cronTaskMap[name]=&cronTask{spec:cronSpec,fn:fn,id:entryID,taskType:taskTypeCron,}log.Printf("Added cron task: %s (spec: %s, ID: %d)",name,cronSpec,int64(entryID))}elseifinterval>0{// 使用 tickerstopChan:=make(chanstruct{})ticker:=time.NewTicker(interval)gofunc(){deferfunc(){ifr:=recover();r!=nil{log.Printf("Recovered panic in ticker task %s: %v",name,r)}ticker.Stop()}()for{select{case<-ticker.C:wrappedFn()case<-stopChan:return}}}()cronTaskMap[name]=&cronTask{spec:fmt.Sprintf("[TICKER:%v]",interval),fn:fn,ticker:ticker,stopChan:stopChan,taskType:taskTypeTicker,}log.Printf("Added ticker task: %s (every %v)",name,interval)}returnname,nil}// AddTaskWithName 添加匿名任务,通过函数动态获取调度策略。// specOrInterval 返回 (cronSpec, interval),规则:// - 若 cronSpec != "" → 使用 cron(忽略 interval)// - 否则若 interval > 0 → 使用 ticker// - 否则返回错误funcAddTaskWithoutName(specOrIntervalfunc()(specstring,interval time.Duration),fnfunc())(string,error){name:="anonymous:"+uuid.New().String()returnAddTaskWithName(name,specOrInterval,fn)}// UpdateTask 更新任务调度策略,支持在 cron 和 ticker 之间切换。// newSpecOrInterval 应返回新的 (cronSpec, interval)。// 规则:// - 若 cronSpec != "" → 使用 cron(忽略 interval)// - 否则若 interval > 0 → 使用 ticker// - 否则返回错误//// 要求任务必须已存在。funcUpdateTask(namestring,newSpecOrIntervalfunc()(newSpecstring,newInterval time.Duration))error{ifname==""{returnfmt.Errorf("task name cannot be empty")}ifnewSpecOrInterval==nil{returnfmt.Errorf("newSpecOrInterval function cannot be nil")}newSpec,newInterval:=newSpecOrInterval()ifnewSpec==""&&newInterval<=0{returnfmt.Errorf("newSpecOrInterval returned invalid spec (%q) and interval (%v): at least one must be valid",newSpec,newInterval)}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()oldTask,exists:=cronTaskMap[name]if!exists{returnfmt.Errorf("task not found: %s",name)}ifoldTask.fn==nil{returnfmt.Errorf("task function is nil for %s",name)}// 安全停止旧任务removeTask(oldTask)ifnewSpec!=""{// 切换为 cronwrappedFn:=func(){deferutils.RecoverPanic()oldTask.fn()}entryID,e:=c.AddFunc(newSpec,wrappedFn)ife!=nil{delete(cronTaskMap,name)// 防止残留无效条目returnfmt.Errorf("failed to parse new cron spec %q: %w",newSpec,e)}cronTaskMap[name]=&cronTask{spec:newSpec,fn:oldTask.fn,id:entryID,taskType:taskTypeCron,}log.Printf("Updated task %s to cron (spec: %s)",name,newSpec)}elseifnewInterval>0{// 切换为 tickerstopChan:=make(chanstruct{})ticker:=time.NewTicker(newInterval)gofunc(){deferfunc(){ifr:=recover();r!=nil{log.Printf("Recovered panic in updated ticker task %s: %v",name,r)}ticker.Stop()}()for{select{case<-ticker.C:func(){deferutils.RecoverPanic()oldTask.fn()}()case<-stopChan:return}}}()cronTaskMap[name]=&cronTask{spec:fmt.Sprintf("[TICKER:%v]",newInterval),fn:oldTask.fn,ticker:ticker,stopChan:stopChan,taskType:taskTypeTicker,}log.Printf("Updated task %s to ticker (interval: %v)",name,newInterval)}returnnil}// RemoveTaskByName 删除任务(幂等:任务不存在也返回 true)funcRemoveTaskByName(namestring)bool{ifname==""{returntrue}cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()iftask,ok:=cronTaskMap[name];ok{removeTask(task)delete(cronTaskMap,name)log.Printf("Removed task: %s",name)}returntrue}// Exists 检查任务是否存在funcExists(namestring)bool{cronTaskMapMutex.RLock()defercronTaskMapMutex.RUnlock()_,ok:=cronTaskMap[name]returnok}// GetAllTaskNames 获取所有任务名funcGetAllTaskNames()[]string{cronTaskMapMutex.RLock()defercronTaskMapMutex.RUnlock()names:=make([]string,0,len(cronTaskMap))forname:=rangecronTaskMap{names=append(names,name)}returnnames}// Stop 停止整个调度器funcStop(){log.Println("Stopping cron scheduler...")c.Stop()// 停止 cron 调度器(不再触发新任务)cronTaskMapMutex.Lock()defercronTaskMapMutex.Unlock()// 复用 removeTask 清理所有任务资源for_,task:=rangecronTaskMap{removeTask(task)}log.Println("All scheduled tasks stopped")}// (内部使用)安全移除一个任务funcremoveTask(task*cronTask){switchtask.taskType{casetaskTypeCron:c.Remove(task.id)casetaskTypeTicker:iftask.ticker!=nil{task.ticker.Stop()}iftask.stopChan!=nil{close(task.stopChan)// 唯一关闭点task.stopChan=nil// 防止重复 close(虽已加锁,但更安全)}}}

2、panic_tookit.go

packageutilsimport("log""runtime/debug")// 捕获 PanicfuncRecoverPanic(){ifr:=recover();r!=nil{log.Printf("panic recovered: %v\n%s",r,debug.Stack())// 后期这里还可以 SendMetrics、SendAlert、Sentry.Capture等指标}}// 在捕获 Panic 下运行某个方法funcRunWithRecoverPanic[T any](data T,fnfunc(T)){deferRecoverPanic()// 增加一层防火墙fn(data)}// 在捕获 Panic 下运行某个方法(无入参)funcRunWithRecoverPanic2(fnfunc()){deferRecoverPanic()// 增加一层防火墙fn()}// 在捕获 Panic 下 异步运行(goroutine) 某个方法funcAsyncRunWithRecoverPanic[T any](data T,fnfunc(T)){gofunc(d T){deferRecoverPanic()// 增加一层防火墙fn(d)}(data)}// 在捕获 Panic 下 异步运行(goroutine) 某个方法(无入参)funcAsyncRunWithRecoverPanic2(fnfunc()){gofunc(){deferRecoverPanic()// 增加一层防火墙fn()}()}

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询