Skip to content

cilium/workerpool

Repository files navigation

Workerpool

Go Reference CI Go Report Card

Package workerpool implements a concurrency-limiting worker pool. Worker routines are spawned on demand as tasks are submitted, up to the configured limit of concurrent workers.

When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional, as it prevents tasks from accumulating without bound. It is therefore the caller's responsibility to queue up tasks if that is the intended behavior.

One caveat is that while the number of concurrently running workers is limited, the number of task results is not: results accumulate until they are collected. Therefore, if a large number of tasks is expected, the workerpool should be drained periodically (e.g. every 10k tasks).

This package is mostly useful when tasks are CPU bound and spawning too many routines would be detrimental to performance. It features a straightforward API and no external dependencies. See the section below for a usage example.

Example

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime reports whether n is prime.
//
// Values below 2, including all negative numbers, are never prime. Primality
// is decided by trial division: 2 is handled up front, then only odd
// candidates up to the square root of n are tried.
func IsPrime(n int64) bool {
	if n < 2 {
		return false
	}
	if n%2 == 0 {
		// 2 is the only even prime.
		return n == 2
	}
	// Bound the loop with p <= n/p instead of p*p <= n. For n close to
	// math.MaxInt64 the product p*p exceeds the int64 range and silently
	// wraps around, so the square-root bound would no longer be enforced.
	// The quotient n/p can never overflow.
	for p := int64(3); p <= n/p; p += 2 {
		if n%p == 0 {
			return false
		}
	}
	return true
}

// main checks 100 consecutive integers, starting at 10^18, for primality using
// a worker pool sized to the number of CPUs, then drains and closes the pool.
func main() {
	const (
		first = int64(1_000_000_000_000_000_000) // first candidate to test
		count = 100                              // number of consecutive candidates
	)

	pool := workerpool.New(runtime.NumCPU())

	for idx := 0; idx < count; idx++ {
		// candidate is declared inside the loop body, so each closure below
		// captures its own copy.
		candidate := first + int64(idx)
		check := func(_ context.Context) error {
			fmt.Println("isprime", candidate)
			if IsPrime(candidate) {
				fmt.Println(candidate, "is prime!")
			}
			return nil
		}

		// Submit hands the task to the pool and blocks while no worker is
		// available to pick it up. It fails when the pool is closed
		// (ErrClosed) or being drained (ErrDrained).
		if err := pool.Submit(fmt.Sprintf("task #%d", idx), check); err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain stops new submissions and blocks until every submitted task has
	// completed, returning the finished tasks.
	results, err := pool.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Walk the results to report any task that returned a non-nil error.
	for _, res := range results {
		taskErr := res.Err()
		if taskErr == nil {
			continue
		}
		fmt.Println("task", res, "failed:", taskErr)
	}

	// The pool is no longer needed at this point, so close it.
	closeErr := pool.Close()
	if closeErr != nil {
		fmt.Fprintln(os.Stderr, closeErr)
	}
}