global_wait_group.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package common
  2. /**
  3. import中必须有git.ikuban.com/server/kratos-utils/common
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. "git.ikuban.com/server/kratos-utils/common"
  9. )
  10. func main() {
  11. ctx := context.Background()
  12. common.GlobalWaitGroup.SetContext(ctx)
  13. test()
  14. go test()
  15. common.GlobalWaitGroup.Wait()
  16. }
  17. func test() {
  18. ok := common.GlobalWaitGroup.Add(1)
  19. if !ok {
  20. return
  21. }
  22. common.GlobalWaitGroup.SetTimeout(time.Now().Unix() + 12)
  23. go func() {
  24. fmt.Println("任务开始")
  25. time.Sleep(time.Second * 11)
  26. fmt.Println("任务完成")
  27. common.GlobalWaitGroup.Done()
  28. }()
  29. }
  30. */
  31. import (
  32. "context"
  33. "fmt"
  34. "sync"
  35. "time"
  36. )
  37. var GlobalWaitGroup *globalWaitGroup
  38. func init() {
  39. GlobalWaitGroup = NewGlobalWaitGroup(context.Background())
  40. }
  41. type globalWaitGroup struct {
  42. wg sync.WaitGroup
  43. ctx context.Context
  44. timeout int64 //超时时间, 时间戳
  45. isStop bool
  46. lock sync.Mutex
  47. ok chan bool
  48. }
  49. func NewGlobalWaitGroup(ctx context.Context) *globalWaitGroup {
  50. this := &globalWaitGroup{
  51. wg: sync.WaitGroup{},
  52. ctx: ctx,
  53. isStop: false,
  54. ok: make(chan bool, 1),
  55. }
  56. go func() {
  57. <-this.ctx.Done()
  58. this.isStop = true
  59. }()
  60. return this
  61. }
  62. func (this *globalWaitGroup) SetContext(ctx context.Context) {
  63. this.ctx, _ = context.WithCancel(ctx)
  64. }
  65. func (this *globalWaitGroup) Stop() {
  66. this.isStop = true
  67. this.Wait()
  68. }
  69. func (this *globalWaitGroup) Wait() {
  70. go func() {
  71. this.wait()
  72. }()
  73. diff := this.timeout - time.Now().Unix()
  74. if diff <= 0 {
  75. diff = 10
  76. }
  77. select {
  78. case <-this.ok:
  79. fmt.Println("没有任务而结束")
  80. return
  81. case <-time.After(time.Second * time.Duration(diff)):
  82. fmt.Println("超时结束")
  83. return
  84. }
  85. }
  86. func (this *globalWaitGroup) wait() {
  87. this.wg.Wait()
  88. this.ok <- true
  89. }
  90. func (this *globalWaitGroup) Done() {
  91. this.wg.Done()
  92. }
  93. func (this *globalWaitGroup) SetTimeout(timeout int64) {
  94. if timeout <= 0 {
  95. return
  96. }
  97. this.lock.Lock()
  98. if this.timeout < timeout {
  99. this.timeout = timeout
  100. }
  101. this.lock.Unlock()
  102. }
  103. func (this *globalWaitGroup) Add(delta int) bool {
  104. if this.isStop {
  105. fmt.Println("已结束,不能在添加任务")
  106. return false
  107. }
  108. this.wg.Add(delta)
  109. return true
  110. }