Skip to content
This repository has been archived by the owner on Feb 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #50 from stealthrocket/remove-log-resource-type
Browse files Browse the repository at this point in the history
timecraft: remove log resource type
  • Loading branch information
achille-roussel committed Jun 2, 2023
2 parents 209cdfe + 7875a27 commit 9585fc6
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 180 deletions.
184 changes: 80 additions & 104 deletions describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"math"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
Expand Down Expand Up @@ -36,28 +35,27 @@ Usage: timecraft describe <resource type> <resource ids...> [options]
Examples:
$ timecraft describe log e1c6ae6e-caa8-45c1-bc9b-827617347063
ID: e1c6ae6e-caa8-45c1-bc9b-827617347063
Size: 1.62 KiB/3.88 KiB +64 B (compression: 58.27%)
Start: 3h ago, Mon, 29 May 2023 23:00:41 UTC
Records: 27 (1 batch)
---
SEGMENT RECORDS BATCHES SIZE UNCOMPRESSED SIZE COMPRESSED SIZE COMPRESSION RATIO
0 27 1 1.68 KiB 3.88 KiB 1.62 KiB 58.27%
$ timecraft describe runtime 79004e7beff2
ID: sha256:79004e7beff2db76e4fab5fdff04618bb7328829bcddb856462194994c8e46f9
Runtime: timecraft
Version: devel
Options:
-c, --config Path to the timecraft configuration file (overrides TIMECRAFTCONFIG)
-h, --help Show this usage information
-o, --output format Output format, one of: text, json, yaml
-v, --verbose For text output, display more details about the resource
`

func describe(ctx context.Context, args []string) error {
var (
output = outputFormat("text")
output = outputFormat("text")
verbose = false
)

flagSet := newFlagSet("timecraft describe", describeUsage)
customVar(flagSet, &output, "o", "output")
boolVar(flagSet, &verbose, "v", "verbose")

args, err := parseFlags(flagSet, args)
if err != nil {
Expand Down Expand Up @@ -85,6 +83,11 @@ func describe(ctx context.Context, args []string) error {
return err
}

format := "%v"
if verbose {
format = "%+v"
}

var lookup func(context.Context, *timemachine.Registry, string, *configuration) (any, error)
var writer stream.WriteCloser[any]
switch output {
Expand All @@ -96,7 +99,7 @@ func describe(ctx context.Context, args []string) error {
writer = yamlprint.NewWriter[any](os.Stdout)
default:
lookup = resource.describe
writer = textprint.NewWriter[any](os.Stdout)
writer = textprint.NewWriter[any](os.Stdout, textprint.Format[any](format))
}
defer writer.Close()

Expand Down Expand Up @@ -261,22 +264,11 @@ func describeProcess(ctx context.Context, reg *timemachine.Registry, id string,
}
}

segments := reg.ListLogSegments(ctx, processID)
defer segments.Close()

i := stream.Iter[format.LogSegment](segments)
for i.Next() {
v := i.Value()
desc.log = append(desc.log, logSegment{
Number: v.Number,
Size: human.Bytes(v.Size),
CreatedAt: human.Time(v.CreatedAt.In(time.Local)),
})
}
if err := i.Err(); err != nil {
log, err := describeLog(ctx, reg, processID)
if err != nil {
return nil, err
}

desc.log = log
return desc, nil
}

Expand Down Expand Up @@ -328,7 +320,19 @@ func lookupProcess(ctx context.Context, reg *timemachine.Registry, id string, co
if err != nil {
return nil, err
}
return descriptorAndData(desc, proc), nil
log, err := describeLog(ctx, reg, proc.ID)
if err != nil {
return nil, err
}
return &struct {
Desc *format.Descriptor `json:"descriptor" yaml:"descriptor"`
Data *format.Process `json:"data" yaml:"data"`
Segments []logSegment `json:"segments" yaml:"segments"`
}{
Desc: desc,
Data: proc,
Segments: log,
}, nil
}

func lookupProfile(ctx context.Context, reg *timemachine.Registry, id string, config *configuration) (any, error) {
Expand Down Expand Up @@ -397,8 +401,12 @@ func (desc *configDescriptor) Format(w fmt.State, _ rune) {
fmt.Fprintf(w, " %s\n", arg)
}
fmt.Fprintf(w, "Env:\n")
for _, env := range desc.env {
fmt.Fprintf(w, " %s\n", env)
if w.Flag('+') {
for _, env := range desc.env {
fmt.Fprintf(w, " %s\n", env)
}
} else {
fmt.Fprintf(w, " ...\n")
}
}

Expand Down Expand Up @@ -517,7 +525,7 @@ type processDescriptor struct {
modules []moduleDescriptor
args []string
env []string
log []logSegment
log logDescriptor
}

func (desc *processDescriptor) Format(w fmt.State, _ rune) {
Expand All @@ -533,12 +541,22 @@ func (desc *processDescriptor) Format(w fmt.State, _ rune) {
fmt.Fprintf(w, " %s\n", arg)
}
fmt.Fprintf(w, "Env:\n")
for _, env := range desc.env {
fmt.Fprintf(w, " %s\n", env)
if w.Flag('+') {
for _, env := range desc.env {
fmt.Fprintf(w, " %s\n", env)
}
} else {
fmt.Fprintf(w, " ...\n")
}
fmt.Fprintf(w, "Log:\n")
for _, log := range desc.log {
fmt.Fprintf(w, " segment %d: %s, created %s (%s)\n", log.Number, log.Size, log.CreatedAt, time.Time(log.CreatedAt).Format(time.RFC1123))
if desc.log != nil {
if w.Flag('+') {
desc.log.Format(w, 's')
for _, seg := range desc.log {
seg.Format(w, 'v')
}
} else {
desc.log.Format(w, 'v')
}
}
}

Expand Down Expand Up @@ -616,6 +634,7 @@ func moduleName(module *format.Descriptor) string {
}

type recordBatch struct {
SegmentNumber int `json:"-" yaml:"-" text:"SEGMENT"`
NumRecords int `json:"numRecords" yaml:"numRecords" text:"RECORDS"`
FirstOffset int64 `json:"firstOffset" yaml:"firstOffset" text:"FIRST OFFSET"`
FirstTimestamp human.Time `json:"firstTimestamp" yaml:"firstTimestamp" text:"-"`
Expand All @@ -628,50 +647,33 @@ type recordBatch struct {
}

type logSegment struct {
Number int `json:"number" yaml:"number" text:"SEGMENT"`
Number int `json:"-" yaml:"-" text:"SEGMENT"`
NumRecords int `json:"-" yaml:"-" text:"RECORDS"`
NumBatches int `json:"-" yaml:"-" text:"BATCHES"`
Duration human.Duration `json:"-" yaml:"-" text:"DURATION"`
CreatedAt human.Time `json:"createdAt" yaml:"createdAt" text:"-"`
Size human.Bytes `json:"size" yaml:"size" text:"SIZE"`
UncompressedSize human.Bytes `json:"-" yaml:"-" text:"UNCOMPRESSED SIZE"`
CompressedSize human.Bytes `json:"-" yaml:"-" text:"COMPRESSED SIZE"`
CompressionRatio human.Ratio `json:"-" yaml:"-" text:"COMPRESSION RATIO"`
CreatedAt human.Time `json:"createdAt" yaml:"createdAt" text:"-"`
RecordBatches []recordBatch `json:"recordBatches" yaml:"recordBatches" text:"-"`
}

func (desc *logSegment) Format(w fmt.State, _ rune) {
startTime := human.Time{}
if len(desc.RecordBatches) > 0 {
startTime = desc.RecordBatches[0].FirstTimestamp
}

fmt.Fprintf(w, "Segment: %d\n", desc.Number)
fmt.Fprintf(w, "Size: %s/%s +%s (compression: %s)\n", desc.CompressedSize, desc.UncompressedSize, desc.Size-desc.CompressedSize, desc.CompressionRatio)
fmt.Fprintf(w, "Start: %s, %s\n", startTime, time.Time(startTime).Format(time.RFC1123))
fmt.Fprintf(w, "Records: %d (%d batch)\n", desc.NumRecords, len(desc.RecordBatches))
fmt.Fprintf(w, "---\n")

table := textprint.NewTableWriter[recordBatch](w)
defer table.Close()

_, _ = table.Write(desc.RecordBatches)
}

type logDescriptor struct {
ProcessID format.UUID `json:"id" yaml:"id"`
Size human.Bytes `json:"size" yaml:"size"`
StartTime human.Time `json:"startTime" yaml:"startTime"`
Segments []logSegment `json:"segments" yaml:"segments"`
}
type logDescriptor []logSegment

func (desc *logDescriptor) Format(w fmt.State, _ rune) {
func (desc logDescriptor) Format(w fmt.State, v rune) {
uncompressedSize := human.Bytes(0)
compressedSize := human.Bytes(0)
metadataSize := human.Bytes(0)
numRecords := 0
numBatches := 0
for _, seg := range desc.Segments {
for _, seg := range desc {
uncompressedSize += seg.UncompressedSize
compressedSize += seg.CompressedSize
metadataSize += seg.Size - seg.CompressedSize
Expand All @@ -680,47 +682,33 @@ func (desc *logDescriptor) Format(w fmt.State, _ rune) {
}
compressionRatio := 1 - human.Ratio(compressedSize)/human.Ratio(uncompressedSize)

fmt.Fprintf(w, "ID: %s\n", desc.ProcessID)
fmt.Fprintf(w, "Size: %s/%s +%s (compression: %s)\n", compressedSize, uncompressedSize, metadataSize, compressionRatio)
fmt.Fprintf(w, "Start: %s, %s\n", desc.StartTime, time.Time(desc.StartTime).Format(time.RFC1123))
fmt.Fprintf(w, "Records: %d (%d batch)\n", numRecords, numBatches)
fmt.Fprintf(w, "---\n")

table := textprint.NewTableWriter[logSegment](w)
defer table.Close()

_, _ = table.Write(desc.Segments)
}

func describeLog(ctx context.Context, reg *timemachine.Registry, id string, config *configuration) (any, error) {
logSegmentNumber := -1
logID, logNumber, ok := strings.Cut(id, "/")
if ok {
n, err := strconv.Atoi(logNumber)
if err != nil {
return nil, errors.New(`malformed log id (suffix is not a valid segment number)`)
}
logSegmentNumber = n
}
fmt.Fprintf(w, "Records: %d, %d batch(es), %d segment(s), %s/%s +%s (compression: %s)\n---\n",
numRecords,
numBatches,
len(desc),
compressedSize,
uncompressedSize,
metadataSize,
compressionRatio)

processID, err := uuid.Parse(logID)
if err != nil {
return nil, errors.New(`malformed process id (not a UUID)`)
if v == 'v' {
table := textprint.NewTableWriter[logSegment](w)
defer table.Close()
_, _ = table.Write(desc)
}
}

func describeLog(ctx context.Context, reg *timemachine.Registry, processID format.UUID) (logDescriptor, error) {
m, err := reg.LookupLogManifest(ctx, processID)
if err != nil {
return nil, err
}

logch := make(chan logSegment, len(m.Segments))
segch := make(chan logSegment, len(m.Segments))
errch := make(chan error, len(m.Segments))
wait := 0

for _, seg := range m.Segments {
if logSegmentNumber >= 0 && seg.Number != logSegmentNumber {
continue
}
wait++
go func(seg format.LogSegment) {
r, err := reg.ReadLogSegment(ctx, processID, seg.Number)
Expand Down Expand Up @@ -751,7 +739,7 @@ func describeLog(ctx context.Context, reg *timemachine.Registry, id string, conf
logSegment.CompressionRatio = 1 - c/u
logSegment.NumBatches = len(logSegment.RecordBatches)
logSegment.Duration = human.Duration(lastTime.Sub(time.Time(logSegment.CreatedAt)))
logch <- logSegment
segch <- logSegment
}
break
}
Expand All @@ -772,6 +760,7 @@ func describeLog(ctx context.Context, reg *timemachine.Registry, id string, conf
logSegment.CompressedSize += compressedSize
logSegment.UncompressedSize += uncompressedSize
logSegment.RecordBatches = append(logSegment.RecordBatches, recordBatch{
SegmentNumber: seg.Number,
NumRecords: numRecords,
FirstOffset: firstOffset,
FirstTimestamp: human.Time(firstTimestamp),
Expand All @@ -786,13 +775,13 @@ func describeLog(ctx context.Context, reg *timemachine.Registry, id string, conf
}(seg)
}

var logs []logSegment
var segs []logSegment
var errs []error
for wait > 0 {
wait--
select {
case log := <-logch:
logs = append(logs, log)
case seg := <-segch:
segs = append(segs, seg)
case err := <-errch:
errs = append(errs, err)
}
Expand All @@ -801,22 +790,9 @@ func describeLog(ctx context.Context, reg *timemachine.Registry, id string, conf
return nil, errors.Join(errs...)
}

if logSegmentNumber >= 0 {
return &logs[0], nil
}

slices.SortFunc(logs, func(s1, s2 logSegment) bool {
slices.SortFunc(segs, func(s1, s2 logSegment) bool {
return s1.Number < s2.Number
})

desc := &logDescriptor{
ProcessID: processID,
StartTime: human.Time(m.StartTime),
Segments: logs,
}

for _, log := range logs {
desc.Size += log.Size
}
return desc, nil
return logDescriptor(segs), nil
}
Loading

0 comments on commit 9585fc6

Please sign in to comment.