Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.execd): Add option to not restart program on error #15271

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 22 additions & 7 deletions internal/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Process struct {
ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader)
RestartDelay time.Duration
StopOnError bool
Log telegraf.Logger

name string
Expand All @@ -31,6 +32,8 @@ type Process struct {
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup

sync.Mutex
}

// New creates a new process wrapper
Expand Down Expand Up @@ -65,10 +68,10 @@ func (p *Process) Start() error {

p.mainLoopWg.Add(1)
go func() {
defer p.mainLoopWg.Done()
if err := p.cmdLoop(ctx); err != nil {
p.Log.Errorf("Process quit with message: %v", err)
}
p.mainLoopWg.Done()
}()

return nil
Expand All @@ -81,12 +84,24 @@ func (p *Process) Stop() {
p.cancel()
}
// close stdin so the app can shut down gracefully.
if err := p.Stdin.Close(); err != nil {
if err := p.Stdin.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
p.Log.Errorf("Stdin closed with message: %v", err)
}
p.mainLoopWg.Wait()
}

func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}

func (p *Process) State() (state *os.ProcessState, running bool) {
p.Lock()
defer p.Unlock()

return p.Cmd.ProcessState, p.Cmd.ProcessState.ExitCode() == -1
}

func (p *Process) cmdStart() error {
p.Cmd = exec.Command(p.name, p.args...)

Expand Down Expand Up @@ -119,15 +134,13 @@ func (p *Process) cmdStart() error {
return nil
}

func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}

// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
for {
err := p.cmdWait(ctx)
if err != nil && p.StopOnError {
return err
}
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
Expand Down Expand Up @@ -184,7 +197,9 @@ func (p *Process) cmdWait(ctx context.Context) error {
wg.Done()
}()

p.Lock()
err := p.Cmd.Wait()
p.Unlock()
processCancel()
wg.Wait()
return err
Expand Down
10 changes: 7 additions & 3 deletions plugins/inputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"

## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
# buffer_size = "64Kib"

## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero error code)
# stop_on_error = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"
```

## Example
Expand Down
8 changes: 5 additions & 3 deletions plugins/inputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ var sampleConfig string
type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
BufferSize config.Size `toml:"buffer_size"`
Signal string `toml:"signal"`
RestartDelay config.Duration `toml:"restart_delay"`
StopOnError bool `toml:"stop_on_error"`
Log telegraf.Logger `toml:"-"`
BufferSize config.Size `toml:"buffer_size"`

process *process.Process
acc telegraf.Accumulator
Expand Down Expand Up @@ -59,10 +60,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.outputReader
e.process.ReadStderrFn = e.cmdReadErr
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.StopOnError = e.StopOnError
e.process.Log = e.Log

if err = e.process.Start(); err != nil {
// if there was only one argument, and it contained spaces, warn the user
Expand Down
88 changes: 77 additions & 11 deletions plugins/inputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestExternalInputWorks(t *testing.T) {
require.NoError(t, err)

e := &Execd{
Command: []string{exe, "-counter"},
Command: []string{exe, "-mode", "counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
Signal: "STDIN",
Expand Down Expand Up @@ -158,6 +158,62 @@ test{handler="execd",quantile="0.5"} 42.0
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}

func TestStopOnError(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

plugin := &Execd{
Command: []string{exe, "-mode", "fail"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
StopOnError: true,
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}

parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

require.Eventually(t, func() bool {
_, running := plugin.process.State()
return !running
}, 3*time.Second, 100*time.Millisecond)

state, running := plugin.process.State()
require.False(t, running)
require.Equal(t, 42, state.ExitCode())
}

func TestStopOnErrorSuccess(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

plugin := &Execd{
Command: []string{exe, "-mode", "success"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
StopOnError: true,
RestartDelay: config.Duration(100 * time.Millisecond),
Log: testutil.Logger{},
}

parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
require.NoError(t, parser.Init())
plugin.SetParser(parser)

var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

// Wait for at least two metric as this indicates the process was restarted
require.Eventually(t, func() bool {
return acc.NMetrics() > 1
}, 3*time.Second, 100*time.Millisecond)
}

func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout)
defer to.Stop()
Expand Down Expand Up @@ -188,20 +244,32 @@ func (tm *TestMetricMaker) Log() telegraf.Logger {
return models.NewLogger("TestPlugin", "test", "")
}

var counter = flag.Bool("counter", false,
"if true, act like line input program instead of test")

func TestMain(m *testing.M) {
var mode string

flag.StringVar(&mode, "mode", "counter", "determines the output when run as mockup program")
flag.Parse()
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if *counter && runMode == "application" {

operationMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if operationMode != "application" {
// Run the normal test mode
os.Exit(m.Run())
}

// Run as a mock program
switch mode {
case "counter":
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
os.Exit(0)
case "fail":
os.Exit(42)
case "success":
fmt.Println("test value=42i")
os.Exit(0)
}
code := m.Run()
os.Exit(code)
os.Exit(23)
}

func runCounterProgram() error {
Expand All @@ -216,9 +284,7 @@ func runCounterProgram() error {
for scanner.Scan() {
m := metric.New(envMetricName,
map[string]string{},
map[string]interface{}{
"count": i,
},
map[string]interface{}{"count": i},
time.Now(),
)
i++
Expand Down
10 changes: 7 additions & 3 deletions plugins/inputs/execd/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"

## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
# buffer_size = "64Kib"

## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero error code)
# stop_on_error = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"