Routines

Routines is a fixed-size pool of worker routines for processing user tasks; when a worker panics, the pool respawns a new one to replace it. It supports a synchronous mode, which waits for the response, and an asynchronous mode, which does not wait for it. The most common use case is limiting the maximum concurrency when processing jobs.

Example 1: Process Tasks

For the full example, see: examples/counter

// incrProcessor is a stateless task processor: its Process method increments
// the counter it receives as the task payload.
type incrProcessor struct{}

// Process treats the task payload i as a *atomic.Int32 counter, increments it,
// and returns that same counter pointer as the result.
//
// ctx is accepted as part of the method signature but is not used here. A
// payload of any other type is reported as an error rather than triggering a
// type-assertion panic inside a pool routine.
func (p *incrProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
   counter, ok := i.(*atomic.Int32)
   if !ok {
      return nil, fmt.Errorf("incrProcessor: unexpected task type %T", i)
   }
   counter.Inc()
   return counter, nil
}

// main runs the synchronous example: ten goroutines each submit ten increment
// tasks to a pool of three routines, then the final counter value is printed.
func main() {
   // The pool spawns 3 routines to process jobs and respawns a new one
   // whenever a routine panics.
   pool, err := routines.NewPool(3, &incrProcessor{})
   if err != nil {
      panic(err)
   }
   // Start the routine pool, failing fast in the same way as for NewPool.
   if err := pool.Start(); err != nil {
      panic(err)
   }

   // Send the counter task to be processed in the pool's routines.
   var wg sync.WaitGroup
   v := atomic.NewInt32(0)
   wg.Add(10)
   for i := 0; i < 10; i++ {
      go func() {
         defer wg.Done()
         for j := 0; j < 10; j++ {
            // Report failures instead of dropping them; otherwise a total
            // below 100 would have no visible explanation.
            if _, err := pool.Process(context.Background(), v); err != nil {
               fmt.Println("process task:", err)
            }
         }
      }()
   }
   wg.Wait()

   fmt.Println("Total: ", v.Load())
   // Stop the routine pool and wait for all routines to exit.
   if err := pool.Stop(); err != nil {
      fmt.Println("stop pool:", err)
   }
   pool.Wait()
}

Example 2: Async Process Tasks

For the full example, see: examples/counter

// incrProcessor is a stateless task processor: its Process method increments
// the counter it receives as the task payload and returns the loaded value.
type incrProcessor struct{}

// Process treats the task payload i as a *atomic.Int32 counter, increments it,
// and returns the value loaded from the counter afterwards.
//
// ctx is accepted as part of the method signature but is not used here. A
// payload of any other type is reported as an error rather than triggering a
// type-assertion panic inside a pool routine.
func (p *incrProcessor) Process(ctx context.Context, i interface{}) (interface{}, error) {
   counter, ok := i.(*atomic.Int32)
   if !ok {
      return nil, fmt.Errorf("incrProcessor: unexpected task type %T", i)
   }
   counter.Inc()
   return counter.Load(), nil
}

// main runs the asynchronous example: ten increment tasks are submitted
// without waiting, then each returned future is resolved and printed before
// the final counter value.
func main() {
   // The pool spawns 3 routines to process jobs and respawns a new one
   // whenever a routine panics.
   pool, err := routines.NewPool(3, &incrProcessor{})
   if err != nil {
      panic(err)
   }
   // Start the routine pool, failing fast in the same way as for NewPool.
   if err := pool.Start(); err != nil {
      panic(err)
   }

   // Send the counter task to be processed in the pool's routines. Only
   // futures from successful submissions are kept, so the loop below never
   // calls Get on a nil future.
   futures := make([]*routines.Future, 0, 10)
   v := atomic.NewInt32(0)
   for i := 0; i < 10; i++ {
      future, err := pool.AsyncProcess(context.Background(), v)
      if err != nil {
         fmt.Println("submit task:", err)
         continue
      }
      futures = append(futures, future)
   }
   for _, future := range futures {
      val, err := future.Get(context.Background())
      fmt.Println(val, err)
   }

   fmt.Println("Total: ", v.Load())
   // Stop the routine pool and wait for all routines to exit.
   if err := pool.Stop(); err != nil {
      fmt.Println("stop pool:", err)
   }
   pool.Wait()
}

License

Routines is under the MIT license. See the LICENSE file for details.

GitHub

https://github.com/git-hulk/routines