
Merge pull request #45 from stealthrocket/seek
timemachine: support seeking through log records
achille-roussel committed Jun 1, 2023
2 parents c3ae913 + 05d9512 commit d2ba046
Showing 6 changed files with 175 additions and 48 deletions.
12 changes: 8 additions & 4 deletions internal/buffer/buffer.go
@@ -6,12 +6,16 @@ const DefaultSize = 4096

type Buffer struct{ Data []byte }

func (buf *Buffer) Size() int64 {
return int64(len(buf.Data))
}

type Pool struct{ pool sync.Pool }

func (p *Pool) Get(size int) *Buffer {
func (p *Pool) Get(size int64) *Buffer {
b, _ := p.pool.Get().(*Buffer)
if b != nil {
if size <= cap(b.Data) {
if int(size) <= cap(b.Data) {
b.Data = b.Data[:size]
return b
}
@@ -27,7 +31,7 @@ func (p *Pool) Put(b *Buffer) {
}
}

func New(size int) *Buffer {
func New(size int64) *Buffer {
return &Buffer{Data: make([]byte, size, Align(size, DefaultSize))}
}

@@ -38,6 +42,6 @@ func Release(buf **Buffer, pool *Pool) {
}
}

func Align(size, to int) int {
func Align(size, to int64) int64 {
return ((size + (to - 1)) / to) * to
}
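
The buffer sizes switch from int to int64 in this file so they can be computed directly from 64-bit byte offsets and frame lengths. As a quick reference for the helpers touched above, here is a small sketch; the enclosing function and the example values are illustrative only, and the package lives under internal/ so it is only importable from within the module:

func bufferSketch() {
    // Align rounds a size up to the next multiple of its second argument:
    // ((5000 + 4095) / 4096) * 4096 == 8192.
    _ = buffer.Align(5000, buffer.DefaultSize) // 8192

    // New allocates len == size and rounds the capacity up to a DefaultSize
    // boundary: len(b.Data) == 5000, cap(b.Data) == 8192.
    b := buffer.New(5000)

    // Pool reuses a previously released buffer when its capacity is large
    // enough for the requested size; Size returns int64(len(b.Data)).
    var pool buffer.Pool
    buf := pool.Get(b.Size())
    defer pool.Put(buf)
}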
6 changes: 3 additions & 3 deletions internal/object/store.go
@@ -95,7 +95,7 @@ type Store interface {

// Reads an existing object from the store, returning a reader exposing its
// content.
ReadObject(ctx context.Context, name string) (io.ReadCloser, error)
ReadObject(ctx context.Context, name string) (io.ReadSeekCloser, error)

// Retrieves information about an object in the store.
StatObject(ctx context.Context, name string) (Info, error)
@@ -158,7 +158,7 @@ func (emptyStore) CreateObject(context.Context, string, io.Reader, ...Tag) error
return ErrReadOnly
}

func (emptyStore) ReadObject(context.Context, string) (io.ReadCloser, error) {
func (emptyStore) ReadObject(context.Context, string) (io.ReadSeekCloser, error) {
return nil, ErrNotExist
}

@@ -252,7 +252,7 @@ func (store dirStore) CreateObject(ctx context.Context, name string, data io.Rea
return nil
}

func (store dirStore) ReadObject(ctx context.Context, name string) (io.ReadCloser, error) {
func (store dirStore) ReadObject(ctx context.Context, name string) (io.ReadSeekCloser, error) {
path, err := store.joinPath(name)
if err != nil {
return nil, err
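
The only change to the store interface is the return type of ReadObject: io.ReadSeekCloser instead of io.ReadCloser, which is what lets the log reader further down reposition itself inside a stored segment. A minimal caller sketch, assuming the package is imported as object; the function and object name are hypothetical:

func readFrom(ctx context.Context, store object.Store) error {
    obj, err := store.ReadObject(ctx, "example-object") // hypothetical name
    if err != nil {
        return err
    }
    defer obj.Close()

    // The returned object can now be repositioned before reading.
    if _, err := obj.Seek(4096, io.SeekStart); err != nil {
        return err
    }
    _, err = io.ReadAll(obj)
    return err
}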
41 changes: 40 additions & 1 deletion internal/stream/stream.go
@@ -2,7 +2,10 @@
// values.
package stream

import "io"
import (
"errors"
"io"
)

// Reader is an interface implemented by types that read a stream of values of
// type T.
@@ -61,6 +64,42 @@ type nopCloser[T any] struct{ reader Reader[T] }
func (r *nopCloser[T]) Close() error { return nil }
func (r *nopCloser[T]) Read(values []T) (int, error) { return r.reader.Read(values) }

type ReadSeeker[T any] interface {
Reader[T]
io.Seeker
}

type ReadSeekCloser[T any] interface {
Reader[T]
io.Seeker
io.Closer
}

var (
errSeekWhence = errors.New("seek: invalid whence value")
errSeekOffset = errors.New("seek: offset out of range")
)

func Seek(offset, length, seek int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
offset = seek
case io.SeekCurrent:
offset += seek
case io.SeekEnd:
offset = length - seek
default:
return -1, errSeekWhence
}
if offset < 0 {
return -1, errSeekOffset
}
if offset > length {
offset = length
}
return offset, nil
}

// ReadAll reads all values from r and returns them as a slice, along with any
// error that occurred (other than io.EOF).
func ReadAll[T any](r Reader[T]) ([]T, error) {
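
The new stream.Seek helper centralizes the whence handling shared by the seekable readers in this change: it validates whence, rejects positions that resolve to a negative offset, and clamps anything past length back to length (note that for io.SeekEnd it computes length - seek, so a positive value counts back from the end). A sketch of a stream.ReadSeeker built on it; the sliceReader type is hypothetical and not part of the package:

// sliceReader is a hypothetical in-memory stream.ReadSeeker that delegates
// offset arithmetic to the new Seek helper.
type sliceReader[T any] struct {
    values []T
    offset int64
}

func (r *sliceReader[T]) Read(values []T) (int, error) {
    if r.offset >= int64(len(r.values)) {
        return 0, io.EOF
    }
    n := copy(values, r.values[r.offset:])
    r.offset += int64(n)
    return n, nil
}

func (r *sliceReader[T]) Seek(offset int64, whence int) (int64, error) {
    // Seek(current, length, requested, whence) resolves and bounds-checks
    // the new position in one place.
    off, err := stream.Seek(r.offset, int64(len(r.values)), offset, whence)
    if err != nil {
        return -1, err
    }
    r.offset = off
    return off, nil
}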
140 changes: 110 additions & 30 deletions internal/timemachine/log.go
@@ -1,10 +1,10 @@
package timemachine

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"math"
"time"

"github.com/stealthrocket/timecraft/internal/buffer"
@@ -17,25 +17,27 @@ var frameBufferPool buffer.Pool

// LogReader instances allow programs to read the content of a record log.
type LogReader struct {
input *bufio.Reader
bufferSize int
startTime time.Time
batch RecordBatch
batchFrame *buffer.Buffer
input io.ReadSeeker
nextByteOffset int64
nextRecordOffset int64
bufferSize int64
startTime time.Time
batch RecordBatch
batchFrame *buffer.Buffer
}

// NewLogReader constructs a new log reader consuming input from the given
// io.ReadSeeker.
func NewLogReader(input io.Reader, startTime time.Time) *LogReader {
func NewLogReader(input io.ReadSeeker, startTime time.Time) *LogReader {
return NewLogReaderSize(input, startTime, buffer.DefaultSize)
}

// NewLogReaderSize is like NewLogReader but it allows the program to configure
// the read buffer size.
func NewLogReaderSize(input io.Reader, startTime time.Time, bufferSize int) *LogReader {
func NewLogReaderSize(input io.ReadSeeker, startTime time.Time, bufferSize int64) *LogReader {
return &LogReader{
startTime: startTime,
input: bufio.NewReaderSize(input, 64*1024),
input: input,
bufferSize: buffer.Align(bufferSize, buffer.DefaultSize),
}
}
@@ -44,34 +46,69 @@ func NewLogReaderSize(input io.Reader, startTime time.Time, bufferSize int) *Log
func (r *LogReader) Close() error {
buffer.Release(&r.batchFrame, &frameBufferPool)
r.batch.Reset(r.startTime, nil, nil)
r.nextByteOffset = 0
r.nextRecordOffset = 0
return nil
}

// Seek positions r on the first record batch which contains the given record
// offset.
//
// The whence value defines how to interpret the offset (see io.Seeker).
func (r *LogReader) Seek(offset int64, whence int) (int64, error) {
nextRecordOffset, err := stream.Seek(r.nextRecordOffset, math.MaxInt64, offset, whence)
if err != nil {
return -1, err
}
// Seek was called with an offset which is before the current position,
// so we have to rewind to the start and let the loop below seek the
// first batch which includes the record offset.
if nextRecordOffset < r.batch.NextOffset() {
_, err := r.input.Seek(0, io.SeekStart)
if err != nil {
return -1, err
}
r.batch.Reset(r.startTime, nil, nil)
r.nextByteOffset = 0
}
r.nextRecordOffset = nextRecordOffset
return nextRecordOffset, nil
}

// ReadRecordBatch reads the next record batch.
//
// The RecordBatch is only valid until the next call to ReadRecordBatch.
// The RecordBatch is only valid until the next call to ReadRecordBatch or Seek.
func (r *LogReader) ReadRecordBatch() (*RecordBatch, error) {
if r.batch.reader.N > 0 {
var err error
if s, ok := r.batch.reader.R.(io.Seeker); ok {
_, err = s.Seek(r.batch.reader.N, io.SeekCurrent)
} else {
_, err = io.Copy(io.Discard, &r.batch.reader)
for {
// There may be data left that was not consumed from the previous batch
// returned by ReadRecordBatch (e.g. if the program read the metadata
// and decided it wasn't interested in the batch). We know the byte
// offset of the current record batch, so we can seek directly to the next
// batch instead of reading and discarding the rest of this one.
if r.batch.reader.N > 0 {
_, err := r.input.Seek(r.nextByteOffset, io.SeekStart)
if err != nil {
return nil, err
}
r.batch.reader.R = nil
r.batch.reader.N = 0
}

buffer.Release(&r.batchFrame, &frameBufferPool)

var err error
r.batchFrame, err = r.readFrame()
if err != nil {
return nil, err
}
r.batch.reader.R = nil
r.batch.reader.N = 0
}
buffer.Release(&r.batchFrame, &frameBufferPool)
var err error
r.batchFrame, err = r.readFrame()
if err != nil {
return nil, err

r.batch.Reset(r.startTime, r.batchFrame.Data, r.input)
r.nextByteOffset = 4 + r.batchFrame.Size() + r.batch.Size()

if nextRecordOffset := r.batch.NextOffset(); nextRecordOffset >= r.nextRecordOffset {
r.nextRecordOffset = nextRecordOffset
return &r.batch, nil
}
}
r.batch.Reset(r.startTime, r.batchFrame.Data, r.input)
return &r.batch, nil
}

func (r *LogReader) readFrame() (*buffer.Buffer, error) {
@@ -105,7 +142,7 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
f.Data = f.Data[:byteLength]
} else {
defer frameBufferPool.Put(f)
newFrame := buffer.New(byteLength)
newFrame := buffer.New(int64(byteLength))
copy(newFrame.Data, f.Data)
f = newFrame
}
@@ -120,6 +157,11 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
return f, nil
}

var (
_ io.Closer = (*LogReader)(nil)
_ io.Seeker = (*LogReader)(nil)
)

// LogRecordReader wraps a LogReader to help with reading individual records
// in order.
//
@@ -129,6 +171,8 @@ func (r *LogReader) readFrame() (*buffer.Buffer, error) {
type LogRecordReader struct {
reader *LogReader
batch *RecordBatch
offset int64
seekTo int64
}

// NewLogRecordReader creates a log record iterator.
@@ -139,26 +183,62 @@ func NewLogRecordReader(r *LogReader) *LogRecordReader {
// Read reads records from r.
//
// The record values share a memory buffer with the reader; they remain valid
// until the next call to Read.
// until the next call to Read or Seek.
func (r *LogRecordReader) Read(records []Record) (int, error) {
for {
if r.batch == nil {
b, err := r.reader.ReadRecordBatch()
if err != nil {
return 0, err
}
if b.NextOffset() < r.seekTo {
continue
}
r.batch = b
r.offset = b.FirstOffset()
}

n, err := r.batch.Read(records)
offset := r.offset
r.offset += int64(n)

if offset < r.seekTo {
if skip := r.seekTo - offset; skip < int64(n) {
n = copy(records, records[skip:])
} else {
n = 0
}
}

if n > 0 || err != io.EOF {
return n, nil
return n, err
}

r.batch = nil
}
}

// Seek positions the reader on the record at the given offset.
//
// The whence value defines how to interpret the offset (see io.Seeker).
func (r *LogRecordReader) Seek(offset int64, whence int) (int64, error) {
seekTo, err := r.reader.Seek(offset, whence)
if err != nil {
return -1, err
}
if seekTo < r.offset {
r.batch, r.offset = nil, 0
} else if r.batch != nil {
if nextOffset := r.batch.NextOffset(); seekTo >= nextOffset {
r.batch, r.offset = nil, nextOffset
}
}
r.seekTo = seekTo
return seekTo, nil
}

var (
_ stream.Reader[Record] = (*LogRecordReader)(nil)
_ stream.ReadSeeker[Record] = (*LogRecordReader)(nil)
)

// LogWriter supports writing log segments to an io.Writer.
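
Putting the pieces together: LogRecordReader.Seek delegates to LogReader.Seek, which rewinds the underlying io.ReadSeeker when the target offset is behind the current position and otherwise skips ahead batch by batch, while LogRecordReader.Read discards the records of the first matching batch that fall below the requested offset. A hedged end-to-end sketch; the object name, start time, and record offset are illustrative, and error handling is kept minimal:

func replayFrom(ctx context.Context, store object.Store, startTime time.Time) error {
    input, err := store.ReadObject(ctx, "example-log-segment") // hypothetical name
    if err != nil {
        return err
    }
    defer input.Close()

    logReader := timemachine.NewLogReader(input, startTime)
    defer logReader.Close()

    records := timemachine.NewLogRecordReader(logReader)

    // Position the iterator on record offset 42; earlier batches are skipped
    // and earlier records within the matching batch are dropped by Read.
    if _, err := records.Seek(42, io.SeekStart); err != nil {
        return err
    }

    buf := make([]timemachine.Record, 64)
    for {
        n, err := records.Read(buf)
        for i := range buf[:n] {
            _ = buf[i] // process the record
        }
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}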