-
Notifications
You must be signed in to change notification settings - Fork 1
/
pooler.go
147 lines (130 loc) · 3.5 KB
/
pooler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2017, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
package main
import (
"fmt"
"strings"
"sync"
"time"
)
// poolMonitor periodically checks the health of a single ZFS pool on a
// given target and records the most recent observation.
type poolMonitor struct {
	zs     *zsyncer   // owning syncer; Run uses its ctx, log and smtp fields
	pool   string     // pool name (first component of a dataset name)
	target execTarget // target on which zpool commands are executed

	signal chan struct{} // a value received here triggers a poll
	timer  *time.Timer   // fires when the next scheduled poll is due

	statusMu sync.Mutex // guards status
	status   poolStatus
}
// poolStatus is a snapshot of a pool's health as last observed.
type poolStatus struct {
	// State encodes health together with a confidence level:
	//	-2: unhealthy
	//	-1: maybe unhealthy (a later poll failed after an unhealthy result)
	//	 0: unknown
	//	+1: maybe healthy (a later poll failed after a healthy result)
	//	+2: healthy
	State int
}
// RegisterPoolMonitors ensures a pool monitor exists for the pool of src and
// for the pool of every dataset in dsts. Pools that already have a monitor
// (keyed by PoolPath) are skipped. src is handled first, then dsts in order.
func (zs *zsyncer) RegisterPoolMonitors(src dataset, dsts []dataset) {
	// Build a fresh slice so the caller's dsts backing array is never touched.
	all := append([]dataset{src}, dsts...)
	for _, ds := range all {
		if _, exists := zs.poolMonitors[ds.PoolPath()]; exists {
			continue
		}
		zs.registerPoolMonitor(ds)
	}
}
// registerPoolMonitor creates a monitor for the pool that contains ds and
// stores it in zs.poolMonitors under the pool's PoolPath. Registering the
// same pool twice is reported through zs.log.Fatalf.
//
// The monitor's timer is created with a zero duration, so it is already due
// the first time Run selects on it.
func (zs *zsyncer) registerPoolMonitor(ds dataset) {
	// The pool is the first '/'-separated component of the dataset name.
	poolName := strings.SplitN(ds.name, "/", 2)[0]
	key := dataset{name: poolName, target: ds.target}.PoolPath()

	monitor := &poolMonitor{
		zs:     zs,
		pool:   poolName,
		target: ds.target,
		signal: make(chan struct{}, 1),
		timer:  time.NewTimer(0),
	}

	if _, dup := zs.poolMonitors[key]; dup {
		zs.log.Fatalf("%s already registered", key)
	}
	zs.poolMonitors[key] = monitor
}
// Status returns a copy of the most recently recorded pool status.
// The read is guarded by statusMu, which is the lock Run holds while
// updating the status.
func (pm *poolMonitor) Status() poolStatus {
	pm.statusMu.Lock()
	snapshot := pm.status
	pm.statusMu.Unlock()
	return snapshot
}
// Run polls the pool's health until pm.zs.ctx is done.
//
// A poll happens whenever pm.timer fires or a value arrives on pm.signal.
// Each poll runs "zpool status -P -x <pool>" on pm.target. A pool counts as
// healthy when the first output line contains "is healthy".
//
// Scheduling:
//   - After a successful poll, the next one is scheduled 10 minutes later.
//   - After a failed poll, the delay comes from timeoutAfter(previous delay).
//     A certain state (±2) is downgraded to its "maybe" form (±1).
//
// Transitions:
//   - Reaching a state from unknown or the opposite side is logged.
//   - Leaving a previously unhealthy (<0) or healthy (>0) state also
//     triggers sendEmail.
func (pm *poolMonitor) Run() {
	// The pool id never changes for this monitor; compute it once.
	id := dataset{name: pm.pool, target: pm.target}.PoolPath()

	// Cache the executor for efficiency purposes since the pool monitor
	// checks for the status relatively frequently.
	var exec *executor
	tryCloseExec := func() {
		if exec != nil {
			exec.Close()
			exec = nil
		}
	}
	defer tryCloseExec()
	defer pm.timer.Stop()

	var retryDelay time.Duration
	for {
		select {
		case <-pm.signal:
			// The timer may have fired without its value being received.
			// Stop it and drain the channel so the Reset below operates on a
			// stopped timer and no stale tick triggers a redundant poll.
			if !pm.timer.Stop() {
				select {
				case <-pm.timer.C:
				default:
				}
			}
		case <-pm.timer.C:
		case <-pm.zs.ctx.Done():
			return
		}

		func() {
			defer recoverError(func(err error) {
				// The poll failed, so the last observation may be stale:
				// downgrade certain states to their "maybe" counterparts.
				pm.statusMu.Lock()
				switch pm.status.State {
				case -2:
					pm.status.State = -1 // maybe unhealthy
				case +2:
					pm.status.State = +1 // maybe healthy
				}
				pm.statusMu.Unlock()
				pm.zs.log.Printf("pool %s: unexpected error: %v", id, err)
				retryDelay = timeoutAfter(retryDelay)
				pm.timer.Reset(retryDelay)
				tryCloseExec() // drop the cached executor; the next poll reopens it
			})

			// Query for the pool status.
			if exec == nil {
				var err error
				exec, err = openExecutor(pm.zs.ctx, pm.target)
				checkError(err)
			}
			out, err := exec.Exec("zpool", "status", "-P", "-x", pm.pool)
			checkError(err) // Unhealthy pools are not an exec error.

			// Only the first line of output is checked for the health marker.
			healthy := strings.Contains(strings.Split(out, "\n")[0], "is healthy")

			// Record the new state under the lock, but log and send email only
			// after releasing it. Slow SMTP delivery then never blocks Status
			// callers, and a panic during notification cannot deadlock the
			// recover handler above, which also takes statusMu.
			pm.statusMu.Lock()
			prev := pm.status.State
			if healthy {
				pm.status.State = +2
			} else {
				pm.status.State = -2
			}
			pm.statusMu.Unlock()

			switch {
			case healthy && prev <= 0:
				if prev < 0 {
					if err := sendEmail(pm.zs.smtp, fmt.Sprintf("Pool %q became healthy", id), ""); err != nil {
						pm.zs.log.Printf("unable to send email: %v", err)
					}
				}
				pm.zs.log.Printf("pool %q is healthy", id)
			case !healthy && prev >= 0:
				if prev > 0 {
					if err := sendEmail(pm.zs.smtp, fmt.Sprintf("Pool %q became unhealthy", id), "<pre>"+out+"</pre>"); err != nil {
						pm.zs.log.Printf("unable to send email: %v", err)
					}
				}
				pm.zs.log.Printf("pool %q is unhealthy\n%s", id, indentLines(out))
			}

			retryDelay = 0
			pm.timer.Reset(10 * time.Minute)
		}()
	}
}