| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 | package common/**import中必须有git.ikuban.com/server/kratos-utils/commonimport (	"context"	"fmt"	"time"	"git.ikuban.com/server/kratos-utils/common")func main() {	test()	go test()	common.GlobalWaitGroup.Stop()}func test() {	ok := common.GlobalWaitGroup.Add(1)	if !ok {		return	}	common.GlobalWaitGroup.SetTimeout(time.Now().Unix() + 12)	go func() {		fmt.Println("任务开始")		time.Sleep(time.Second * 11)		fmt.Println("任务完成")		common.GlobalWaitGroup.Done()	}()}*/import (	"fmt"	"sync"	"sync/atomic"	"time")var GlobalWaitGroup *globalWaitGroupfunc init() {	GlobalWaitGroup = NewGlobalWaitGroup()}type globalWaitGroup struct {	wg       sync.WaitGroup	timeout  int64 //超时时间, 时间戳	isStop   bool	lock     sync.Mutex	ok       chan bool	num      int64	taskMap  sync.Map	taskLock sync.Mutex}func NewGlobalWaitGroup() *globalWaitGroup {	this := &globalWaitGroup{		wg:     sync.WaitGroup{},		isStop: false,		ok:     make(chan bool, 1),	}	return this}func (this *globalWaitGroup) Stop() {	this.isStop = true	this.Wait()}func (this *globalWaitGroup) Wait() {	go func() {		this.wait()	}()	diff := this.timeout - time.Now().Unix()	if diff <= 0 {		diff = 10	}	select {	case <-this.ok:		fmt.Println("全局waitgroup,无任务结束")		return	case <-time.After(time.Second * time.Duration(diff)):		text := fmt.Sprintf("全局waitgroup,超时结束,正在执行任务数:%d个,", this.num)		if this.num > 0 {			text = text + "["		}		this.taskMap.Range(func(key, value interface{}) bool {			task := fmt.Sprintf("\"task_name:%s,num:%d\"", key, value.(int64))			text = text + task + ","			return true		})		text = text[:len(text)-1] + "]"		fmt.Println(text)		return	}}func (this *globalWaitGroup) wait() {	this.wg.Wait()	this.ok <- true}func (this *globalWaitGroup) Done(taskName string) {	this.wg.Done()	atomic.AddInt64(&this.num, -1)	num, ok := this.taskMap.Load(taskName)	if !ok {		return	}	if num.(int64) <= 1 {		this.taskMap.Delete(taskName)	}	this.taskMap.Store(taskName, num.(int64)-1)}func (this *globalWaitGroup) SetTimeout(timeout time.Time) {	if timeout.Unix() <= 0 {		return	}	this.lock.Lock()	if this.timeout < timeout.Unix() {		this.timeout = timeout.Unix()	}	this.lock.Unlock()}func (this *globalWaitGroup) Add(taskName string, delta int) bool {	if this.isStop {		fmt.Println("已结束,不能在添加任务")		return false	}	num, ok := this.taskMap.Load(taskName)	if ok {		this.taskMap.Store(taskName, num.(int64)+1)	} else {		this.taskMap.Store(taskName, int64(1))	}	this.wg.Add(delta)	atomic.AddInt64(&this.num, 1)	return true}
 |