global_wait_group.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package common
  2. /**
  3. import中必须有git.ikuban.com/server/kratos-utils/common
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. "git.ikuban.com/server/kratos-utils/common"
  9. )
  10. func main() {
  11. test()
  12. go test()
  13. common.GlobalWaitGroup.Stop()
  14. }
  15. func test() {
  16. ok := common.GlobalWaitGroup.Add("test", 1)
  17. if !ok {
  18. return
  19. }
  20. common.GlobalWaitGroup.SetTimeout(time.Now().Add(12 * time.Second))
  21. go func() {
  22. fmt.Println("任务开始")
  23. time.Sleep(time.Second * 11)
  24. fmt.Println("任务完成")
  25. common.GlobalWaitGroup.Done("test")
  26. }()
  27. }
  28. */
  29. import (
  30. "fmt"
  31. "sync"
  32. "sync/atomic"
  33. "time"
  34. )
  35. var GlobalWaitGroup *globalWaitGroup
  36. func init() {
  37. GlobalWaitGroup = NewGlobalWaitGroup()
  38. }
  // globalWaitGroup wraps a sync.WaitGroup with a stop flag, an optional
  // deadline and per-task-name bookkeeping.
  type globalWaitGroup struct {
  	// num is the counter incremented by Add and decremented by Done through
  	// the sync/atomic functions. It must stay the first field: on 32-bit
  	// platforms only the first word of an allocated struct is guaranteed to
  	// be 64-bit aligned, and a misaligned 64-bit atomic operation panics.
  	num int64

  	// wg is the underlying wait group that Wait blocks on.
  	wg sync.WaitGroup

  	// timeout is the deadline used by Wait, as a Unix timestamp in seconds.
  	timeout int64

  	// isStop is set by Stop; once true, Add rejects new tasks.
  	isStop bool

  	// lock guards timeout.
  	lock sync.Mutex

  	// ok receives a value once wg.Wait has returned.
  	ok chan bool

  	// taskMap maps a task name to its int64 count of running instances.
  	taskMap sync.Map

  	// taskLock is a mutex for the task bookkeeping; presumably it is meant
  	// to serialize read-modify-write updates of taskMap (verify usage).
  	taskLock sync.Mutex
  }
  49. func NewGlobalWaitGroup() *globalWaitGroup {
  50. this := &globalWaitGroup{
  51. wg: sync.WaitGroup{},
  52. isStop: false,
  53. ok: make(chan bool, 1),
  54. }
  55. return this
  56. }
  57. func (this *globalWaitGroup) Stop() {
  58. this.isStop = true
  59. this.Wait()
  60. }
  61. func (this *globalWaitGroup) Wait() {
  62. go func() {
  63. this.wait()
  64. }()
  65. diff := this.timeout - time.Now().Unix()
  66. if diff <= 0 {
  67. diff = 10
  68. }
  69. select {
  70. case <-this.ok:
  71. fmt.Println("全局waitgroup,无任务结束")
  72. return
  73. case <-time.After(time.Second * time.Duration(diff)):
  74. text := fmt.Sprintf("全局waitgroup,超时结束,正在执行任务数:%d个,", this.num)
  75. if this.num > 0 {
  76. text = text + "["
  77. }
  78. this.taskMap.Range(func(key, value interface{}) bool {
  79. task := fmt.Sprintf("\"task_name:%s,num:%d\"", key, value.(int64))
  80. text = text + task + ","
  81. return true
  82. })
  83. text = text[:len(text)-1] + "]"
  84. fmt.Println(text)
  85. return
  86. }
  87. }
  88. func (this *globalWaitGroup) wait() {
  89. this.wg.Wait()
  90. this.ok <- true
  91. }
  92. func (this *globalWaitGroup) Done(taskName string) {
  93. this.wg.Done()
  94. atomic.AddInt64(&this.num, -1)
  95. num, ok := this.taskMap.Load(taskName)
  96. if !ok {
  97. return
  98. }
  99. if num.(int64) <= 1 {
  100. this.taskMap.Delete(taskName)
  101. }
  102. this.taskMap.Store(taskName, num.(int64)-1)
  103. }
  104. func (this *globalWaitGroup) SetTimeout(timeout time.Time) {
  105. if timeout.Unix() <= 0 {
  106. return
  107. }
  108. this.lock.Lock()
  109. if this.timeout < timeout.Unix() {
  110. this.timeout = timeout.Unix()
  111. }
  112. this.lock.Unlock()
  113. }
  114. func (this *globalWaitGroup) Add(taskName string, delta int) bool {
  115. if this.isStop {
  116. fmt.Println("已结束,不能在添加任务")
  117. return false
  118. }
  119. num, ok := this.taskMap.Load(taskName)
  120. if ok {
  121. this.taskMap.Store(taskName, num.(int64)+1)
  122. } else {
  123. this.taskMap.Store(taskName, int64(1))
  124. }
  125. this.wg.Add(delta)
  126. atomic.AddInt64(&this.num, 1)
  127. return true
  128. }