Skip to content

Commit

Permalink
feat(inputs.gnmi): Add yang-model decoding of JSON IETF payloads (#15201
Browse files Browse the repository at this point in the history
)
  • Loading branch information
srebhan committed May 23, 2024
1 parent 12ab6df commit 4aa288b
Show file tree
Hide file tree
Showing 23 changed files with 1,386 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ following works:
- github.com/olivere/elastic [MIT License](https://github.com/olivere/elastic/blob/release-branch.v7/LICENSE)
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil [Apache License 2.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/LICENSE)
- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE)
- github.com/openconfig/goyang [Apache License 2.0](https://github.com/openconfig/goyang/blob/master/LICENSE)
- github.com/opencontainers/go-digest [Apache License 2.0](https://github.com/opencontainers/go-digest/blob/master/LICENSE)
- github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE)
- github.com/opensearch-project/opensearch-go [Apache License 2.0](https://github.com/opensearch-project/opensearch-go/blob/main/LICENSE.txt)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ require (
github.com/nwaples/tacplus v0.0.3
github.com/olivere/elastic v6.2.37+incompatible
github.com/openconfig/gnmi v0.10.0
github.com/openconfig/goyang v1.0.0
github.com/opensearch-project/opensearch-go/v2 v2.3.0
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/openzipkin-contrib/zipkin-go-opentracing v0.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,7 @@ github.com/openconfig/gnmi v0.10.0 h1:kQEZ/9ek3Vp2Y5IVuV2L/ba8/77TgjdXg505QXvYmg
github.com/openconfig/gnmi v0.10.0/go.mod h1:Y9os75GmSkhHw2wX8sMsxfI7qRGAEcDh8NTa5a8vj6E=
github.com/openconfig/goyang v0.0.0-20200115183954-d0a48929f0ea/go.mod h1:dhXaV0JgHJzdrHi2l+w0fZrwArtXL7jEFoiqLEdmkvU=
github.com/openconfig/goyang v0.2.2/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/goyang v1.0.0 h1:nYaFu7BOAk/eQn4CgAUjgYPfp3J6CdXrBryp32E5CjI=
github.com/openconfig/goyang v1.0.0/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/gribi v0.1.1-0.20210423184541-ce37eb4ba92f/go.mod h1:OoH46A2kV42cIXGyviYmAlGmn6cHjGduyC2+I9d/iVs=
github.com/openconfig/grpctunnel v0.0.0-20210610163803-fde4a9dc048d/go.mod h1:x9tAZ4EwqCQ0jI8D6S8Yhw9Z0ee7/BxWQX0k0Uib5Q8=
Expand Down
263 changes: 263 additions & 0 deletions plugins/common/yangmodel/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package yangmodel

import (
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"

"github.com/openconfig/goyang/pkg/yang"
)

var (
ErrInsufficientData = errors.New("insufficient data")
ErrNotFound = errors.New("no such node")
)

type Decoder struct {
modules map[string]*yang.Module
rootNodes map[string][]yang.Node
}

func NewDecoder(paths ...string) (*Decoder, error) {
modules := yang.NewModules()
modules.ParseOptions.IgnoreSubmoduleCircularDependencies = true

var moduleFiles []string
modulePaths := paths
unresolved := paths
for {
var newlyfound []string
for _, path := range unresolved {
entries, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("reading directory %q failed: %w", path, err)
}
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
fmt.Printf("Couldn't get info for %q: %v", entry.Name(), err)
continue
}

if info.Mode()&os.ModeSymlink != 0 {
target, err := filepath.EvalSymlinks(entry.Name())
if err != nil {
fmt.Printf("Couldn't evaluate symbolic links for %q: %v", entry.Name(), err)
continue
}
info, err = os.Lstat(target)
if err != nil {
fmt.Printf("Couldn't stat target %v: %v", target, err)
continue
}
}

newPath := filepath.Join(path, info.Name())
if info.IsDir() {
newlyfound = append(newlyfound, newPath)
continue
}
if info.Mode().IsRegular() && filepath.Ext(info.Name()) == ".yang" {
moduleFiles = append(moduleFiles, info.Name())
}
}
}
if len(newlyfound) == 0 {
break
}

modulePaths = append(modulePaths, newlyfound...)
unresolved = newlyfound
}

// Add the module paths
modules.AddPath(modulePaths...)
for _, fn := range moduleFiles {
if err := modules.Read(fn); err != nil {
fmt.Printf("reading file %q failed: %v\n", fn, err)
}
}
if errs := modules.Process(); len(errs) > 0 {
return nil, errors.Join(errs...)
}

// Get all root nodes defined in models with their origin. We require
// those nodes to later resolve paths to YANG model leaf nodes...
moduleLUT := make(map[string]*yang.Module)
moduleRootNodes := make(map[string][]yang.Node)
for _, m := range modules.Modules {
// Check if we processed the module already
if _, found := moduleLUT[m.Name]; found {
continue
}
// Create a module mapping for easily finding modules by name
moduleLUT[m.Name] = m

// Determine the origin defined in the module
var prefix string
for _, imp := range m.Import {
if imp.Name == "openconfig-extensions" {
prefix = imp.Name
if imp.Prefix != nil {
prefix = imp.Prefix.Name
}
break
}
}

var moduleOrigin string
if prefix != "" {
for _, e := range m.Extensions {
if e.Keyword == prefix+":origin" || e.Keyword == "origin" {
moduleOrigin = e.Argument
break
}
}
}
for _, u := range m.Uses {
root, err := yang.FindNode(m, u.Name)
if err != nil {
return nil, err
}
moduleRootNodes[moduleOrigin] = append(moduleRootNodes[moduleOrigin], root)
}
}

