Skip to content
This repository has been archived by the owner on Feb 17, 2024. It is now read-only.

timemachine: support seeking through log records #45

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ const DefaultSize = 4096

// Buffer wraps a reusable byte slice.
type Buffer struct{ Data []byte }

// Size returns the current length of the buffer's data, in bytes.
func (b *Buffer) Size() int64 {
	return int64(len(b.Data))
}

type Pool struct{ pool sync.Pool }

func (p *Pool) Get(size int) *Buffer {
func (p *Pool) Get(size int64) *Buffer {
b, _ := p.pool.Get().(*Buffer)
if b != nil {
if size <= cap(b.Data) {
if int(size) <= cap(b.Data) {
b.Data = b.Data[:size]
return b
}
Expand All @@ -27,7 +31,7 @@ func (p *Pool) Put(b *Buffer) {
}
}

func New(size int) *Buffer {
func New(size int64) *Buffer {
return &Buffer{Data: make([]byte, size, Align(size, DefaultSize))}
}

Expand All @@ -38,6 +42,6 @@ func Release(buf **Buffer, pool *Pool) {
}
}

// Align rounds size up to the nearest multiple of to.
//
// to must be positive; size is assumed non-negative (the computation
// overflows for values near math.MaxInt64, which callers here never pass).
func Align(size, to int64) int64 {
	return ((size + (to - 1)) / to) * to
}
6 changes: 3 additions & 3 deletions internal/object/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Store interface {

// Reads an existing object from the store, returning a reader exposing its
// content.
ReadObject(ctx context.Context, name string) (io.ReadCloser, error)
ReadObject(ctx context.Context, name string) (io.ReadSeekCloser, error)

// Retrieves information about an object in the store.
StatObject(ctx context.Context, name string) (Info, error)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (emptyStore) CreateObject(context.Context, string, io.Reader, ...Tag) error
return ErrReadOnly
}

func (emptyStore) ReadObject(context.Context, string) (io.ReadCloser, error) {
func (emptyStore) ReadObject(context.Context, string) (io.ReadSeekCloser, error) {
return nil, ErrNotExist
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func (store dirStore) CreateObject(ctx context.Context, name string, data io.Rea
return nil
}

func (store dirStore) ReadObject(ctx context.Context, name string) (io.ReadCloser, error) {
func (store dirStore) ReadObject(ctx context.Context, name string) (io.ReadSeekCloser, error) {
path, err := store.joinPath(name)
if err != nil {
return nil, err
Expand Down
41 changes: 40 additions & 1 deletion internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// values.
package stream

import "io"
import (
"errors"
"io"
)

// Reader is an interface implemented by types that read a stream of values of
// type T.
Expand Down Expand Up @@ -61,6 +64,42 @@ type nopCloser[T any] struct{ reader Reader[T] }
func (r *nopCloser[T]) Close() error { return nil }
func (r *nopCloser[T]) Read(values []T) (int, error) { return r.reader.Read(values) }

// ReadSeeker is an interface implemented by types that read a stream of
// values of type T and support repositioning via the io.Seeker protocol.
type ReadSeeker[T any] interface {
	Reader[T]
	io.Seeker
}

// ReadSeekCloser is a ReadSeeker that must also be closed to release
// resources when the program is done reading the stream.
type ReadSeekCloser[T any] interface {
	Reader[T]
	io.Seeker
	io.Closer
}

// Errors returned by Seek for invalid arguments.
var (
	errSeekWhence = errors.New("seek: invalid whence value")
	errSeekOffset = errors.New("seek: offset out of range")
)

// Seek resolves a seek request against a stream of the given length and
// returns the new position.
//
// offset is the current position, seek the requested displacement, and
// whence one of io.SeekStart, io.SeekCurrent or io.SeekEnd. Note that with
// io.SeekEnd the new position is length - seek (seek counts backwards from
// the end), which differs from the io.Seeker convention of adding a
// negative offset. Positions past the end are clamped to length; a
// negative result yields errSeekOffset and an unknown whence yields
// errSeekWhence.
func Seek(offset, length, seek int64, whence int) (int64, error) {
	var position int64
	switch whence {
	case io.SeekStart:
		position = seek
	case io.SeekCurrent:
		position = offset + seek
	case io.SeekEnd:
		position = length - seek
	default:
		return -1, errSeekWhence
	}
	switch {
	case position < 0:
		return -1, errSeekOffset
	case position > length:
		return length, nil
	default:
		return position, nil
	}
}

// ReadAll reads all values from r and returns them as a slice, along with any
// error that occurred (other than io.EOF).
func ReadAll[T any](r Reader[T]) ([]T, error) {
Expand Down
140 changes: 110 additions & 30 deletions internal/timemachine/log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package timemachine

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"math"
"time"

"github.com/stealthrocket/timecraft/internal/buffer"
Expand All @@ -17,25 +17,27 @@ var frameBufferPool buffer.Pool

// LogReader instances allow programs to read the content of a record log.
type LogReader struct {
input *bufio.Reader
bufferSize int
startTime time.Time
batch RecordBatch
batchFrame *buffer.Buffer
input io.ReadSeeker
nextByteOffset int64
nextRecordOffset int64
bufferSize int64
startTime time.Time
batch RecordBatch
batchFrame *buffer.Buffer
}

// NewLogReader construct a new log reader consuming input from the given
// io.Reader.
func NewLogReader(input io.Reader, startTime time.Time) *LogReader {
func NewLogReader(input io.ReadSeeker, startTime time.Time) *LogReader {
return NewLogReaderSize(input, startTime, buffer.DefaultSize)
}

// NewLogReaderSize is like NewLogReader but it allows the program to configure
// the read buffer size.
func NewLogReaderSize(input io.Reader, startTime time.Time, bufferSize int) *LogReader {
func NewLogReaderSize(input io.ReadSeeker, startTime time.Time, bufferSize int64) *LogReader {
return &LogReader{
startTime: startTime,
input: bufio.NewReaderSize(input, 64*1024),
input: input,
bufferSize: buffer.Align(bufferSize, buffer.DefaultSize),
}
}
Expand All @@ -44,34 +46,69 @@ func NewLogReaderSize(input io.Reader, startTime time.Time, bufferSize int) *Log
// Close releases the resources held by the reader: the frame buffer is
// returned to the shared pool, the current batch is detached from its
// underlying buffers, and the byte/record offsets are reset to zero.
// It always returns nil.
func (r *LogReader) Close() error {
	buffer.Release(&r.batchFrame, &frameBufferPool)
	r.batch.Reset(r.startTime, nil, nil)
	r.nextByteOffset = 0
	r.nextRecordOffset = 0
	return nil
}

// Seek positions r on the first record batch which contains the given record
// offset.
//
// The whence value defines how to interpret the offset (see io.Seeker).
// NOTE(review): the stream length passed to stream.Seek is math.MaxInt64
// (the total record count is unknown), so io.SeekEnd resolves against that
// sentinel rather than the real end of the log — confirm this is intended.
func (r *LogReader) Seek(offset int64, whence int) (int64, error) {
	nextRecordOffset, err := stream.Seek(r.nextRecordOffset, math.MaxInt64, offset, whence)
	if err != nil {
		return -1, err
	}
	// Seek was called with an offset which is before the current position,
	// we have to rewind to the start and let the loop below seek the first
	// batch which includes the record offset.
	if nextRecordOffset < r.batch.NextOffset() {
		_, err := r.input.Seek(0, io.SeekStart)
		if err != nil {
			return -1, err
		}
		// Drop the current batch and start scanning from byte offset 0;
		// ReadRecordBatch skips forward until it reaches nextRecordOffset.
		r.batch.Reset(r.startTime, nil, nil)
		r.nextByteOffset = 0
	}
	r.nextRecordOffset = nextRecordOffset
	return nextRecordOffset, nil
}

// ReadRecordBatch reads the next record batch.
//
// The RecordBatch is only valid until the next call to ReadRecordBatch.
// The RecordBatch is only valid until the next call to ReadRecordBatch or Seek.
func (r *LogReader) ReadRecordBatch() (*RecordBatch, error) {
if r.batch.reader.N > 0 {
var err error
if s, ok := r.batch.reader.R.(io.Seeker); ok {
_, err = s.Seek(r.batch.reader.N, io.SeekCurrent)
} else {
_, err = io.Copy(io.Discard, &r.batch.reader)
for {
// There may be data left that was not consumed from the previous batch
achille-roussel marked this conversation as resolved.
Show resolved Hide resolved
// returned by ReadRecordBatch (e.g. if the program read the metadata
// and decided it wasn't interested in the batch). We know the byte
// offset of the current record batch, so we can seek directly past it.
if r.batch.reader.N > 0 {
_, err := r.input.Seek(r.nextByteOffset, io.SeekStart)
if err != nil {
return nil, err
}
r.batch.reader.R = nil
r.batch.reader.N = 0
}

buffer.Release(&r.batchFrame, &frameBufferPool)

var err error
r.batchFrame, err = r.readFrame()
if err != nil {
return nil, err
}
r.batch.reader.R = nil
r.batch.reader.N = 0
}
buffer.Release(&r.batchFrame, &frameBufferPool)
var err error
r.batchFrame, err = r.readFrame()
if err != nil {
return nil, err

r.batch.Reset(r.startTime, r.batchFrame.Data, r.input)
r.nextByteOffset = 4 + r.batchFrame.Size() + r.batch.Size()

if nextRecordOffset := r.batch.NextOffset(); nextRecordOffset >= r.nextRecordOffset {
r.nextRecordOffset = nextRecordOffset
return &r.batch, nil
}
}
r.batch.Reset(r.startTime, r.batchFrame.Data, r.input)
return &r.batch, nil
}

func (r *LogReader) readFrame() (*buffer.Buffer, error) {
Expand Down Expand Up @@ -105,7 +142,7 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
f.Data = f.Data[:byteLength]
} else {
defer frameBufferPool.Put(f)
newFrame := buffer.New(byteLength)
newFrame := buffer.New(int64(byteLength))
copy(newFrame.Data, f.Data)
f = newFrame
}
Expand All @@ -120,6 +157,11 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
return f, nil
}

var (
_ io.Closer = (*LogReader)(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why not?

Suggested change
_ io.Closer = (*LogReader)(nil)
_ io.ReadCloser = (*LogReader)(nil)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we can't read bytes from the LogReader, the underlying io.Reader isn't exposed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry i meant _ io.ReadSeeker = (*LogReader)(nil) (combining this check and the one below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah we could have combined them, same same 😁

_ io.Seeker = (*LogReader)(nil)
)

// LogRecordReader wraps a LogReader to help with reading individual records
// in order.
//
Expand All @@ -129,6 +171,8 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
type LogRecordReader struct {
	reader *LogReader   // source of record batches
	batch  *RecordBatch // batch currently being read; nil when the next batch must be fetched
	offset int64        // record offset of the next record Read will deliver
	seekTo int64        // target record offset set by Seek; records before it are skipped
}

// NewLogRecordReader creates a log record iterator.
Expand All @@ -139,26 +183,62 @@ func NewLogRecordReader(r *LogReader) *LogRecordReader {
// Read reads records from r.
//
// The record values share memory buffer with the reader, they remain valid
// until the next call to Read or Seek.
func (r *LogRecordReader) Read(records []Record) (int, error) {
	for {
		if r.batch == nil {
			// Fetch the next batch, skipping whole batches that end before
			// the seek target so their records are never materialized.
			b, err := r.reader.ReadRecordBatch()
			if err != nil {
				return 0, err
			}
			if b.NextOffset() < r.seekTo {
				continue
			}
			r.batch = b
			r.offset = b.FirstOffset()
		}

		n, err := r.batch.Read(records)
		offset := r.offset
		r.offset += int64(n)

		// When the batch starts before the seek target, discard the leading
		// records preceding it and compact the remainder to the front of
		// the caller's slice.
		if offset < r.seekTo {
			if skip := r.seekTo - offset; skip < int64(n) {
				n = copy(records, records[skip:])
			} else {
				n = 0
			}
		}

		if n > 0 || err != io.EOF {
			return n, err
		}

		// Batch exhausted with nothing to deliver: drop it and loop to
		// fetch the next one.
		r.batch = nil
	}
}

// Seek positions the reader on the record at the given offset.
//
// The whence value defines how to interpret the offset (see io.Seeker).
func (r *LogRecordReader) Seek(offset int64, whence int) (int64, error) {
	// Delegate to the LogReader, which rewinds the underlying byte stream
	// when the target precedes the current batch.
	seekTo, err := r.reader.Seek(offset, whence)
	if err != nil {
		return -1, err
	}
	if seekTo < r.offset {
		// Seeking backwards: discard the current batch so Read reloads from
		// the (rewound) LogReader.
		r.batch, r.offset = nil, 0
	} else if r.batch != nil {
		// Seeking past the end of the current batch: drop it so Read scans
		// forward for the batch containing the target.
		if nextOffset := r.batch.NextOffset(); seekTo >= nextOffset {
			r.batch, r.offset = nil, nextOffset
		}
	}
	r.seekTo = seekTo
	return seekTo, nil
}

var (
_ stream.Reader[Record] = (*LogRecordReader)(nil)
_ stream.ReadSeeker[Record] = (*LogRecordReader)(nil)
)

// LogWriter supports writing log segments to an io.Writer.
Expand Down
Loading
Loading