Skip to content
This repository has been archived by the owner on Feb 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #47 from stealthrocket/decouple-records-from-flatb…
Browse files Browse the repository at this point in the history
…uffers

decouple time machine records from flatbuffers
  • Loading branch information
achille-roussel committed Jun 1, 2023
2 parents d2ba046 + 68adcf9 commit 04faa6c
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 202 deletions.
52 changes: 21 additions & 31 deletions internal/timemachine/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,34 @@ import (
func TestReadRecordBatch(t *testing.T) {
startTime := time.Now()

batches := [][]record{
batches := [][]timemachine.Record{
{
{
Timestamp: startTime.Add(1 * time.Millisecond),
Time: startTime.Add(1 * time.Millisecond),
FunctionID: 0,
FunctionCall: []byte("function call 0"),
},
},
{
{
Timestamp: startTime.Add(2 * time.Millisecond),
Time: startTime.Add(2 * time.Millisecond),
FunctionID: 1,
FunctionCall: []byte("function call 1"),
},
{
Timestamp: startTime.Add(3 * time.Millisecond),
Time: startTime.Add(3 * time.Millisecond),
FunctionID: 2,
FunctionCall: []byte("function call 2"),
},
},
{
{
Timestamp: startTime.Add(4 * time.Millisecond),
Time: startTime.Add(4 * time.Millisecond),
FunctionID: 3,
FunctionCall: []byte("function call: A, B, C, D"),
},
{
Timestamp: startTime.Add(5 * time.Millisecond),
Time: startTime.Add(5 * time.Millisecond),
FunctionID: 4,
FunctionCall: []byte("hello world!"),
},
Expand All @@ -59,7 +59,7 @@ func TestReadRecordBatch(t *testing.T) {
recordBatchBuilder.Reset(timemachine.Zstd, firstOffset)
for _, r := range batch {
recordBuilder.Reset(startTime)
recordBuilder.SetTimestamp(r.Timestamp)
recordBuilder.SetTimestamp(r.Time)
recordBuilder.SetFunctionID(r.FunctionID)
recordBuilder.SetFunctionCall(r.FunctionCall)
recordBatchBuilder.AddRecord(&recordBuilder)
Expand All @@ -71,7 +71,7 @@ func TestReadRecordBatch(t *testing.T) {
}

reader := timemachine.NewLogReader(bytes.NewReader(buffer.Bytes()), startTime)
batchesRead := make([][]record, 0, len(batches))
batchesRead := make([][]timemachine.Record, 0, len(batches))
for {
batch, err := reader.ReadRecordBatch()
if err != nil {
Expand All @@ -80,18 +80,14 @@ func TestReadRecordBatch(t *testing.T) {
}
t.Fatal(err)
}
records := make([]record, batch.NumRecords())
records := make([]timemachine.Record, batch.NumRecords())
count := 0
iter := stream.Iter[timemachine.Record](batch)

for iter.Next() {
r := iter.Value()
assert.Less(t, count, len(records))
records[count] = record{
Timestamp: r.Time(),
FunctionID: r.FunctionID(),
FunctionCall: r.FunctionCall(),
}
records[count] = r
count++
}

Expand All @@ -105,28 +101,22 @@ func TestReadRecordBatch(t *testing.T) {
}
}

type record struct {
Timestamp time.Time
FunctionID int
FunctionCall []byte
}

func BenchmarkLogWriter(b *testing.B) {
startTime := time.Now()

tests := []struct {
scenario string
batch []record
batch []timemachine.Record
}{
{
scenario: "zero records",
},

{
scenario: "one record",
batch: []record{
batch: []timemachine.Record{
{
Timestamp: startTime.Add(1 * time.Millisecond),
Time: startTime.Add(1 * time.Millisecond),
FunctionID: 0,
FunctionCall: []byte("function call 0"),
},
Expand All @@ -135,29 +125,29 @@ func BenchmarkLogWriter(b *testing.B) {

{
scenario: "five records",
batch: []record{
batch: []timemachine.Record{
{
Timestamp: startTime.Add(1 * time.Millisecond),
Time: startTime.Add(1 * time.Millisecond),
FunctionID: 0,
FunctionCall: []byte("1"),
},
{
Timestamp: startTime.Add(2 * time.Millisecond),
Time: startTime.Add(2 * time.Millisecond),
FunctionID: 1,
FunctionCall: []byte("1,2"),
},
{
Timestamp: startTime.Add(3 * time.Millisecond),
Time: startTime.Add(3 * time.Millisecond),
FunctionID: 2,
FunctionCall: []byte("1,2,3"),
},
{
Timestamp: startTime.Add(4 * time.Millisecond),
Time: startTime.Add(4 * time.Millisecond),
FunctionID: 3,
FunctionCall: []byte("A,B,C,D"),
},
{
Timestamp: startTime.Add(5 * time.Millisecond),
Time: startTime.Add(5 * time.Millisecond),
FunctionID: 4,
FunctionCall: []byte("hello world!"),
},
Expand All @@ -172,7 +162,7 @@ func BenchmarkLogWriter(b *testing.B) {
}
}

func benchmarkLogWriterWriteRecordBatch(b *testing.B, startTime time.Time, compression timemachine.Compression, batch []record) {
func benchmarkLogWriterWriteRecordBatch(b *testing.B, startTime time.Time, compression timemachine.Compression, batch []timemachine.Record) {
w := timemachine.NewLogWriter(io.Discard)

var recordBuilder timemachine.RecordBuilder
Expand All @@ -185,7 +175,7 @@ func benchmarkLogWriterWriteRecordBatch(b *testing.B, startTime time.Time, compr
recordBatchBuilder.Reset(compression, 0)
for _, r := range batch {
recordBuilder.Reset(startTime)
recordBuilder.SetTimestamp(r.Timestamp)
recordBuilder.SetTimestamp(r.Time)
recordBuilder.SetFunctionID(r.FunctionID)
recordBuilder.SetFunctionCall(r.FunctionCall)
recordBatchBuilder.AddRecord(&recordBuilder)
Expand Down
40 changes: 3 additions & 37 deletions internal/timemachine/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,9 @@ import (

// Record is a read-only record from the log.
type Record struct {
startTime time.Time
record logsegment.Record
}

// MakeRecord creates a record from a buffer.
//
// The buffer must live as long as the record.
func MakeRecord(startTime time.Time, buffer []byte) (r Record) {
r.Reset(startTime, buffer)
return
}

// Reset resets a record.
func (r *Record) Reset(startTime time.Time, buffer []byte) {
r.startTime = startTime
r.record = *logsegment.GetSizePrefixedRootAsRecord(buffer, 0)
}

// Time is the record timestamp as a time.Time value (adjusted from
// the process start time).
func (r *Record) Time() time.Time {
return r.startTime.Add(time.Duration(r.Timestamp()))
}

// Timestamp is the monontonic timestamp encoded in the record.
func (r *Record) Timestamp() int64 {
return r.record.Timestamp()
}

// FunctionID is the record's associated function ID.
func (r *Record) FunctionID() int {
return int(r.record.FunctionId())
}

// FunctionCall returns the function call details.
func (r *Record) FunctionCall() []byte {
return r.record.FunctionCallBytes()
Time time.Time
FunctionID int
FunctionCall []byte
}

// RecordBuilder is a builder for records.
Expand Down
17 changes: 8 additions & 9 deletions internal/timemachine/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ type RecordBatch struct {
offset uint32
}

// MakeRecordBatch creates a record batch from the specified buffer.
//
// The buffer must live as long as the record batch.
func MakeRecordBatch(startTime time.Time, buffer []byte, reader io.Reader) (rb RecordBatch) {
rb.Reset(startTime, buffer, reader)
return
}

// Reset resets the record batch.
func (b *RecordBatch) Reset(startTime time.Time, buf []byte, reader io.Reader) {
if b.records != nil {
Expand Down Expand Up @@ -137,7 +129,14 @@ func (b *RecordBatch) Read(records []Record) (int, error) {
if b.offset+size < b.offset || b.offset+size > uint32(len(batch)) {
return n, fmt.Errorf("cannot read record at [%d:%d+%d] as records buffer is length %d: %w", b.offset, b.offset, size, len(batch), io.ErrUnexpectedEOF)
}
records[n] = MakeRecord(b.startTime, batch[b.offset:b.offset+size+4])
i := 4 + b.offset
j := 4 + b.offset + size
r := logsegment.GetRootAsRecord(batch[i:j:j], 0)
records[n] = Record{
Time: b.startTime.Add(time.Duration(r.Timestamp())),
FunctionID: int(r.FunctionId()),
FunctionCall: r.FunctionCallBytes(),
}
b.offset += size + 4
}
return len(records), nil
Expand Down
Loading

0 comments on commit 04faa6c

Please sign in to comment.