
Commit

Use a dynamic batch size
moredatarequired committed May 14, 2024
1 parent 30d9d00 commit cac9d37
Showing 2 changed files with 40 additions and 27 deletions.
4 changes: 2 additions & 2 deletions core/internal/filetransfer/file_transfer_manager.go
@@ -13,7 +13,7 @@ type Storage int
 
 const (
     bufferSize = 32
-    defaultConcurrencyLimit = 128
+    DefaultConcurrencyLimit = 128
 )
 
 type FileTransfer interface {
@@ -92,7 +92,7 @@ func NewFileTransferManager(opts ...FileTransferManagerOption) FileTransferManag
     fm := fileTransferManager{
         inChan: make(chan *Task, bufferSize),
         wg: &sync.WaitGroup{},
-        semaphore: make(chan struct{}, defaultConcurrencyLimit),
+        semaphore: make(chan struct{}, DefaultConcurrencyLimit),
     }
 
     for _, opt := range opts {
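
The constant is renamed and exported so that other core packages can read the same transfer concurrency cap (it is referenced from the artifacts saver below). The manager enforces the cap with a buffered-channel semaphore; here is a minimal, self-contained sketch of that pattern, where the task count and worker body are illustrative and not from the repository:

package main

import (
    "fmt"
    "sync"
)

// Mirrors the limit exported from file_transfer_manager.go in this commit.
const DefaultConcurrencyLimit = 128

func main() {
    // A buffered channel used as a counting semaphore: sends block once
    // DefaultConcurrencyLimit goroutines hold a slot.
    semaphore := make(chan struct{}, DefaultConcurrencyLimit)
    var wg sync.WaitGroup

    for task := 0; task < 500; task++ {
        wg.Add(1)
        semaphore <- struct{}{} // acquire a slot (blocks at the limit)
        go func(task int) {
            defer wg.Done()
            defer func() { <-semaphore }() // release the slot
            _ = task                       // upload work would happen here
        }(task)
    }
    wg.Wait()
    fmt.Println("all transfers finished")
}
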
63 changes: 38 additions & 25 deletions core/pkg/artifacts/saver.go
@@ -6,6 +6,7 @@ import (
     "net/url"
     "os"
     "strings"
+    "time"
 
     "github.com/Khan/genqlient/graphql"
 
@@ -27,8 +28,10 @@ type ArtifactSaver struct
     Artifact *service.ArtifactRecord
     HistoryStep int64
     StagingDir string
-    batchSize int
     maxActiveBatches int
+    numTotal int
+    numDone int
+    startTime time.Time
 }
 
 type serverFileResponse struct {
@@ -56,7 +59,6 @@ func NewArtifactSaver(
         Artifact: artifact,
         HistoryStep: historyStep,
         StagingDir: stagingDir,
-        batchSize: 1000,
         maxActiveBatches: 5,
     }
 }
@@ -153,7 +155,9 @@ func (as *ArtifactSaver) uploadFiles(
         }
         fileSpecs[name] = fileSpec
     }
+    as.numTotal = len(fileSpecs)
 
+    as.startTime = time.Now()
     var err error
     for len(fileSpecs) > 0 {
         numNeedUploading := len(fileSpecs)
@@ -182,28 +186,21 @@ func (as *ArtifactSaver) processFiles(
 func (as *ArtifactSaver) processFiles(
     manifest *Manifest, fileSpecs map[string]gql.CreateArtifactFileSpecInput,
 ) (map[string]gql.CreateArtifactFileSpecInput, error) {
-    maxActive := as.maxActiveBatches * as.batchSize
-
     // Channels to get responses from the batch url retrievers.
-    readyChan := make(chan serverFileResponse, maxActive)
+    readyChan := make(chan serverFileResponse)
     errorChan := make(chan error)
 
-    doneChan := make(chan *filetransfer.Task, maxActive)
+    doneChan := make(chan *filetransfer.Task)
     mustRetry := map[string]gql.CreateArtifactFileSpecInput{}
 
-    numTotal := len(fileSpecs)
     numActive := 0
-    numDone := 0
 
-    fileSpecBatches := batchify(fileSpecs, as.batchSize)
-
-    for numDone+len(mustRetry) < numTotal {
+    for as.numDone+len(mustRetry) < as.numTotal {
         // Start new batches until we get to the desired number of active uploads.
-        for len(fileSpecBatches) > 0 && numActive <= maxActive-as.batchSize {
-            batch := fileSpecBatches[0]
+        for len(fileSpecs) > 0 && numActive <= (as.maxActiveBatches-1)*as.batchSize() {
+            batch := as.nextBatch(fileSpecs)
             numActive += len(batch)
             go as.batchFileDataRetriever(batch, readyChan, errorChan)
-            fileSpecBatches = fileSpecBatches[1:]
         }
         select {
         // Start any uploads that are ready.
@@ -215,7 +212,7 @@
             if fileInfo.UploadUrl == nil {
                 // The server already has this file.
                 numActive--
-                numDone++
+                as.numDone++
                 continue
             }
             task := newUploadTask(fileInfo, *entry.LocalPath)
@@ -227,7 +224,7 @@
             if result.Err != nil {
                 mustRetry[result.Name] = fileSpecs[result.Name]
             } else {
-                numDone++
+                as.numDone++
             }
         // Check for errors.
         case err := <-errorChan:
@@ -266,17 +263,33 @@ func (as *ArtifactSaver) batchFileDataRetriever(
     }
 }
 
-func batchify(
-    fileSpecs map[string]gql.CreateArtifactFileSpecInput, batchSize int,
-) [][]gql.CreateArtifactFileSpecInput {
-    batches := [][]gql.CreateArtifactFileSpecInput{}
-    for _, fileSpec := range fileSpecs {
-        if len(batches) == 0 || len(batches[len(batches)-1]) >= batchSize {
-            batches = append(batches, []gql.CreateArtifactFileSpecInput{})
+func (as *ArtifactSaver) nextBatch(
+    fileSpecs map[string]gql.CreateArtifactFileSpecInput,
+) []gql.CreateArtifactFileSpecInput {
+    batchSize := as.batchSize()
+    batch := []gql.CreateArtifactFileSpecInput{}
+    for key := range fileSpecs {
+        batch = append(batch, fileSpecs[key])
+        delete(fileSpecs, key)
+        if len(batch) >= batchSize {
+            break
         }
-        batches[len(batches)-1] = append(batches[len(batches)-1], fileSpec)
     }
-    return batches
+    return batch
 }
 
+func (as *ArtifactSaver) batchSize() int {
+    // We want to keep the number of pending uploads under the concurrency limit until
+    // we know how fast they upload.
+    minBatchSize := filetransfer.DefaultConcurrencyLimit / as.maxActiveBatches
+    maxBatchSize := 10000 / as.maxActiveBatches
+    if as.numDone < filetransfer.DefaultConcurrencyLimit {
+        return minBatchSize
+    }
+    // Given the average time per item, estimate a batch size that will take 1 minute.
+    sinceStart := time.Since(as.startTime)
+    filesPerMin := int(float64(as.numDone) / sinceStart.Minutes())
+    return max(min(maxBatchSize, filesPerMin), minBatchSize)
+}
+
 func newUploadTask(fileInfo serverFileResponse, localPath string) *filetransfer.Task {
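
Taken together, the new helpers drain up to batchSize() entries from the file-spec map per batch and grow the batch size once upload throughput is known. For illustration, a standalone sketch of the sizing heuristic with worked numbers; the constants are copied from the diff, while the function name and example inputs are assumptions made for the sketch:

package main

// Requires Go 1.21+ for the built-in min and max.

import (
    "fmt"
    "time"
)

// Values taken from the diff: the exported concurrency limit and the number
// of batches the saver keeps in flight.
const (
    defaultConcurrencyLimit = 128
    maxActiveBatches        = 5
)

// dynamicBatchSize mirrors ArtifactSaver.batchSize from the diff: keep batches
// small until enough files have finished to measure throughput, then aim for
// batches that take roughly one minute to upload.
func dynamicBatchSize(numDone int, sinceStart time.Duration) int {
    minBatchSize := defaultConcurrencyLimit / maxActiveBatches // 25
    maxBatchSize := 10000 / maxActiveBatches                   // 2000
    if numDone < defaultConcurrencyLimit {
        return minBatchSize
    }
    filesPerMin := int(float64(numDone) / sinceStart.Minutes())
    return max(min(maxBatchSize, filesPerMin), minBatchSize)
}

func main() {
    // Before 128 files have finished, batches stay at the minimum.
    fmt.Println(dynamicBatchSize(50, 10*time.Second)) // 25
    // 500 files in 30 seconds is ~1000 files/min, so batches grow to 1000.
    fmt.Println(dynamicBatchSize(500, 30*time.Second)) // 1000
    // Very fast uploads are capped at 2000 files per batch.
    fmt.Println(dynamicBatchSize(5000, 30*time.Second)) // 2000
}

With maxActiveBatches = 5, uploads start in batches of 25 files (128/5) until 128 files have completed, then grow toward a batch that should take about one minute to upload, capped at 2000 files (10000/5).
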

0 comments on commit cac9d37
