Concurrency patterns in Go

Use a for loop and a ticker

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// someTask is the function that we call periodically.
func someTask() {
	fmt.Println(rand.Int() * rand.Int())
}

// PeriodicTask runs someTask every second.
// When the context is canceled, the goroutine stops.
func PeriodicTask(ctx context.Context) {
	// Create a new ticker with a period of one second.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			someTask()
		case <-ctx.Done():
			fmt.Println("stopping PeriodicTask")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	go PeriodicTask(ctx)

	// Create a channel to receive signals from the operating system.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// The code blocks until a signal is received (e.g. Ctrl+C sends SIGINT).
	<-sigCh
}
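
Since Go 1.16 the signal handling and the timeout can also be folded into a single context with signal.NotifyContext. A minimal sketch, reusing the imports and PeriodicTask from the example above (the run helper name is just for illustration):

// Sketch: derive a context that is canceled either by SIGINT/SIGTERM
// or by the 5-second timeout, then run PeriodicTask until it ends.
func run() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	PeriodicTask(ctx) // returns when the context is done
}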

Use errgroup to do this

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errFailure is some custom error.
var errFailure = errors.New("some error")

func main() {
	// Create an errgroup with a derived context.
	group, qctx := errgroup.WithContext(context.Background())

	// Run the first (periodic) task.
	group.Go(func() error {
		firstTask(qctx)
		return nil
	})

	// Run the second task.
	group.Go(func() error {
		if err := secondTask(); err != nil {
			return err
		}
		return nil
	})

	// Wait for all tasks to complete or for the first error to occur.
	if err := group.Wait(); err != nil {
		fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
	}
}

func firstTask(ctx context.Context) {
	var counter int
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(500 * time.Millisecond):
			fmt.Println("some task")
			if counter > 10 {
				return
			}
			counter++
		}
	}
}

func secondTask() error {
	time.Sleep(3 * time.Second)
	return errFailure
}
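
If the group should also cap how many tasks run at once, recent versions of golang.org/x/sync/errgroup provide SetLimit. A minimal sketch (the runBounded helper and the inline process function are just placeholders):

// Sketch: bound the errgroup to at most 3 concurrent goroutines.
func runBounded(items []int) error {
	process := func(i int) error { // placeholder for real work
		fmt.Println("processing", i)
		return nil
	}

	group := new(errgroup.Group)
	group.SetLimit(3) // group.Go blocks while 3 tasks are already running

	for _, it := range items {
		it := it // capture; not needed from Go 1.22 on
		group.Go(func() error {
			return process(it)
		})
	}
	return group.Wait()
}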

Use a batch size to limit goroutines

jobWorkerBuffer := make(chan struct{}, batchSize)
var wg sync.WaitGroup
for _, it := range jobs.Items {
	wg.Add(1)                     // add one job to wait for
	jobWorkerBuffer <- struct{}{} // blocks while the buffer is full
	task := it                    // capture the loop variable (not needed from Go 1.22 on)
	go func() {
		defer wg.Done()    // mark the job done when the goroutine exits
		task.Do()
		<-jobWorkerBuffer  // release a slot when the task is completed
	}()
}
wg.Wait() // wait for all jobs to complete
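
The same bounding can be expressed with golang.org/x/sync/semaphore, which also supports cancellation through a context. A minimal sketch, assuming the tasks are plain functions (imports "context" and "golang.org/x/sync/semaphore"):

// Sketch: limit concurrency to batchSize using a weighted semaphore.
func runBatch(ctx context.Context, tasks []func(), batchSize int) error {
	sem := semaphore.NewWeighted(int64(batchSize))
	for _, t := range tasks {
		t := t // capture; not needed from Go 1.22 on
		// Acquire blocks until a slot is free or the context is canceled.
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}
		go func() {
			defer sem.Release(1)
			t()
		}()
	}
	// Acquiring the full weight waits until every running goroutine has released its slot.
	return sem.Acquire(ctx, int64(batchSize))
}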

Goroutine pool

// Usage: payLoad is a buffered channel that queues up to 10 incoming tasks,
// and the pool runs at most 20 goroutines concurrently to process them.
var payLoad = make(chan sources.TaskPayload, 10)
var pool = tools.NewPool(20, PutTask)

for v := range payLoad {
	pool.Run(v)
}
pool.Wait()

// // Another usage example:
// pool := NewPool(10, func(data interface{}) error {
// 	fmt.Println(data)
// 	return nil
// })
//
// // Add data to the pool
// for i := 0; i < 100; i++ {
// 	pool.Run(i)
// }
//
// // Wait for all goroutines to finish
// pool.Wait()

// Pool represents a pool of goroutines.
type Pool struct {
	// Maximum number of goroutines allowed in the pool.
	MaxGoroutines int

	// Buffered channel used as a counting semaphore for running goroutines.
	stack chan struct{}

	// Function to execute for each piece of data.
	f func(interface{}) error
}

// NewPool creates a new Pool.
func NewPool(maxGoroutines int, f func(interface{}) error) *Pool {
	return &Pool{
		MaxGoroutines: maxGoroutines,
		stack:         make(chan struct{}, maxGoroutines),
		f:             f,
	}
}

// Run submits data to the pool, blocking while the pool is full.
func (p *Pool) Run(data interface{}) {
	// Take one slot from the semaphore.
	p.stack <- struct{}{}

	// Execute the function in its own goroutine.
	go func() {
		p.f(data) // the returned error is discarded here

		// Give the slot back when the function returns.
		<-p.stack
	}()
}

// Wait waits for all goroutines to finish by acquiring every slot.
func (p *Pool) Wait() {
	for i := 0; i < p.MaxGoroutines; i++ {
		p.stack <- struct{}{}
	}
	log.Info().Msg("Waiting for pool completed")
}
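
A common alternative is a fixed set of worker goroutines ranging over a jobs channel, so no goroutine is spawned per task. A minimal sketch using only the standard library (the workerPool helper and int job type are assumptions; imports "sync" and, for the usage, "fmt"):

// Sketch: a fixed-size worker pool draining a jobs channel.
func workerPool(numWorkers int, jobs <-chan int, handle func(int)) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // exits when jobs is closed and drained
				handle(j)
			}
		}()
	}
	wg.Wait()
}

// Usage:
//   jobs := make(chan int)
//   go func() {
//       for i := 0; i < 100; i++ { jobs <- i }
//       close(jobs)
//   }()
//   workerPool(5, jobs, func(i int) { fmt.Println(i) })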

Wait for all goroutines running in parallel and receive their results

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	// First producer: sends every 2 seconds.
	go func() {
		for {
			ch1 <- "from ch1"
			time.Sleep(2 * time.Second)
		}
	}()

	// Second producer: sends every 3 seconds.
	go func() {
		for {
			ch2 <- "from ch2"
			time.Sleep(3 * time.Second)
		}
	}()

	// Consumer: selects whichever channel has a message ready.
	go func() {
		for {
			select {
			case msg1 := <-ch1:
				fmt.Println(msg1)
			case msg2 := <-ch2:
				fmt.Println(msg2)
			}
		}
	}()

	select {} // keep the main function alive
}
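
The example above runs forever; when the goroutines produce a bounded number of results, a sync.WaitGroup plus a results channel lets the caller actually wait for completion and collect everything. A minimal sketch (the collect helper, worker count, and string result type are assumptions; imports "fmt" and "sync"):

// Sketch: fan out n workers, collect all results, and return when done.
func collect(n int) []string {
	results := make(chan string, n)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- fmt.Sprintf("result from worker %d", id)
		}(i)
	}

	// Close the results channel once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results { // ends when results is closed
		out = append(out, r)
	}
	return out
}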

Use a context with a deadline

package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

func main() {
	// Create the root context.
	ctx := context.Background()
	ch := make(chan int)
	// WithDeadline takes the parent context and an absolute time and
	// returns a derived context plus its cancel function. Here the
	// deadline is 3 seconds from now.
	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(3*time.Second))
	defer cancel()

	go func() {
		time.AfterFunc(5*time.Second, func() {
			cancel()
			ch <- 1
		})
	}()

	select {
	// The simulated work takes 4 seconds to respond...
	case <-time.After(4 * time.Second):
		fmt.Println("Hello")
	// ...but the context times out after 3 seconds, so this case wins
	// with "context deadline exceeded".
	case <-ctx.Done():
		log.Println("has error", ctx.Err().Error())
	}
	<-ch
}
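
context.WithTimeout is shorthand for the same thing: it computes the deadline from a duration relative to now. A minimal sketch of the equivalence:

// These two derived contexts behave the same way:
ctx1, cancel1 := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
defer cancel1()
ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel2()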

Reference