Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Merge filestream "loop_" files #7624

Merged
merged 4 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 6 additions & 54 deletions core/pkg/filestream/filestream.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,4 @@
// Package filestream implements routines necessary for communicating with
// the W&B backend filestream service.
//
// Internally there are three goroutines spun up:
//
// process: process records into an appropriate format to transmit
// transmit: collect and transmit messages to the filestream service
// feedback: process feedback from the filestream service
//
// Below demonstrates a common execution flow through this package:
//
// {caller}:
// - filestream.go: NewFileStream - create service
// - filestream.go: FileStream.Start - spin up worker goroutines
// - filestream.go: FileStream.StreamRecord - add a record to be processed and sent
// - loop_process.go: Filestream.addProcess - add to process channel
// {goroutine process}:
// - loop_process.go: Filestream.loopProcess - loop acting on process channel
// - loop_transmit.go: Filestream.addTransmit - add to transmit channel
// {goroutine transmit}:
// - loop_transmit.go: Filestream.loopTransmit - loop acting on transmit channel
// - collector.go: chunkCollector - class to coordinate collecting work from transmit channel
// - collector.go: chunkCollector.read - read the first transmit work from transmit channel
// - collector.go: chunkCollector.readMore - keep reading until we have enough or hit timeout
// - collector.go: chunkCollector.dump - create a blob to be used to serialize into json to send
// - loop_transmit.go: Filestream.send - send json to backend filestream service
// - loop_feedback.go: Filestream.add_feedback - add to feedback channel
// {goroutine feedback}
// - loop_feedback.go: Filestream.loopFeedback - loop acting on feedback channel
// {caller}
// - filestream.go: FileStream.Close - graceful shutdown of worker goroutines
// Package filestream communicates with the W&B backend filestream service.
package filestream

