package common /** import中必须有git.ikuban.com/server/kratos-utils/common import ( "context" "fmt" "time" "git.ikuban.com/server/kratos-utils/common" ) func main() { test() go test() common.GlobalWaitGroup.Stop() } func test() { ok := common.GlobalWaitGroup.Add(1) if !ok { return } common.GlobalWaitGroup.SetTimeout(time.Now().Unix() + 12) go func() { fmt.Println("任务开始") time.Sleep(time.Second * 11) fmt.Println("任务完成") common.GlobalWaitGroup.Done() }() } */ import ( "fmt" "sync" "sync/atomic" "time" ) var GlobalWaitGroup *globalWaitGroup func init() { GlobalWaitGroup = NewGlobalWaitGroup() } type globalWaitGroup struct { wg sync.WaitGroup timeout int64 //超时时间, 时间戳 isStop bool lock sync.Mutex ok chan bool num int64 taskMap sync.Map taskLock sync.Mutex } func NewGlobalWaitGroup() *globalWaitGroup { this := &globalWaitGroup{ wg: sync.WaitGroup{}, isStop: false, ok: make(chan bool, 1), } return this } func (this *globalWaitGroup) Stop() { this.isStop = true this.Wait() } func (this *globalWaitGroup) Wait() { go func() { this.wait() }() diff := this.timeout - time.Now().Unix() if diff <= 0 { diff = 10 } select { case <-this.ok: fmt.Println("全局waitgroup,无任务结束") return case <-time.After(time.Second * time.Duration(diff)): text := fmt.Sprintf("全局waitgroup,超时结束,正在执行任务数:%d个,", this.num) if this.num > 0 { text = text + "[" } this.taskMap.Range(func(key, value interface{}) bool { task := fmt.Sprintf("\"task_name:%s,num:%d\"", key, value.(int64)) text = text + task + "," return true }) text = text[:len(text)-1] + "]" fmt.Println(text) return } } func (this *globalWaitGroup) wait() { this.wg.Wait() this.ok <- true } func (this *globalWaitGroup) Done(taskName string) { this.wg.Done() atomic.AddInt64(&this.num, -1) num, ok := this.taskMap.Load(taskName) if !ok { return } if num.(int64) <= 1 { this.taskMap.Delete(taskName) } this.taskMap.Store(taskName, num.(int64)-1) } func (this *globalWaitGroup) SetTimeout(timeout time.Time) { if timeout.Unix() <= 0 { return } this.lock.Lock() if this.timeout < timeout.Unix() { this.timeout = timeout.Unix() } this.lock.Unlock() } func (this *globalWaitGroup) Add(taskName string, delta int) bool { if this.isStop { fmt.Println("已结束,不能在添加任务") return false } num, ok := this.taskMap.Load(taskName) if ok { this.taskMap.Store(taskName, num.(int64)+1) } else { this.taskMap.Store(taskName, int64(1)) } this.wg.Add(delta) atomic.AddInt64(&this.num, 1) return true }