Skip to content
This repository has been archived by the owner on Feb 17, 2024. It is now read-only.

Commit

Permalink
fetch log manigests concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: Achille Roussel <[email protected]>
  • Loading branch information
achille-roussel committed May 30, 2023
1 parent 5fff760 commit adac0ed
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 7 deletions.
56 changes: 56 additions & 0 deletions internal/stream/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package stream

import "io"

type Optional[T any] struct {
val T
err error
}

func Opt[T any](val T, err error) Optional[T] {
return Optional[T]{val: val, err: err}
}

func (opt *Optional[T]) Value() (T, error) {
return opt.val, opt.err
}

func ChanReader[T any](ch <-chan Optional[T]) Reader[T] {
return chanReader[T](ch)
}

type chanReader[T any] <-chan Optional[T]

func (r chanReader[T]) Read(values []T) (int, error) {
if len(values) == 0 {
return 0, nil
}

opt, ok := <-r
if !ok {
return 0, io.EOF
}
v, err := opt.Value()
if err != nil {
return 0, err
}
values[0] = v

for n := 1; n < len(values); n++ {
select {
case opt, ok := <-r:
if !ok {
return n, io.EOF
}
v, err := opt.Value()
if err != nil {
return n, err
}
values[n] = v
default:
return n, nil
}
}

return len(values), nil
}
46 changes: 39 additions & 7 deletions internal/timemachine/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path"
"strconv"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -362,16 +363,47 @@ func (reg *Registry) ListLogSegments(ctx context.Context, processID format.UUID)
}

func (reg *Registry) ListLogManifests(ctx context.Context) stream.ReadCloser[*format.Manifest] {
reader := reg.Store.ListObjects(ctx, "log/")
return convert(reader, func(info object.Info) (*format.Manifest, error) {
processID, err := uuid.Parse(path.Base(info.Name))
if err != nil {
return nil, err
ch := make(chan stream.Optional[*format.Manifest])
ctx, cancel := context.WithCancel(ctx)

go func(reader stream.ReadCloser[object.Info]) {
defer close(ch)
defer reader.Close()

it := stream.Iter[object.Info](reader)
wg := sync.WaitGroup{}

for it.Next() {
processID, err := uuid.Parse(path.Base(it.Value().Name))
if err != nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
ch <- stream.Opt(reg.LookupLogManifest(ctx, processID))
}()
}
return reg.LookupLogManifest(ctx, processID)
})

if err := it.Err(); err != nil {
ch <- stream.Opt[*format.Manifest](nil, err)
}

wg.Wait()
}(reg.Store.ListObjects(ctx, "log/"))

return stream.NewReadCloser(stream.ChanReader(ch), closerFunc(func() error {
cancel()
for range ch {
}
return nil
}))
}

type closerFunc func() error

func (f closerFunc) Close() error { return f() }

func (reg *Registry) LookupLogManifest(ctx context.Context, processID format.UUID) (*format.Manifest, error) {
r, err := reg.Store.ReadObject(ctx, reg.manifestKey(processID))
if err != nil {
Expand Down

0 comments on commit adac0ed

Please sign in to comment.