Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atproto/lexicon: package for working with lexicon schemas, and runtime data validation #420

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
36 changes: 36 additions & 0 deletions atproto/data/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ func parseMap(obj map[string]any) (any, error) {
return nil, fmt.Errorf("$type field must contain a non-empty string")
}
}
// legacy blob type
if len(obj) == 2 {
if _, ok := obj["mimeType"]; ok {
if _, ok := obj["cid"]; ok {
b, err := parseLegacyBlob(obj)
if err != nil {
return nil, err
}
return *b, nil
}
}
}
out := make(map[string]any, len(obj))
for k, val := range obj {
if len(k) > MAX_OBJECT_KEY_LEN {
Expand Down Expand Up @@ -213,6 +225,30 @@ func parseBlob(obj map[string]any) (*Blob, error) {
}, nil
}

func parseLegacyBlob(obj map[string]any) (*Blob, error) {
if len(obj) != 2 {
return nil, fmt.Errorf("legacy blobs expected to have 2 fields")
}
var err error
mimeType, ok := obj["mimeType"].(string)
if !ok {
return nil, fmt.Errorf("blob 'mimeType' missing or not a string")
}
cidStr, ok := obj["cid"]
if !ok {
return nil, fmt.Errorf("blob 'cid' missing")
}
c, err := cid.Parse(cidStr)
if err != nil {
return nil, fmt.Errorf("invalid CID: %w", err)
}
return &Blob{
Size: -1,
MimeType: mimeType,
Ref: CIDLink(c),
}, nil
}

func parseObject(obj map[string]any) (map[string]any, error) {
out, err := parseMap(obj)
if err != nil {
Expand Down
112 changes: 112 additions & 0 deletions atproto/lexicon/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package lexicon

import (
"encoding/json"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
)

// An aggregation of lexicon schemas, and methods for validating generic data against those schemas.
type Catalog interface {
Resolve(ref string) (*Schema, error)
}

type BaseCatalog struct {
schemas map[string]Schema
}

func NewBaseCatalog() BaseCatalog {
return BaseCatalog{
schemas: make(map[string]Schema),
}
}

func (c *BaseCatalog) Resolve(ref string) (*Schema, error) {
if ref == "" {
return nil, fmt.Errorf("tried to resolve empty string name")
}
// default to #main if name doesn't have a fragment
if !strings.Contains(ref, "#") {
ref = ref + "#main"
}
s, ok := c.schemas[ref]
if !ok {
return nil, fmt.Errorf("schema not found in catalog: %s", ref)
}
return &s, nil
}

func (c *BaseCatalog) AddSchemaFile(sf SchemaFile) error {
base := sf.ID
for frag, def := range sf.Defs {
if len(frag) == 0 || strings.Contains(frag, "#") || strings.Contains(frag, ".") {
// TODO: more validation here?
return fmt.Errorf("schema name invalid: %s", frag)
}
name := base + "#" + frag
if _, ok := c.schemas[name]; ok {
return fmt.Errorf("catalog already contained a schema with name: %s", name)
}
// "A file can have at most one definition with one of the "primary" types. Primary types should always have the name main. It is possible for main to describe a non-primary type."
switch s := def.Inner.(type) {
case SchemaRecord, SchemaQuery, SchemaProcedure, SchemaSubscription:
if frag != "main" {
return fmt.Errorf("record, query, procedure, and subscription types must be 'main', not: %s", frag)
}
case SchemaToken:
// add fully-qualified name to token
s.fullName = name
def.Inner = s
}
def.SetBase(base)
if err := def.CheckSchema(); err != nil {
return err
}
s := Schema{
ID: name,
Revision: sf.Revision,
Def: def.Inner,
}
c.schemas[name] = s
}
return nil
}

func (c *BaseCatalog) LoadDirectory(dirPath string) error {
return filepath.WalkDir(dirPath, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if !strings.HasSuffix(p, ".json") {
return nil
}
// TODO: logging
fmt.Println(p)
f, err := os.Open(p)
if err != nil {
return err
}
defer func() { _ = f.Close() }()

b, err := io.ReadAll(f)
if err != nil {
return err
}

var sf SchemaFile
if err = json.Unmarshal(b, &sf); err != nil {
return err
}
if err = c.AddSchemaFile(sf); err != nil {
return err
}
return nil
})
}
90 changes: 90 additions & 0 deletions atproto/lexicon/cmd/lextool/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log/slog"
"os"

"github.com/bluesky-social/indigo/atproto/lexicon"

"github.com/urfave/cli/v2"
)

func main() {
app := cli.App{
Name: "lex-tool",
Usage: "informal debugging CLI tool for atproto lexicons",
}
app.Commands = []*cli.Command{
&cli.Command{
Name: "parse-schema",
Usage: "parse an individual lexicon schema file (JSON)",
Action: runParseSchema,
},
&cli.Command{
Name: "load-directory",
Usage: "try recursively loading all the schemas from a directory",
Action: runLoadDirectory,
},
&cli.Command{
Name: "validate-record",
Usage: "fetch from network, validate against catalog",
Action: runValidateRecord,
},
&cli.Command{
Name: "validate-firehose",
Usage: "subscribe to a firehose, validate every known record against catalog",
Action: runValidateFirehose,
},
}
h := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})
slog.SetDefault(slog.New(h))
app.RunAndExitOnError()
}

