diff --git a/format/logsegment/logsegment.fbs b/format/logsegment/logsegment.fbs index 1e48874c..c624ec71 100644 --- a/format/logsegment/logsegment.fbs +++ b/format/logsegment/logsegment.fbs @@ -9,8 +9,11 @@ table RecordBatch { // Logical offset of the first record in this batch. first_offset:long; // Monotonic timestamp of the first record in this batch, relative to the process start - // time of the function invocation time (expressed in nanoseconds). + // time (expressed in nanoseconds). first_timestamp:long; + // Monotonic timestamp of the last record in this batch, relative to the process start + // time (expressed in nanoseconds). + last_timestamp:long; // Size of the compressed records following the record batch (in bytes). compressed_size:uint; // Uncompressed size of the records (in bytes). diff --git a/format/logsegment/logsegment_generated.go b/format/logsegment/logsegment_generated.go index 5d54a1f8..19a09a8d 100644 --- a/format/logsegment/logsegment_generated.go +++ b/format/logsegment/logsegment_generated.go @@ -59,8 +59,20 @@ func (rcv *RecordBatch) MutateFirstTimestamp(n int64) bool { return rcv._tab.MutateInt64Slot(6, n) } -func (rcv *RecordBatch) CompressedSize() uint32 { +func (rcv *RecordBatch) LastTimestamp() int64 { o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *RecordBatch) MutateLastTimestamp(n int64) bool { + return rcv._tab.MutateInt64Slot(8, n) +} + +func (rcv *RecordBatch) CompressedSize() uint32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.GetUint32(o + rcv._tab.Pos) } @@ -68,11 +80,11 @@ func (rcv *RecordBatch) CompressedSize() uint32 { } func (rcv *RecordBatch) MutateCompressedSize(n uint32) bool { - return rcv._tab.MutateUint32Slot(8, n) + return rcv._tab.MutateUint32Slot(10, n) } func (rcv *RecordBatch) UncompressedSize() uint32 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return rcv._tab.GetUint32(o + rcv._tab.Pos) } @@ -80,11 +92,11 @@ func (rcv *RecordBatch) UncompressedSize() uint32 { } func (rcv *RecordBatch) MutateUncompressedSize(n uint32) bool { - return rcv._tab.MutateUint32Slot(10, n) + return rcv._tab.MutateUint32Slot(12, n) } func (rcv *RecordBatch) NumRecords() uint32 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { return rcv._tab.GetUint32(o + rcv._tab.Pos) } @@ -92,11 +104,11 @@ func (rcv *RecordBatch) NumRecords() uint32 { } func (rcv *RecordBatch) MutateNumRecords(n uint32) bool { - return rcv._tab.MutateUint32Slot(12, n) + return rcv._tab.MutateUint32Slot(14, n) } func (rcv *RecordBatch) Checksum() uint32 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) if o != 0 { return rcv._tab.GetUint32(o + rcv._tab.Pos) } @@ -104,11 +116,11 @@ func (rcv *RecordBatch) Checksum() uint32 { } func (rcv *RecordBatch) MutateChecksum(n uint32) bool { - return rcv._tab.MutateUint32Slot(14, n) + return rcv._tab.MutateUint32Slot(16, n) } func (rcv *RecordBatch) Compression() types.Compression { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) if o != 0 { return types.Compression(rcv._tab.GetUint32(o + rcv._tab.Pos)) } @@ -116,11 +128,11 @@ func (rcv *RecordBatch) Compression() types.Compression { } func (rcv *RecordBatch) MutateCompression(n types.Compression) bool { - return rcv._tab.MutateUint32Slot(16, uint32(n)) + return rcv._tab.MutateUint32Slot(18, uint32(n)) } func RecordBatchStart(builder *flatbuffers.Builder) { - builder.StartObject(7) + builder.StartObject(8) } func RecordBatchAddFirstOffset(builder *flatbuffers.Builder, firstOffset int64) { builder.PrependInt64Slot(0, firstOffset, 0) @@ -128,20 +140,23 @@ func RecordBatchAddFirstOffset(builder *flatbuffers.Builder, firstOffset int64) func RecordBatchAddFirstTimestamp(builder *flatbuffers.Builder, firstTimestamp int64) { builder.PrependInt64Slot(1, firstTimestamp, 0) } +func RecordBatchAddLastTimestamp(builder *flatbuffers.Builder, lastTimestamp int64) { + builder.PrependInt64Slot(2, lastTimestamp, 0) +} func RecordBatchAddCompressedSize(builder *flatbuffers.Builder, compressedSize uint32) { - builder.PrependUint32Slot(2, compressedSize, 0) + builder.PrependUint32Slot(3, compressedSize, 0) } func RecordBatchAddUncompressedSize(builder *flatbuffers.Builder, uncompressedSize uint32) { - builder.PrependUint32Slot(3, uncompressedSize, 0) + builder.PrependUint32Slot(4, uncompressedSize, 0) } func RecordBatchAddNumRecords(builder *flatbuffers.Builder, numRecords uint32) { - builder.PrependUint32Slot(4, numRecords, 0) + builder.PrependUint32Slot(5, numRecords, 0) } func RecordBatchAddChecksum(builder *flatbuffers.Builder, checksum uint32) { - builder.PrependUint32Slot(5, checksum, 0) + builder.PrependUint32Slot(6, checksum, 0) } func RecordBatchAddCompression(builder *flatbuffers.Builder, compression types.Compression) { - builder.PrependUint32Slot(6, uint32(compression), 0) + builder.PrependUint32Slot(7, uint32(compression), 0) } func RecordBatchEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/format/timecraft.go b/format/timecraft.go index 483ce7ee..09c19fc4 100644 --- a/format/timecraft.go +++ b/format/timecraft.go @@ -197,8 +197,10 @@ func jsonDecode(b []byte, value any) error { } type Manifest struct { - Process *Descriptor `json:"process" yaml:"process"` - StartTime time.Time `json:"startTime" yaml:"startTime"` + ProcessID UUID `json:"-" yaml:"-"` + StartTime time.Time `json:"startTime" yaml:"startTime"` + Process *Descriptor `json:"process" yaml:"process"` + Segments []LogSegment `json:"segments,omitempty" yaml:"segments,omitempty"` } func (m *Manifest) ContentType() MediaType { @@ -213,6 +215,12 @@ func (m *Manifest) UnmarshalResource(b []byte) error { return jsonDecode(b, m) } +type LogSegment struct { + Number int `json:"number" yaml:"number"` + Size int64 `json:"size" yaml:"size"` + CreatedAt time.Time `json:"createdAt" yaml:"createdAt"` +} + var ( _ ResourceMarshaler = (*Descriptor)(nil) _ ResourceMarshaler = (*Module)(nil) diff --git a/format/types/types.fbs b/format/types/types.fbs index 43fa7f38..2b7832c4 100644 --- a/format/types/types.fbs +++ b/format/types/types.fbs @@ -3,7 +3,7 @@ namespace types; // Compression is the enumeration representing the supported compression // algorithms for data sections of log snapshots. enum Compression:uint { - Uncompressed, Snappy, Zstd + uncompressed, snappy, zstd } // Hash represents a OCI hash which pairs an algorithm name to a digest. diff --git a/format/types/types_generated.go b/format/types/types_generated.go index b3e7928d..9b8e3d73 100644 --- a/format/types/types_generated.go +++ b/format/types/types_generated.go @@ -11,21 +11,21 @@ import ( type Compression uint32 const ( - CompressionUncompressed Compression = 0 - CompressionSnappy Compression = 1 - CompressionZstd Compression = 2 + Compressionuncompressed Compression = 0 + Compressionsnappy Compression = 1 + Compressionzstd Compression = 2 ) var EnumNamesCompression = map[Compression]string{ - CompressionUncompressed: "Uncompressed", - CompressionSnappy: "Snappy", - CompressionZstd: "Zstd", + Compressionuncompressed: "uncompressed", + Compressionsnappy: "snappy", + Compressionzstd: "zstd", } var EnumValuesCompression = map[string]Compression{ - "Uncompressed": CompressionUncompressed, - "Snappy": CompressionSnappy, - "Zstd": CompressionZstd, + "uncompressed": Compressionuncompressed, + "snappy": Compressionsnappy, + "zstd": Compressionzstd, } func (v Compression) String() string { diff --git a/internal/cmd/describe.go b/internal/cmd/describe.go index c00981f8..06e50e4e 100644 --- a/internal/cmd/describe.go +++ b/internal/cmd/describe.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "time" @@ -23,6 +24,23 @@ import ( const describeUsage = ` Usage: timecraft describe [options] + The describe command prints detailed information about specific resources. + + Resource types available to 'timecraft get' are also usable with the describe + command. Values displayed in the ID column of the get output can be passed as + arguments to describe. + +Examples: + + $ timecraft describe log e1c6ae6e-caa8-45c1-bc9b-827617347063 + ID: e1c6ae6e-caa8-45c1-bc9b-827617347063 + Size: 1.62 KiB/3.88 KiB +64 B (compression: 58.27%) + Start: 3h ago, Mon, 29 May 2023 23:00:41 UTC + Records: 27 (1 batch) + --- + SEGMENT RECORDS BATCHES SIZE UNCOMPRESSED SIZE COMPRESSED SIZE COMPRESSION RATIO + 0 27 1 1.68 KiB 3.88 KiB 1.62 KiB 58.27% + Options: -h, --help Show this usage information -o, --ouptut format Output format, one of: text, json, yaml @@ -234,13 +252,13 @@ func describeProcess(ctx context.Context, reg *timemachine.Registry, id string) segments := reg.ListLogSegments(ctx, processID) defer segments.Close() - i := stream.Iter[timemachine.LogSegment](segments) + i := stream.Iter[format.LogSegment](segments) for i.Next() { v := i.Value() desc.log = append(desc.log, logSegment{ - number: v.Number, - size: human.Bytes(v.Size), - createdAt: human.Time(v.CreatedAt.In(time.Local)), + Number: v.Number, + Size: human.Bytes(v.Size), + CreatedAt: human.Time(v.CreatedAt.In(time.Local)), }) } if err := i.Err(); err != nil { @@ -334,7 +352,7 @@ type configDescriptor struct { } func (desc *configDescriptor) Format(w fmt.State, _ rune) { - fmt.Fprintf(w, "ID: %s\n", desc.id) + fmt.Fprintf(w, "ID: %s\n", desc.id) fmt.Fprintf(w, "Runtime: %s (%s)\n", desc.runtime.runtime, desc.runtime.version) fmt.Fprintf(w, "Modules:\n") for _, module := range desc.modules { @@ -357,7 +375,7 @@ type moduleDescriptor struct { } func (desc *moduleDescriptor) Format(w fmt.State, _ rune) { - fmt.Fprintf(w, "ID: %s\n", desc.id) + fmt.Fprintf(w, "ID: %s\n", desc.id) fmt.Fprintf(w, "Name: %s\n", desc.name) fmt.Fprintf(w, "Size: %v\n", desc.size) } @@ -372,15 +390,9 @@ type processDescriptor struct { log []logSegment } -type logSegment struct { - number int - size human.Bytes - createdAt human.Time -} - func (desc *processDescriptor) Format(w fmt.State, _ rune) { - fmt.Fprintf(w, "ID: %s\n", desc.id) - fmt.Fprintf(w, "Start Time: %s, %s\n", desc.startTime, time.Time(desc.startTime).Format(time.RFC1123)) + fmt.Fprintf(w, "ID: %s\n", desc.id) + fmt.Fprintf(w, "Start: %s, %s\n", desc.startTime, time.Time(desc.startTime).Format(time.RFC1123)) fmt.Fprintf(w, "Runtime: %s (%s)\n", desc.runtime.runtime, desc.runtime.version) fmt.Fprintf(w, "Modules:\n") for _, module := range desc.modules { @@ -396,7 +408,7 @@ func (desc *processDescriptor) Format(w fmt.State, _ rune) { } fmt.Fprintf(w, "Log:\n") for _, log := range desc.log { - fmt.Fprintf(w, " segment %d: %s, created %s (%s)\n", log.number, log.size, log.createdAt, time.Time(log.createdAt).Format(time.RFC1123)) + fmt.Fprintf(w, " segment %d: %s, created %s (%s)\n", log.Number, log.Size, log.CreatedAt, time.Time(log.CreatedAt).Format(time.RFC1123)) } } @@ -407,7 +419,7 @@ type runtimeDescriptor struct { } func (desc *runtimeDescriptor) Format(w fmt.State, _ rune) { - fmt.Fprintf(w, "ID: %s\n", desc.id) + fmt.Fprintf(w, "ID: %s\n", desc.id) fmt.Fprintf(w, "Runtime: %s\n", desc.runtime) fmt.Fprintf(w, "Version: %s\n", desc.version) } @@ -419,3 +431,209 @@ func moduleName(module *format.Descriptor) string { } return name } + +type recordBatch struct { + NumRecords int `json:"numRecords" yaml:"numRecords" text:"RECORDS"` + FirstOffset int64 `json:"firstOffset" yaml:"firstOffset" text:"FIRST OFFSET"` + FirstTimestamp human.Time `json:"firstTimestamp" yaml:"firstTimestamp" text:"-"` + LastTimestamp human.Time `json:"lastTimestamp" yaml:"lastTimestamp" text:"-"` + Duration human.Duration `json:"-" yaml:"-" text:"DURATION"` + UncompressedSize human.Bytes `json:"uncompressedSize" yaml:"uncompressedSize" text:"UNCOMPRESSED SIZE"` + CompressedSize human.Bytes `json:"compressedSize" yaml:"compressedSize" text:"COMPRESSED SIZE"` + CompressionRatio human.Ratio `json:"-" yaml:"-" text:"COMPRESSION RATIO"` + Compression string `json:"compression" yaml:"compression" text:"COMPRESSION"` +} + +type logSegment struct { + Number int `json:"number" yaml:"number" text:"SEGMENT"` + NumRecords int `json:"-" yaml:"-" text:"RECORDS"` + NumBatches int `json:"-" yaml:"-" text:"BATCHES"` + Duration human.Duration `json:"-" yaml:"-" text:"DURATION"` + Size human.Bytes `json:"size" yaml:"size" text:"SIZE"` + UncompressedSize human.Bytes `json:"-" yaml:"-" text:"UNCOMPRESSED SIZE"` + CompressedSize human.Bytes `json:"-" yaml:"-" text:"COMPRESSED SIZE"` + CompressionRatio human.Ratio `json:"-" yaml:"-" text:"COMPRESSION RATIO"` + CreatedAt human.Time `json:"createdAt" yaml:"createdAt" text:"-"` + RecordBatches []recordBatch `json:"recordBatches" yaml:"recordBatches" text:"-"` +} + +func (desc *logSegment) Format(w fmt.State, _ rune) { + startTime := human.Time{} + if len(desc.RecordBatches) > 0 { + startTime = desc.RecordBatches[0].FirstTimestamp + } + + fmt.Fprintf(w, "Segment: %d\n", desc.Number) + fmt.Fprintf(w, "Size: %s/%s +%s (compression: %s)\n", desc.CompressedSize, desc.UncompressedSize, desc.Size-desc.CompressedSize, desc.CompressionRatio) + fmt.Fprintf(w, "Start: %s, %s\n", startTime, time.Time(startTime).Format(time.RFC1123)) + fmt.Fprintf(w, "Records: %d (%d batch)\n", desc.NumRecords, len(desc.RecordBatches)) + fmt.Fprintf(w, "---\n") + + table := textprint.NewTableWriter[recordBatch](w) + defer table.Close() + + _, _ = table.Write(desc.RecordBatches) +} + +type logDescriptor struct { + ProcessID format.UUID `json:"id" yaml:"id"` + Size human.Bytes `json:"size" yaml:"size"` + StartTime human.Time `json:"startTime" yaml:"startTime"` + Segments []logSegment `json:"segments" yaml:"segments"` +} + +func (desc *logDescriptor) Format(w fmt.State, _ rune) { + uncompressedSize := human.Bytes(0) + compressedSize := human.Bytes(0) + metadataSize := human.Bytes(0) + numRecords := 0 + numBatches := 0 + for _, seg := range desc.Segments { + uncompressedSize += seg.UncompressedSize + compressedSize += seg.CompressedSize + metadataSize += seg.Size - seg.CompressedSize + numRecords += seg.NumRecords + numBatches += seg.NumBatches + } + compressionRatio := 1 - human.Ratio(compressedSize)/human.Ratio(uncompressedSize) + + fmt.Fprintf(w, "ID: %s\n", desc.ProcessID) + fmt.Fprintf(w, "Size: %s/%s +%s (compression: %s)\n", compressedSize, uncompressedSize, metadataSize, compressionRatio) + fmt.Fprintf(w, "Start: %s, %s\n", desc.StartTime, time.Time(desc.StartTime).Format(time.RFC1123)) + fmt.Fprintf(w, "Records: %d (%d batch)\n", numRecords, numBatches) + fmt.Fprintf(w, "---\n") + + table := textprint.NewTableWriter[logSegment](w) + defer table.Close() + + _, _ = table.Write(desc.Segments) +} + +func describeLog(ctx context.Context, reg *timemachine.Registry, id string) (any, error) { + logSegmentNumber := -1 + logID, logNumber, ok := strings.Cut(id, "/") + if ok { + n, err := strconv.Atoi(logNumber) + if err != nil { + return nil, errors.New(`malformed log id (suffix is not a valid segment number)`) + } + logSegmentNumber = n + } + + processID, err := uuid.Parse(logID) + if err != nil { + return nil, errors.New(`malformed process id (not a UUID)`) + } + + m, err := reg.LookupLogManifest(ctx, processID) + if err != nil { + return nil, err + } + + logch := make(chan logSegment, len(m.Segments)) + errch := make(chan error, len(m.Segments)) + wait := 0 + + for _, seg := range m.Segments { + if logSegmentNumber >= 0 && seg.Number != logSegmentNumber { + continue + } + wait++ + go func(seg format.LogSegment) { + r, err := reg.ReadLogSegment(ctx, processID, seg.Number) + if err != nil { + errch <- err + return + } + defer r.Close() + + logReader := timemachine.NewLogReader(r, seg.CreatedAt) + defer logReader.Close() + + logSegment := logSegment{ + Number: seg.Number, + Size: human.Bytes(seg.Size), + CreatedAt: human.Time(seg.CreatedAt), + } + + lastTime := time.Time(logSegment.CreatedAt) + for { + b, err := logReader.ReadRecordBatch() + if err != nil { + if err != io.EOF { + errch <- err + } else { + c := human.Ratio(logSegment.CompressedSize) + u := human.Ratio(logSegment.UncompressedSize) + logSegment.CompressionRatio = 1 - c/u + logSegment.NumBatches = len(logSegment.RecordBatches) + logSegment.Duration = human.Duration(lastTime.Sub(time.Time(logSegment.CreatedAt))) + logch <- logSegment + } + break + } + + var ( + numRecords = b.NumRecords() + firstOffset = b.FirstOffset() + firstTimestamp = b.FirstTimestamp() + lastTimestamp = b.LastTimestamp() + duration = human.Duration(lastTimestamp.Sub(firstTimestamp)) + uncompressedSize = human.Bytes(b.UncompressedSize()) + compressedSize = human.Bytes(b.CompressedSize()) + compression = b.Compression() + ) + + lastTime = lastTimestamp + logSegment.NumRecords += numRecords + logSegment.CompressedSize += compressedSize + logSegment.UncompressedSize += uncompressedSize + logSegment.RecordBatches = append(logSegment.RecordBatches, recordBatch{ + NumRecords: numRecords, + FirstOffset: firstOffset, + FirstTimestamp: human.Time(firstTimestamp), + LastTimestamp: human.Time(lastTimestamp), + Duration: duration, + UncompressedSize: uncompressedSize, + CompressedSize: compressedSize, + CompressionRatio: 1 - human.Ratio(compressedSize)/human.Ratio(uncompressedSize), + Compression: compression.String(), + }) + } + }(seg) + } + + var logs []logSegment + var errs []error + for wait > 0 { + wait-- + select { + case log := <-logch: + logs = append(logs, log) + case err := <-errch: + errs = append(errs, err) + } + } + if len(errs) != 0 { + return nil, errors.Join(errs...) + } + + if logSegmentNumber >= 0 { + return &logs[0], nil + } + + slices.SortFunc(logs, func(s1, s2 logSegment) bool { + return s1.Number < s2.Number + }) + + desc := &logDescriptor{ + ProcessID: processID, + StartTime: human.Time(m.StartTime), + Segments: logs, + } + + for _, log := range logs { + desc.Size += log.Size + } + return desc, nil +} diff --git a/internal/cmd/get.go b/internal/cmd/get.go index a05d6319..67a0f6ea 100644 --- a/internal/cmd/get.go +++ b/internal/cmd/get.go @@ -23,7 +23,7 @@ Usage: timecraft get [options] The get sub-command gives access to the state of the time machine registry. The command must be followed by the name of resources to display, which must - be one of config, module, process, or runtime. + be one of config, log, module, process, or runtime. (the command also accepts plurals and abbreviations of the resource names) Examples: @@ -66,6 +66,13 @@ var resources = [...]resource{ describe: describeConfig, lookup: lookupConfig, }, + { + typ: "log", + alt: []string{"logs"}, + mediaType: format.TypeTimecraftManifest, + describe: describeLog, + lookup: describeLog, + }, { typ: "module", alt: []string{"mo", "mod", "mods", "modules"}, @@ -127,6 +134,27 @@ Did you mean?%s`, resourceTypeLookup, joinResourceTypes(matchingResources, "\n return err } + // We make a special case for the log segments because they are not + // immutable objects and therefore don't have descriptors. + if resource.typ == "log" { + reader := registry.ListLogManifests(ctx) // TODO: time range + defer reader.Close() + + var writer stream.WriteCloser[*format.Manifest] + switch output { + case "json": + writer = jsonprint.NewWriter[*format.Manifest](os.Stdout) + case "yaml": + writer = yamlprint.NewWriter[*format.Manifest](os.Stdout) + default: + writer = getLogs(ctx, os.Stdout, registry) + } + defer writer.Close() + + _, err = stream.Copy[*format.Manifest](writer, reader) + return err + } + reader := registry.ListResources(ctx, resource.mediaType, timeRange) defer reader.Close() @@ -152,7 +180,7 @@ func getConfigs(ctx context.Context, w io.Writer, reg *timemachine.Registry) str Modules int `text:"MODULES"` Size human.Bytes `text:"SIZE"` } - return newDescTableWriter(w, + return newTableWriter(w, func(c1, c2 config) bool { return c1.ID < c2.ID }, @@ -180,7 +208,7 @@ func getModules(ctx context.Context, w io.Writer, reg *timemachine.Registry) str Name string `text:"MODULE NAME"` Size human.Bytes `text:"SIZE"` } - return newDescTableWriter(w, + return newTableWriter(w, func(m1, m2 module) bool { return m1.ID < m2.ID }, @@ -202,7 +230,7 @@ func getProcesses(ctx context.Context, w io.Writer, reg *timemachine.Registry) s ID format.UUID `text:"PROCESS ID"` StartTime human.Time `text:"STARTED"` } - return newDescTableWriter(w, + return newTableWriter(w, func(p1, p2 process) bool { return time.Time(p1.StartTime).Before(time.Time(p2.StartTime)) }, @@ -224,7 +252,7 @@ func getRuntimes(ctx context.Context, w io.Writer, reg *timemachine.Registry) st Runtime string `text:"RUNTIME NAME"` Version string `text:"VERSION"` } - return newDescTableWriter(w, + return newTableWriter(w, func(r1, r2 runtime) bool { return r1.ID < r2.ID }, @@ -241,9 +269,33 @@ func getRuntimes(ctx context.Context, w io.Writer, reg *timemachine.Registry) st }) } -func newDescTableWriter[T any](w io.Writer, orderBy func(T, T) bool, conv func(*format.Descriptor) (T, error)) stream.WriteCloser[*format.Descriptor] { - tw := textprint.NewTableWriter[T](w, textprint.OrderBy(orderBy)) - cw := stream.ConvertWriter[T](tw, conv) +func getLogs(ctx context.Context, w io.Writer, reg *timemachine.Registry) stream.WriteCloser[*format.Manifest] { + type manifest struct { + ProcessID format.UUID `text:"PROCESS ID"` + Segments human.Count `text:"SEGMENTS"` + Size human.Bytes `text:"SIZE"` + StartTime human.Time `text:"STARTED"` + } + return newTableWriter(w, + func(m1, m2 manifest) bool { + return time.Time(m1.StartTime).Before(time.Time(m2.StartTime)) + }, + func(m *format.Manifest) (manifest, error) { + manifest := manifest{ + ProcessID: m.ProcessID, + Segments: human.Count(len(m.Segments)), + StartTime: human.Time(m.StartTime), + } + for _, segment := range m.Segments { + manifest.Size += human.Bytes(segment.Size) + } + return manifest, nil + }) +} + +func newTableWriter[T1, T2 any](w io.Writer, orderBy func(T1, T1) bool, conv func(T2) (T1, error)) stream.WriteCloser[T2] { + tw := textprint.NewTableWriter[T1](w, textprint.OrderBy(orderBy)) + cw := stream.ConvertWriter[T1](tw, conv) return stream.NewWriteCloser(cw, tw) } diff --git a/internal/print/human/bytes.go b/internal/print/human/bytes.go index 9a380d8f..b8e00b55 100644 --- a/internal/print/human/bytes.go +++ b/internal/print/human/bytes.go @@ -111,7 +111,7 @@ var bytes1000 = [...]byteUnit{ } var bytes1024 = [...]byteUnit{ - {B, ""}, + {B, "B"}, {KiB, "KiB"}, {MiB, "MiB"}, {GiB, "GiB"}, @@ -199,7 +199,7 @@ func (b *Bytes) UnmarshalJSON(j []byte) error { } func (b Bytes) MarshalYAML() (any, error) { - return b.String(), nil + return uint64(b), nil } func (b *Bytes) UnmarshalYAML(y *yaml.Node) error { diff --git a/internal/print/human/bytes_test.go b/internal/print/human/bytes_test.go index d93ab73d..de722bb7 100644 --- a/internal/print/human/bytes_test.go +++ b/internal/print/human/bytes_test.go @@ -54,7 +54,7 @@ func TestBytesFormat(t *testing.T) { out string }{ {fmt: "%v", out: "0", in: 0}, - {fmt: "%v", out: "2", in: 2}, + {fmt: "%v", out: "2 B", in: 2}, {fmt: "%v", out: "1.95 KiB", in: 2 * KB}, {fmt: "%v", out: "1.91 MiB", in: 2 * MB}, diff --git a/internal/print/textprint/table.go b/internal/print/textprint/table.go index bae5efda..984e69b3 100644 --- a/internal/print/textprint/table.go +++ b/internal/print/textprint/table.go @@ -60,13 +60,7 @@ func (t *tableWriter[T]) Close() error { } var encoders []encodeFunc - for i, f := range reflect.VisibleFields(valueType) { - if i != 0 { - if _, err := io.WriteString(tw, "\t"); err != nil { - return err - } - } - + for _, f := range reflect.VisibleFields(valueType) { name := f.Name if textTag := f.Tag.Get("text"); textTag != "" { tag := strings.Split(textTag, ",") @@ -82,6 +76,11 @@ func (t *tableWriter[T]) Close() error { continue } + if len(encoders) > 0 { + if _, err := io.WriteString(tw, "\t"); err != nil { + return err + } + } if _, err := io.WriteString(tw, name); err != nil { return err } diff --git a/internal/stream/chan.go b/internal/stream/chan.go new file mode 100644 index 00000000..9defdc40 --- /dev/null +++ b/internal/stream/chan.go @@ -0,0 +1,56 @@ +package stream + +import "io" + +type Optional[T any] struct { + val T + err error +} + +func Opt[T any](val T, err error) Optional[T] { + return Optional[T]{val: val, err: err} +} + +func (opt *Optional[T]) Value() (T, error) { + return opt.val, opt.err +} + +func ChanReader[T any](ch <-chan Optional[T]) Reader[T] { + return chanReader[T](ch) +} + +type chanReader[T any] <-chan Optional[T] + +func (r chanReader[T]) Read(values []T) (int, error) { + if len(values) == 0 { + return 0, nil + } + + opt, ok := <-r + if !ok { + return 0, io.EOF + } + v, err := opt.Value() + if err != nil { + return 0, err + } + values[0] = v + + for n := 1; n < len(values); n++ { + select { + case opt, ok := <-r: + if !ok { + return n, io.EOF + } + v, err := opt.Value() + if err != nil { + return n, err + } + values[n] = v + default: + return n, nil + } + } + + return len(values), nil +} diff --git a/internal/timemachine/compress.go b/internal/timemachine/compress.go index 3e22e3a7..96f19965 100644 --- a/internal/timemachine/compress.go +++ b/internal/timemachine/compress.go @@ -12,9 +12,9 @@ import ( type Compression = types.Compression const ( - Uncompressed Compression = types.CompressionUncompressed - Snappy Compression = types.CompressionSnappy - Zstd Compression = types.CompressionZstd + Uncompressed Compression = types.Compressionuncompressed + Snappy Compression = types.Compressionsnappy + Zstd Compression = types.Compressionzstd ) var ( diff --git a/internal/timemachine/log.go b/internal/timemachine/log.go index 9af0a7ea..49749dcb 100644 --- a/internal/timemachine/log.go +++ b/internal/timemachine/log.go @@ -52,9 +52,17 @@ func (r *LogReader) Close() error { // The RecordBatch is only valid until the next call to ReadRecordBatch. func (r *LogReader) ReadRecordBatch() (*RecordBatch, error) { if r.batch.reader.N > 0 { - if _, err := io.Copy(io.Discard, &r.batch.reader); err != nil { + var err error + if s, ok := r.batch.reader.R.(io.Seeker); ok { + _, err = s.Seek(r.batch.reader.N, io.SeekCurrent) + } else { + _, err = io.Copy(io.Discard, &r.batch.reader) + } + if err != nil { return nil, err } + r.batch.reader.R = nil + r.batch.reader.N = 0 } buffer.Release(&r.batchFrame, &frameBufferPool) var err error diff --git a/internal/timemachine/record_batch.go b/internal/timemachine/record_batch.go index 35954cf7..6ccb01f2 100644 --- a/internal/timemachine/record_batch.go +++ b/internal/timemachine/record_batch.go @@ -86,11 +86,16 @@ func (b *RecordBatch) FirstOffset() int64 { return b.batch.FirstOffset() } -// FirstTimestamp returns the time at which the first record was produced. +// FirstTimestamp returns the time of the first record in the batch. func (b *RecordBatch) FirstTimestamp() time.Time { return b.startTime.Add(time.Duration(b.batch.FirstTimestamp())) } +// LastTimestamp returns the time of the last record in the batch. +func (b *RecordBatch) LastTimestamp() time.Time { + return b.startTime.Add(time.Duration(b.batch.LastTimestamp())) +} + // CompressedSize returns the size of the record batch data section in the log // segment. func (b *RecordBatch) CompressedSize() int64 { @@ -175,6 +180,7 @@ type RecordBatchBuilder struct { compression Compression firstOffset int64 firstTimestamp int64 + lastTimestamp int64 recordCount uint32 uncompressed []byte compressed []byte @@ -198,6 +204,7 @@ func (b *RecordBatchBuilder) Reset(compression Compression, firstOffset int64) { b.records = nil b.firstOffset = firstOffset b.firstTimestamp = 0 + b.lastTimestamp = 0 b.finished = false b.concatenated = false b.recordCount = 0 @@ -215,6 +222,7 @@ func (b *RecordBatchBuilder) AddRecord(record *RecordBuilder) { if b.recordCount == 0 { b.firstTimestamp = record.timestamp } + b.lastTimestamp = record.timestamp b.recordCount++ } @@ -267,6 +275,7 @@ func (b *RecordBatchBuilder) build() { logsegment.RecordBatchStart(b.builder) logsegment.RecordBatchAddFirstOffset(b.builder, b.firstOffset) logsegment.RecordBatchAddFirstTimestamp(b.builder, b.firstTimestamp) + logsegment.RecordBatchAddLastTimestamp(b.builder, b.lastTimestamp) logsegment.RecordBatchAddCompressedSize(b.builder, uint32(len(b.compressed))) logsegment.RecordBatchAddUncompressedSize(b.builder, uint32(len(b.uncompressed))) logsegment.RecordBatchAddChecksum(b.builder, checksum(b.records)) diff --git a/internal/timemachine/registry.go b/internal/timemachine/registry.go index 69983845..e850549e 100644 --- a/internal/timemachine/registry.go +++ b/internal/timemachine/registry.go @@ -12,13 +12,14 @@ import ( "os" "path" "strconv" + "sync" "time" - "golang.org/x/exp/slices" - + "github.com/google/uuid" "github.com/stealthrocket/timecraft/format" "github.com/stealthrocket/timecraft/internal/object" "github.com/stealthrocket/timecraft/internal/stream" + "golang.org/x/exp/slices" ) var ( @@ -47,12 +48,6 @@ func (tr TimeRange) Duration() time.Duration { return tr.End.Sub(tr.Start) } -type LogSegment struct { - Number int - Size int64 - CreatedAt time.Time -} - type Registry struct { // The object store that the registry uses to load and store data. Store object.Store @@ -350,15 +345,15 @@ func (w *logSegmentWriter) Close() error { return err } -func (reg *Registry) ListLogSegments(ctx context.Context, processID format.UUID) stream.ReadCloser[LogSegment] { +func (reg *Registry) ListLogSegments(ctx context.Context, processID format.UUID) stream.ReadCloser[format.LogSegment] { reader := reg.Store.ListObjects(ctx, "log/"+processID.String()+"/data/") - return convert(reader, func(info object.Info) (LogSegment, error) { + return convert(reader, func(info object.Info) (format.LogSegment, error) { number := path.Base(info.Name) n, err := strconv.ParseInt(number, 16, 32) if err != nil || n < 0 { - return LogSegment{}, fmt.Errorf("invalid log segment entry: %q", info.Name) + return format.LogSegment{}, fmt.Errorf("invalid log segment entry: %q", info.Name) } - segment := LogSegment{ + segment := format.LogSegment{ Number: int(n), Size: info.Size, CreatedAt: info.CreatedAt, @@ -367,6 +362,48 @@ func (reg *Registry) ListLogSegments(ctx context.Context, processID format.UUID) }) } +func (reg *Registry) ListLogManifests(ctx context.Context) stream.ReadCloser[*format.Manifest] { + ch := make(chan stream.Optional[*format.Manifest]) + ctx, cancel := context.WithCancel(ctx) + + go func(reader stream.ReadCloser[object.Info]) { + defer close(ch) + defer reader.Close() + + it := stream.Iter[object.Info](reader) + wg := sync.WaitGroup{} + + for it.Next() { + processID, err := uuid.Parse(path.Base(it.Value().Name)) + if err != nil { + continue + } + wg.Add(1) + go func() { + defer wg.Done() + ch <- stream.Opt(reg.LookupLogManifest(ctx, processID)) + }() + } + + if err := it.Err(); err != nil { + ch <- stream.Opt[*format.Manifest](nil, err) + } + + wg.Wait() + }(reg.Store.ListObjects(ctx, "log/")) + + return stream.NewReadCloser(stream.ChanReader(ch), closerFunc(func() error { + cancel() + for range ch { + } + return nil + })) +} + +type closerFunc func() error + +func (f closerFunc) Close() error { return f() } + func (reg *Registry) LookupLogManifest(ctx context.Context, processID format.UUID) (*format.Manifest, error) { r, err := reg.Store.ReadObject(ctx, reg.manifestKey(processID)) if err != nil { @@ -380,10 +417,24 @@ func (reg *Registry) LookupLogManifest(ctx context.Context, processID format.UUI if err != nil { return nil, err } + m := new(format.Manifest) if err := m.UnmarshalResource(b); err != nil { return nil, err } + m.ProcessID = processID + + segments := reg.ListLogSegments(ctx, processID) + defer segments.Close() + + m.Segments, err = stream.ReadAll[format.LogSegment](segments) + if err != nil { + return nil, err + } + + slices.SortFunc(m.Segments, func(s1, s2 format.LogSegment) bool { + return s1.Number < s2.Number + }) return m, nil }