|
|
@@ -38,6 +38,7 @@ func test() {
|
|
|
import (
|
|
|
"fmt"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -48,11 +49,14 @@ func init() {
|
|
|
}
|
|
|
|
|
|
type globalWaitGroup struct {
|
|
|
- wg sync.WaitGroup
|
|
|
- timeout int64 //超时时间, 时间戳
|
|
|
- isStop bool
|
|
|
- lock sync.Mutex
|
|
|
- ok chan bool
|
|
|
+ wg sync.WaitGroup
|
|
|
+ timeout int64 //超时时间, 时间戳
|
|
|
+ isStop bool
|
|
|
+ lock sync.Mutex
|
|
|
+ ok chan bool
|
|
|
+ num int64
|
|
|
+ taskMap sync.Map
|
|
|
+ taskLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
func NewGlobalWaitGroup() *globalWaitGroup {
|
|
|
@@ -80,9 +84,20 @@ func (this *globalWaitGroup) Wait() {
|
|
|
}
|
|
|
select {
|
|
|
case <-this.ok:
|
|
|
+ fmt.Println("全局waitgroup,无任务结束")
|
|
|
return
|
|
|
case <-time.After(time.Second * time.Duration(diff)):
|
|
|
- fmt.Println("全局waitgroup,超时结束")
|
|
|
+ text := fmt.Sprintf("全局waitgroup,超时结束,正在执行任务数:%d个,", this.num)
|
|
|
+ if this.num > 0 {
|
|
|
+ text = text + "["
|
|
|
+ }
|
|
|
+ this.taskMap.Range(func(key, value interface{}) bool {
|
|
|
+ task := fmt.Sprintf("\"task_name:%s,num:%d\"", key, value.(int64))
|
|
|
+ text = text + task + ","
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ text = text[:len(text)-1] + "]"
|
|
|
+ fmt.Println(text)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
@@ -92,8 +107,17 @@ func (this *globalWaitGroup) wait() {
|
|
|
this.ok <- true
|
|
|
}
|
|
|
|
|
|
-func (this *globalWaitGroup) Done() {
|
|
|
+func (this *globalWaitGroup) Done(taskName string) {
|
|
|
this.wg.Done()
|
|
|
+ atomic.AddInt64(&this.num, -1)
|
|
|
+ num, ok := this.taskMap.Load(taskName)
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if num.(int64) <= 1 {
|
|
|
+ this.taskMap.Delete(taskName)
|
|
|
+ }
|
|
|
+ this.taskMap.Store(taskName, num.(int64)-1)
|
|
|
}
|
|
|
|
|
|
func (this *globalWaitGroup) SetTimeout(timeout time.Time) {
|
|
|
@@ -107,11 +131,18 @@ func (this *globalWaitGroup) SetTimeout(timeout time.Time) {
|
|
|
this.lock.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (this *globalWaitGroup) Add(delta int) bool {
|
|
|
+func (this *globalWaitGroup) Add(taskName string, delta int) bool {
|
|
|
if this.isStop {
|
|
|
fmt.Println("已结束,不能在添加任务")
|
|
|
return false
|
|
|
}
|
|
|
+ num, ok := this.taskMap.Load(taskName)
|
|
|
+ if ok {
|
|
|
+ this.taskMap.Store(taskName, num.(int64)+1)
|
|
|
+ } else {
|
|
|
+ this.taskMap.Store(taskName, int64(1))
|
|
|
+ }
|
|
|
this.wg.Add(delta)
|
|
|
+ atomic.AddInt64(&this.num, 1)
|
|
|
return true
|
|
|
}
|