func runParseSchema(cctx *cli.Context) error {
p := cctx.Args().First()
if p == "" {
return fmt.Errorf("need to provide path to a schema file as an argument")
}

f, err := os.Open(p)
if err != nil {
return err
}
defer func() { _ = f.Close() }()

b, err := io.ReadAll(f)
if err != nil {
return err
}

var sf lexicon.SchemaFile
if err := json.Unmarshal(b, &sf); err != nil {
return err
}
out, err := json.MarshalIndent(sf, "", " ")
if err != nil {
return err
}
fmt.Println(string(out))
return nil
}

func runLoadDirectory(cctx *cli.Context) error {
p := cctx.Args().First()
if p == "" {
return fmt.Errorf("need to provide directory path as an argument")
}

c := lexicon.NewBaseCatalog()
err := c.LoadDirectory(p)
if err != nil {
return err
}

fmt.Println("success!")
return nil
}
95 changes: 95 additions & 0 deletions atproto/lexicon/cmd/lextool/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"

"github.com/bluesky-social/indigo/atproto/data"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/lexicon"
"github.com/bluesky-social/indigo/atproto/syntax"

"github.com/urfave/cli/v2"
)

func runValidateRecord(cctx *cli.Context) error {
ctx := context.Background()
args := cctx.Args().Slice()
if len(args) != 2 {
return fmt.Errorf("expected two args (catalog path and AT-URI)")
}
p := args[0]
if p == "" {
return fmt.Errorf("need to provide directory path as an argument")
}

cat := lexicon.NewBaseCatalog()
err := cat.LoadDirectory(p)
if err != nil {
return err
}

aturi, err := syntax.ParseATURI(args[1])
if err != nil {
return err
}
if aturi.RecordKey() == "" {
return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi)
}
dir := identity.DefaultDirectory()
ident, err := dir.Lookup(ctx, aturi.Authority())
if err != nil {
return fmt.Errorf("resolving AT-URI authority: %v", err)
}
pdsURL := ident.PDSEndpoint()
if pdsURL == "" {
return fmt.Errorf("could not resolve PDS endpoint for AT-URI account: %s", ident.DID.String())
}

slog.Info("fetching record", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
url := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
pdsURL, ident.DID, aturi.Collection(), aturi.RecordKey())
resp, err := http.Get(url)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fetch failed")
}
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

body, err := data.UnmarshalJSON(respBytes)
record, ok := body["value"].(map[string]any)
if !ok {
return fmt.Errorf("fetched record was not an object")
}

slog.Info("validating", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String())
err = lexicon.ValidateRecordLenient(&cat, record, aturi.Collection().String())
if err != nil {
return err
}
fmt.Println("success!")
return nil
}

func runValidateFirehose(cctx *cli.Context) error {
p := cctx.Args().First()
if p == "" {
return fmt.Errorf("need to provide directory path as an argument")
}

cat := lexicon.NewBaseCatalog()
err := cat.LoadDirectory(p)
if err != nil {
return err
}

return fmt.Errorf("UNIMPLEMENTED")
}
4 changes: 4 additions & 0 deletions atproto/lexicon/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/*
Package atproto/lexicon provides generic Lexicon schema parsing and run-time validation.
*/
package lexicon
20 changes: 20 additions & 0 deletions atproto/lexicon/extract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package lexicon

import (
"encoding/json"
)

// Helper type for extracting record type from JSON
type genericSchemaDef struct {
Type string `json:"type"`
}

// Parses the top-level $type field from generic atproto JSON data
func ExtractTypeJSON(b []byte) (string, error) {
var gsd genericSchemaDef
if err := json.Unmarshal(b, &gsd); err != nil {
return "", err
}

return gsd.Type, nil
}
Loading
Loading