Worker Pool

The worker pool library abstracts the setup around creating worker pools, so all you need to take care of is the actual business logic.

How it works

The work is distributed evenly across N workers, which process jobs in parallel. Successful results are passed back through a success channel and errors through an error channel, and callbacks specify how successes and errors are handled. The worker logic itself is supplied as a function which processes a single job and returns either a result or an error.
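Under the hood this is the classic fan-out pattern. The snippet below sketches that pattern in plain Go purely as an illustration; the function name runPool and the int-typed jobs are hypothetical, it assumes only the standard sync package, and it is not the library's actual implementation.

// runPool is an illustration of the fan-out pattern only, not the library's internal code.
func runPool(workerCount int, jobs []int, doWork func(int) (int, error)) (successes chan int, errs chan error) {
    jobCh := make(chan int)
    successes = make(chan int, len(jobs))
    errs = make(chan error, len(jobs))

    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Each worker pulls from the shared job channel until it is closed,
            // so whichever worker is idle takes the next job and the load spreads evenly.
            for job := range jobCh {
                if result, err := doWork(job); err != nil {
                    errs <- err
                } else {
                    successes <- result
                }
            }
        }()
    }

    go func() {
        for _, j := range jobs {
            jobCh <- j
        }
        close(jobCh) // no more jobs; workers drain the channel and exit
        wg.Wait()
        close(successes) // all workers finished; close the result channels
        close(errs)
    }()

    return successes, errs
}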

Usage

The worker pool is defined as follows:

func NewWorkerPool(
	workerCount int,
	onSuccess SuccessFunc,
	onError ErrorFunc,
	doWork DoWorkFunc,
)
  • workerCount is the number of workers which will process jobs in parallel.
  • onSuccess is the callback which is called for each successful result.
  • onError is the callback which is called for each failed result.
  • doWork is the function which is called to process each job. This is the part which runs concurrently.
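The SuccessFunc, ErrorFunc, and DoWorkFunc types are referenced above but not defined in this README. Judging from the example that follows, they plausibly have the shapes sketched below; treat this as an inference and check the package's GoDoc for the authoritative definitions.

// Plausible callback type shapes, inferred from the usage example below;
// these are assumptions, not copied from the package source.
type (
    SuccessFunc func(result interface{})                              // receives each successful result
    ErrorFunc   func(err error)                                       // receives each error
    DoWorkFunc  func(job interface{}) (result interface{}, err error) // processes a single job
)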

Here is a short example:

ctx := context.Background()

successes := make([]int, 0)

workerPool := NewWorkerPool(
    3, //The number of workers which should work in parallel
    func(result interface{}) { //On success
        r := result.(int)
        successes = append(successes, r)
    },
    func(err error) { //On error
        log.Println(err.Error())
    },
    func(job interface{}) (result interface{}, err error) { //Do work
        j := job.(int)
        if j > 4 {
            return nil, fmt.Errorf("number too big: %d", j)
        }
        return j, nil
    })

//Do the work
if err := workerPool.Work(
    ctx,
    []int{1, 2, 3, 4, 5, 6, 7}, //The items to be processed
); err != nil {
    log.Println(err.Error())
}

Cancelling the jobs

Sometimes you may want to stop processing early, for example once enough results have been found. This can be done by cancelling the context passed into the worker pool:

ctx, cancel := context.WithCancel(context.Background())

successes := make([]int, 0)

workerPool := NewWorkerPool(
    3, //The number of workers which should work in parallel
    func(result interface{}) { //On success
        r := result.(int)
        successes = append(successes, r) 
        if len(successes) > 3 {
            cancel() //we have enough results
        }
    },
    func(err error) { //On error
        log.Println(err.Error())
    },
    func(job interface{}) (result interface{}, err error) { //Do work
        j := job.(int)
        if j > 4 {
            return nil, fmt.Errorf("number too big: %d", j)
        }
        return j, nil
    })

//Do the work
if err := workerPool.Work(ctx, []int{1, 2, 3, 4, 5, 6, 7}); err != nil {
    log.Println(err.Error())
}

In the code above, once more than three successes have been collected, the success callback cancels the context, signalling the worker pool to stop processing further jobs.
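Cancellation takes effect because each worker watches the context while pulling jobs. The loop below is only a sketch of that general pattern, assuming the standard context package; the function name workerLoop is hypothetical and this is not the library's internal code.

// workerLoop sketches the usual cancellation pattern: the worker selects on
// ctx.Done() while pulling jobs, so a cancelled context stops new jobs from
// being picked up. Illustration only.
func workerLoop(ctx context.Context, jobs <-chan interface{}, process func(interface{})) {
    for {
        select {
        case <-ctx.Done():
            return // context was cancelled; stop taking new jobs
        case job, ok := <-jobs:
            if !ok {
                return // job channel closed; nothing left to do
            }
            process(job)
        }
    }
}

In this pattern a job that is already running when cancel() is called still finishes normally; cancellation only prevents further jobs from being picked up.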
