-
Notifications
You must be signed in to change notification settings - Fork 13
/
join.go
28 lines (23 loc) · 654 Bytes
/
join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package pipeline
import "context"
// join is the Processor[A, C] returned by Join. It holds two processors whose
// type parameters line up so that the B produced by a can be passed to b.
type join[A, B, C any] struct {
// a is the first stage; its Process takes an A and yields a B.
a Processor[A, B]
// b is the second stage; its Process takes a B and yields a C.
b Processor[B, C]
}
// Process runs the input through the first processor and passes that
// processor's output to the second, returning the second processor's result.
//
// When a stage returns an error, that same stage's Cancel is called with the
// input it was given and the error, and Process returns the zero value of C
// together with the error unchanged. The second stage is never run if the
// first one fails.
func (j *join[A, B, C]) Process(ctx context.Context, a A) (C, error) {
	var none C

	// First stage: A -> B.
	mid, err := j.a.Process(ctx, a)
	if err != nil {
		j.a.Cancel(a, err)
		return none, err
	}

	// Second stage: B -> C, fed with the first stage's output.
	out, err := j.b.Process(ctx, mid)
	if err != nil {
		j.b.Cancel(mid, err)
		return none, err
	}

	return out, nil
}
// Cancel is a no-op: both arguments are ignored and neither inner processor
// is notified through this method. Process itself calls Cancel on the inner
// processor that returned an error.
// NOTE(review): presumably that is why nothing is forwarded here (to avoid
// cancelling the same stage twice) — confirm against the callers of Cancel.
func (j *join[A, B, C]) Cancel(_ A, _ error) {}
// Join chains two processors into a single Processor[A, C]. Each input is
// processed by a, and the value a produces is then processed by b, so the
// output type of a must match the input type of b.
func Join[A, B, C any](a Processor[A, B], b Processor[B, C]) Processor[A, C] {
	return &join[A, B, C]{
		a: a,
		b: b,
	}
}