return &Decoder{modules: moduleLUT, rootNodes: moduleRootNodes}, nil
}

func (d *Decoder) FindLeaf(name, identifier string) (*yang.Leaf, error) {
// Get module name from the element
module, found := d.modules[name]
if !found {
return nil, fmt.Errorf("cannot find module %q", name)
}

for _, grp := range module.Grouping {
for _, leaf := range grp.Leaf {
if leaf.Name == identifier {
return leaf, nil
}
}
}
return nil, ErrNotFound
}

func DecodeLeafValue(leaf *yang.Leaf, value interface{}) (interface{}, error) {
schema := leaf.Type.YangType

// Ignore all non-string values as the types seem already converted...
s, ok := value.(string)
if !ok {
return value, nil
}

switch schema.Kind {
case yang.Ybinary:
// Binary values are encodes as base64 string, so decode the string
raw, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return value, err
}

switch schema.Name {
case "ieeefloat32":
if len(raw) != 4 {
return raw, fmt.Errorf("%w, expected 4 but got %d bytes", ErrInsufficientData, len(raw))
}
return math.Float32frombits(binary.BigEndian.Uint32(raw)), nil
default:
return raw, nil
}
case yang.Yint8:
v, err := strconv.ParseInt(s, 10, 8)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int8(v), nil
case yang.Yint16:
v, err := strconv.ParseInt(s, 10, 16)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int16(v), nil
case yang.Yint32:
v, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int32(v), nil
case yang.Yint64:
v, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
case yang.Yuint8:
v, err := strconv.ParseUint(s, 10, 8)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint8(v), nil
case yang.Yuint16:
v, err := strconv.ParseUint(s, 10, 16)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint16(v), nil
case yang.Yuint32:
v, err := strconv.ParseUint(s, 10, 32)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint32(v), nil
case yang.Yuint64:
v, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
case yang.Ydecimal64:
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
}
return value, nil
}

func (d *Decoder) DecodeLeafElement(namespace, identifier string, value interface{}) (interface{}, error) {
leaf, err := d.FindLeaf(namespace, identifier)
if err != nil {
return nil, fmt.Errorf("finding %s failed: %w", identifier, err)
}

return DecodeLeafValue(leaf, value)
}

func (d *Decoder) DecodePathElement(origin, path string, value interface{}) (interface{}, error) {
rootNodes, found := d.rootNodes[origin]
if !found || len(rootNodes) == 0 {
return value, nil
}

for _, root := range rootNodes {
node, _ := yang.FindNode(root, path)
if node == nil {
// The path does not exist in this root node
continue
}
// We do expect a leaf node...
if leaf, ok := node.(*yang.Leaf); ok {
return DecodeLeafValue(leaf, value)
}
}

return value, nil
}
5 changes: 5 additions & 0 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ details on how to use them.
## adds component, component_id & sub_component_id as additional tags
# vendor_specific = []

## YANG model paths for decoding IETF JSON payloads
## Model files are loaded recursively from the given directories. Disabled if
## no models are specified.
# yang_model_paths = []

## Define additional aliases to map encoding paths to measurement names
# [inputs.gnmi.aliases]
# ifcounters = "openconfig:/interfaces/interface/state/counters"
Expand Down
13 changes: 13 additions & 0 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand Down Expand Up @@ -62,11 +63,13 @@ type GNMI struct {
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
Log telegraf.Logger `toml:"-"`
internaltls.ClientConfig

// Internal state
internalAliases map[*pathInfo]string
decoder *yangmodel.Decoder
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -219,6 +222,15 @@ func (c *GNMI) Init() error {
return err
}

// Load the YANG models if specified by the user
if len(c.YangModelPaths) > 0 {
decoder, err := yangmodel.NewDecoder(c.YangModelPaths...)
if err != nil {
return fmt.Errorf("creating YANG model decoder failed: %w", err)
}
c.decoder = decoder
}

return nil
}

Expand Down Expand Up @@ -275,6 +287,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
trimSlash: c.TrimFieldNames,
tagPathPrefix: c.PrefixTagKeyWithPath,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
log: c.Log,
ClientParameters: keepalive.ClientParameters{
Time: time.Duration(c.KeepaliveTime),
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func TestCases(t *testing.T) {
require.Eventually(t,
func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 1*time.Second, 100*time.Millisecond)
}, 15*time.Second, 100*time.Millisecond)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
"github.com/influxdata/telegraf/selfstat"
)
Expand All @@ -44,6 +45,7 @@ type handler struct {
trimSlash bool
tagPathPrefix bool
guessPathStrategy string
decoder *yangmodel.Decoder
log telegraf.Logger
keepalive.ClientParameters
}
Expand Down Expand Up @@ -172,7 +174,11 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
var valueFields []updateField
for _, update := range response.Update.Update {
fullPath := prefix.append(update.Path)
fields, err := newFieldsFromUpdate(fullPath, update)
if update.Path.Origin != "" {
fullPath.origin = update.Path.Origin
}

fields, err := h.newFieldsFromUpdate(fullPath, update)
if err != nil {
h.log.Errorf("Processing update %v failed: %v", update, err)
}
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/gnmi/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ func (pi *pathInfo) String() string {
return out
}

func (pi *pathInfo) Path() (origin, path string) {
if len(pi.segments) == 0 {
return pi.origin, "/"
}

return pi.origin, "/" + strings.Join(pi.segments, "/")
}

func (pi *pathInfo) Tags(pathPrefix bool) map[string]string {
tags := make(map[string]string, len(pi.keyValues))
for _, s := range pi.keyValues {
Expand Down

0 comments on commit 4aa288b

Please sign in to comment.