Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors.parser): Add base64 decode for fields #15328

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## The name of the fields whose value will be base64-decoded and then parsed.
## These fields do not need to be (and must not be) specified in parse_fields:
## a field listed in both options is skipped and an error is logged.
# parse_fields_base64 = []

## The name of the tags whose value will be parsed.
# parse_tags = []

Expand Down
75 changes: 48 additions & 27 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package parser
import (
"bytes"
_ "embed"
"encoding/base64"
gobin "encoding/binary"
"fmt"
"slices"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -19,6 +21,7 @@ type Parser struct {
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Base64Fields []string `toml:"parse_fields_base64"`
ParseTags []string `toml:"parse_tags"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
Expand Down Expand Up @@ -53,39 +56,57 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
}

// parse fields
for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key != key {
continue
}
value, err := p.toBytes(field.Value)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", key, err)
continue
}
fromFieldMetric, err := p.parser.Parse(value)
for _, field := range metric.FieldList() {
plain := slices.Contains(p.ParseFields, field.Key)
b64 := slices.Contains(p.Base64Fields, field.Key)

if !plain && !b64 {
continue
}

if plain && b64 {
p.Log.Errorf("field %s is listed in both parse fields and base64 fields; skipping", field.Key)
continue
}

value, err := p.toBytes(field.Value)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", field.Key, err)
continue
}

if b64 {
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
n, err := base64.StdEncoding.Decode(decoded, value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
p.Log.Errorf("could not decode base64 field %s: %v; skipping", field.Key, err)
continue
}
value = decoded[:n]
}

for _, m := range fromFieldMetric {
// The parser gets the parent plugin's name as
// the default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", field.Key, err)
continue
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
for _, m := range fromFieldMetric {
// The parser gets the parent plugin's name as
// the default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}

// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
}

// parse tags
Expand Down
130 changes: 130 additions & 0 deletions plugins/processors/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestApply(t *testing.T) {
name string
parseFields []string
parseTags []string
parseBase64 []string
parser telegraf.Parser
dropOriginal bool
merge string
Expand Down Expand Up @@ -708,6 +709,103 @@ func TestApply(t *testing.T) {
time.Unix(1593287020, 0)),
},
},
{
name: "test base 64 field single",
parseBase64: []string{"sample"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{
"text",
},
},
input: metric.New(
"singleField",
map[string]string{
"some": "tag",
},
map[string]interface{}{
"sample": `eyJ0ZXh0IjogInRlc3QgYmFzZTY0In0=`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleField",
map[string]string{
"text": "test base64",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
{
name: "parse two base64 fields",
parseBase64: []string{"field_1", "field_2"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
map[string]string{},
map[string]interface{}{
"field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`,
"field_2": `eyJlcnIiOiJmYXRhbCIsImZhdGFsIjoic2VjdXJpdHkgdGhyZWF0In0=`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"bigMeasure",
map[string]string{
"lvl": "info",
"msg": "http request",
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"bigMeasure",
map[string]string{
"err": "fatal",
"fatal": "security threat",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
{
name: "parse two fields, one base64",
parseFields: []string{"field_2"},
parseBase64: []string{"field_1"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
map[string]string{},
map[string]interface{}{
"field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`,
"field_2": `{"err":"fatal","fatal":"security threat"}`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"bigMeasure",
map[string]string{
"lvl": "info",
"msg": "http request",
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"bigMeasure",
map[string]string{
"err": "fatal",
"fatal": "security threat",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
}

for _, tt := range tests {
Expand All @@ -718,6 +816,7 @@ func TestApply(t *testing.T) {
plugin := Parser{
ParseFields: tt.parseFields,
ParseTags: tt.parseTags,
Base64Fields: tt.parseBase64,
DropOriginal: tt.dropOriginal,
Merge: tt.merge,
Log: testutil.Logger{Name: "processor.parser"},
Expand Down Expand Up @@ -812,6 +911,37 @@ func TestBadApply(t *testing.T) {
}
}

// TestBase64FieldValidation exercises the interaction between ParseFields and
// Base64Fields. With disjoint lists, applying the processor must leave the
// capture logger without errors; once the same field is named in both lists,
// applying it must leave at least one error in the logger.
func TestBase64FieldValidation(t *testing.T) {
	const encoded = `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`

	input := metric.New(
		"test",
		map[string]string{},
		map[string]interface{}{"b": encoded},
		time.Unix(0, 0),
	)

	// One logger is shared by both runs, so the first assertion has to
	// happen before the second configuration is applied.
	logger := &testutil.CaptureLogger{}

	// apply builds a processor that always base64-decodes field "b" and
	// plain-parses the given fields, then runs it on the input metric.
	apply := func(parseFields []string) {
		plugin := &Parser{
			ParseFields:  parseFields,
			Base64Fields: []string{"b"},
			Log:          logger,
		}
		plugin.SetParser(&json.Parser{})
		require.NoError(t, plugin.Init())
		plugin.Apply(input)
	}

	// "a" and "b" do not overlap: no error expected.
	apply([]string{"a"})
	require.Empty(t, logger.Errors())

	// "b" is listed in both options: an error must be logged.
	apply([]string{"b"})
	require.NotEmpty(t, logger.Errors())
}

func TestTracking(t *testing.T) {
var testCases = []struct {
name string
Expand Down
5 changes: 5 additions & 0 deletions plugins/processors/parser/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
## The name of the fields whose value will be parsed.
parse_fields = ["message"]

## The name of the fields whose value will be base64-decoded and then parsed.
## These fields do not need to be (and must not be) specified in parse_fields:
## a field listed in both options is skipped and an error is logged.
# parse_fields_base64 = []

## The name of the tags whose value will be parsed.
# parse_tags = []

Expand Down