Use a for loop and a ticker
// someTask is the unit of work that gets invoked periodically.
// It prints the product of two pseudo-random integers.
func someTask() {
	a, b := rand.Int(), rand.Int()
	fmt.Println(a * b)
}
5
6// PeriodicTask runs someTask every 1 second.
7// If canceled goroutine should be stopped.
8func PeriodicTask(ctx context.Context) {
9 // Create a new ticker with a period of 1 second.
10 ticker := time.NewTicker(time.Second)
11 for {
12 select {
13 case <-ticker.C:
14 someTask()
15 case <-ctx.Done():
16 fmt.Println("stopping PeriodicTask")
17 ticker.Stop()
18 return
19 }
20 }
21}
22
23func main() {
24 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
25 defer cancel()
26
27 go PeriodicTask(ctx)
28
29 // Create a channel to receive signals from the operating system.
30 sigCh := make(chan os.Signal, 1)
31 signal.Notify(sigCh, syscall.SIGTERM)
32
33 // The code blocks until a signal is received (e.g. Ctrl+C).
34 <-sigCh
35}
Use errgroup to do this
// errFailure is a package-level sentinel error; secondTask returns it.
var errFailure = errors.New("some error")
3
4func main() {
5 // Create errgroup with context.
6 group, qctx := errgroup.WithContext(context.Background())
7
8 // Run first periodic task.
9 group.Go(func() error {
10 firstTask(qctx)
11 return nil
12 })
13
14 // Run second task.
15 group.Go(func() error {
16 if err := secondTask(); err != nil {
17 return err
18 }
19 return nil
20 })
21
22 // Wait for all tasks to complete or the error to appear.
23 if err := group.Wait(); err != nil {
24 fmt.Printf("errgroup tasks ended up with an error: %v", err)
25 }
26}
27
// firstTask prints "some task" every 500 milliseconds. It returns when ctx is
// done, or right after the print at which the counter has exceeded 10
// (twelve prints in total).
func firstTask(ctx context.Context) {
	// A single ticker serves the whole loop; calling time.After inside the
	// select would allocate a fresh timer on every iteration.
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	var counter int
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("some task")
			if counter > 10 {
				return
			}
			counter++
		}
	}
}
43
44func secondTask() error {
45 time.Sleep(3 * time.Second)
46 return errFailure
47}
Limit concurrent goroutines to a batch size
1 jobWorkerBuffer := make(chan struct{}, batchSize)
2 var wg sync.WaitGroup
3 for _, it := range jobs.Items {
4 wg.Add(1) // add one job to waiting
5 jobWorkerBuffer <- struct{}{} // waiting when buffer is full
6 task := it // before enter the go routines to save don't use in routines
7 go func() {
8 defer wg.Done() // complete a job when goroute is exited
9 task.Do()
10 <-restartBuffer // release a placeholder when task is completed
11 }()
12 }
13 wg.Wait() // waiting complete of all
Goroutine pool
1var payLoad = make(chan sources.TaskPayload, 10) // first level cache to every receive 10 task to work
2var pool = tools.NewPool(20, PutTask) // second level cache to every 20 go routines to concurrency do it
3
4for v := range payLoad {
5 pool.Run(v)
6}
7pool.Wait()
8
9// // Create a new pool
10// pool := NewPool(10, func(data interface{}) {
11// fmt.Println(data)
12// })
13
14// // Add data to the pool
15// for i := 0; i < 100; i++ {
16// pool.Run(i)
17// }
18
19// // Wait for all goroutines to finish
20// pool.Wait()
// Pool bounds how many goroutines may run the function f at the same time.
type Pool struct {
	// MaxGoroutines is the upper limit of goroutines running concurrently.
	MaxGoroutines int

	// stack holds one token per goroutine that is currently running.
	stack chan struct{}

	// f is the function executed for each item handed to the pool.
	f func(interface{}) error
}
32
33// NewPool creates a new Pool
34func NewPool(maxGoroutines int, f func(interface{}) error) *Pool {
35 return &Pool{
36 MaxGoroutines: maxGoroutines,
37 stack: make(chan struct{}, maxGoroutines),
38 f: f,
39 }
40}
41
42// Run starts the goroutine pool
43func (p *Pool) Run(data interface{}) {
44 // Add one goroutine to the stack
45 p.stack <- struct{}{}
46
47 // Execute the function
48 go func() {
49 p.f(data)
50
51 // Remove one goroutine from the stack
52 <-p.stack
53 }()
54}
55
56// Wait waits for all goroutines to finish
57func (p *Pool) Wait() {
58 for i := 0; i < p.MaxGoroutines; i++ {
59 p.stack <- struct{}{}
60 }
61 log.Info().Msg("Waiting for pool completed")
62}
Run goroutines in parallel and receive their results
// main starts two producers that each send a message on their own channel at
// different intervals, plus one consumer that prints whichever message
// arrives. All three goroutines loop forever, so main blocks forever too.
func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	// produce sends msg on out, pauses, and repeats indefinitely.
	produce := func(out chan<- string, msg string, pause time.Duration) {
		for {
			out <- msg
			time.Sleep(pause)
		}
	}
	go produce(ch1, "from ch1", 2*time.Second)
	go produce(ch2, "from ch2", 3*time.Second)

	// Consumer: print messages as they arrive on either channel.
	go func() {
		for {
			var msg string
			select {
			case msg = <-ch1:
			case msg = <-ch2:
			}
			fmt.Println(msg)
		}
	}()

	select {} // block forever to keep the process alive
}
Testing with a deadline
1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8)
9
// main demonstrates a context deadline firing before a slower operation.
//
// The context expires 3 seconds after start while the simulated operation
// needs 4 seconds, so the ctx.Done() branch is taken and the context error is
// logged. A timer calls cancel and sends on ch after 5 seconds; main waits
// for that send before exiting.
func main() {
	ch := make(chan int)

	// Derive a context from the root context with a deadline 3 seconds from
	// now. WithDeadline returns the new context and its cancel function.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))

	// After 5 seconds, cancel the context and unblock main.
	go time.AfterFunc(5*time.Second, func() {
		cancel()
		ch <- 1
	})

	select {
	case <-ctx.Done():
		// The 3 second deadline passes before the 4 second operation completes.
		log.Println("has error", ctx.Err().Error())
	case <-time.After(4 * time.Second):
		// The simulated operation takes 4 seconds to respond.
		fmt.Println("Hello")
	}
	<-ch
}
Reference