import (
Expand Down Expand Up @@ -108,9 +78,6 @@ type fileStream struct {
processChan chan Update
transmitChan chan CollectorStateUpdate
feedbackChan chan map[string]interface{}

processWait *sync.WaitGroup
transmitWait *sync.WaitGroup
feedbackWait *sync.WaitGroup

// keep track of where we are streaming each file chunk
Expand Down Expand Up @@ -167,12 +134,10 @@ func NewFileStream(params FileStreamParams) FileStream {
logger: params.Logger,
printer: params.Printer,
apiClient: params.ApiClient,
processWait: &sync.WaitGroup{},
transmitWait: &sync.WaitGroup{},
feedbackWait: &sync.WaitGroup{},
processChan: make(chan Update, BufferSize),
transmitChan: make(chan CollectorStateUpdate, BufferSize),
feedbackChan: make(chan map[string]interface{}, BufferSize),
feedbackWait: &sync.WaitGroup{},
offsetMap: make(FileStreamOffsetMap),
maxItemsPerPush: defaultMaxItemsPerPush,
deadChanOnce: &sync.Once{},
Expand Down Expand Up @@ -220,36 +185,23 @@ func (fs *fileStream) Start(
fs.offsetMap = maps.Clone(offsetMap)
}

fs.processWait.Add(1)
go func() {
defer fs.processWait.Done()
fs.loopProcess(fs.processChan)
}()

fs.transmitWait.Add(1)
go func() {
defer fs.transmitWait.Done()
fs.loopTransmit(fs.transmitChan)
}()
go fs.loopProcess()
go fs.loopTransmit()

fs.feedbackWait.Add(1)
go func() {
defer fs.feedbackWait.Done()
fs.loopFeedback(fs.feedbackChan)
fs.loopFeedback()
}()
}

func (fs *fileStream) StreamUpdate(update Update) {
fs.logger.Debug("filestream: stream update", "update", update)
fs.addProcess(update)
fs.processChan <- update
}

func (fs *fileStream) Close() {
close(fs.processChan)
fs.processWait.Wait()
close(fs.transmitChan)
fs.transmitWait.Wait()
close(fs.feedbackChan)
fs.feedbackWait.Wait()
fs.logger.Debug("filestream: closed")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,48 @@ type FsTransmitFileData struct {
Content []string `json:"content"`
}

func (fs *fileStream) addTransmit(chunk CollectorStateUpdate) {
fs.transmitChan <- chunk
// loopProcess ingests the processChan to process incoming data into
// FS requests.
func (fs *fileStream) loopProcess() {
// Close the output channel at the end.
defer close(fs.transmitChan)

fs.logger.Debug("filestream: open", "path", fs.path)

for update := range fs.processChan {
err := update.Apply(UpdateContext{
ModifyRequest: func(csu CollectorStateUpdate) {
fs.transmitChan <- csu
},

Settings: fs.settings,
ClientID: fs.clientId,

Logger: fs.logger,
Printer: fs.printer,
})

if err != nil {
fs.logFatalAndStopWorking(err)
break
}
}

// Flush input channel if we exited early.
for range fs.processChan {
}
}

// loopTransmit sends updates to the backend, consuming the entire channel.
// loopTransmit sends updates in the transmitChan to the backend.
//
// Updates are batched to reduce the total number of HTTP requests.
// An empty "heartbeat" request is sent when there are no updates for too long,
// guaranteeing that a request is sent at least once every period specified
// by `heartbeatStopwatch`.
func (fs *fileStream) loopTransmit(updates <-chan CollectorStateUpdate) {
func (fs *fileStream) loopTransmit() {
// Close the output channel at the end.
defer close(fs.feedbackChan)

state := CollectorState{}

transmissions := make(chan *FsTransmitData)
Expand All @@ -45,7 +76,7 @@ func (fs *fileStream) loopTransmit(updates <-chan CollectorStateUpdate) {
transmitWG.Add(1)
go func() {
defer transmitWG.Done()
fs.sendAll(transmissions)
fs.sendAll(transmissions, fs.feedbackChan)
}()

// Periodically send heartbeats.
Expand All @@ -65,9 +96,9 @@ func (fs *fileStream) loopTransmit(updates <-chan CollectorStateUpdate) {
//
// We try to batch updates that happen in quick succession by waiting a
// short time after receiving an update.
for firstUpdate := range updates {
for firstUpdate := range fs.transmitChan {
firstUpdate.Apply(&state)
fs.collectBatch(&state, updates)
fs.collectBatch(&state, fs.transmitChan)

fs.heartbeatStopwatch.Reset()
data, hasData := state.Consume(fs.offsetMap, false /*isDone*/)
Expand All @@ -87,6 +118,12 @@ func (fs *fileStream) loopTransmit(updates <-chan CollectorStateUpdate) {
transmitWG.Wait()
}

// loopFeedback consumes the feedbackChan to process feedback from the backend.
func (fs *fileStream) loopFeedback() {
for range fs.feedbackChan {
}
}

func (fs *fileStream) sendHeartbeats(
stop <-chan struct{},
out chan<- *FsTransmitData,
Expand Down Expand Up @@ -122,9 +159,12 @@ func (fs *fileStream) collectBatch(
}
}

func (fs *fileStream) sendAll(data <-chan *FsTransmitData) {
func (fs *fileStream) sendAll(
data <-chan *FsTransmitData,
feedbackChan chan<- map[string]any,
) {
for x := range data {
err := fs.send(x)
err := fs.send(x, feedbackChan)

if err != nil {
fs.logFatalAndStopWorking(err)
Expand All @@ -137,7 +177,10 @@ func (fs *fileStream) sendAll(data <-chan *FsTransmitData) {
}
}

func (fs *fileStream) send(data *FsTransmitData) error {
func (fs *fileStream) send(
data *FsTransmitData,
feedbackChan chan<- map[string]any,
) error {
// Stop working after death to avoid data corruption.
if fs.isDead() {
return fmt.Errorf("filestream: can't send because I am dead")
Expand Down Expand Up @@ -183,7 +226,7 @@ func (fs *fileStream) send(data *FsTransmitData) error {
if err != nil {
fs.logger.CaptureError("json decode error", err)
}
fs.addFeedback(res)
feedbackChan <- res
fs.logger.Debug("filestream: post response", "response", res)
return nil
}
10 changes: 0 additions & 10 deletions core/pkg/filestream/loop_feedback.go

This file was deleted.

31 changes: 0 additions & 31 deletions core/pkg/filestream/loop_process.go

This file was deleted.