Skip to content

Commit

Permalink
Use buffered channels and background workers to manage the upload work queue
Browse files Browse the repository at this point in the history
  • Loading branch information
moredatarequired committed May 7, 2024
1 parent e9a0dc2 commit c7788db
Showing 1 changed file with 155 additions and 95 deletions.
250 changes: 155 additions & 95 deletions core/pkg/artifacts/saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/Khan/genqlient/graphql"

Expand All @@ -27,6 +26,14 @@ type ArtifactSaver struct {
Artifact *service.ArtifactRecord
HistoryStep int64
StagingDir string
batchSize int
}

// serverFileResponse holds the per-file result of the CreateArtifactFiles
// GraphQL mutation, reshaped into the fields the upload step needs.
type serverFileResponse struct {
	Name            string   // manifest entry name of the file
	BirthArtifactID string   // ID of the artifact the file was first stored under
	UploadUrl       *string  // signed upload URL; nil means the server already has this file
	UploadHeaders   []string // headers that must accompany the upload request
}

func NewArtifactSaver(
Expand All @@ -45,6 +52,7 @@ func NewArtifactSaver(
Artifact: artifact,
HistoryStep: historyStep,
StagingDir: stagingDir,
batchSize: 1000,
}
}

Expand Down Expand Up @@ -123,17 +131,11 @@ func (as *ArtifactSaver) createManifest(
return response.GetCreateArtifactManifest().ArtifactManifest, nil
}

func (as *ArtifactSaver) uploadFiles(artifactID string, manifest *Manifest, manifestID string, _ chan<- *service.Record) error {
const batchSize int = 10000
const maxBacklog int = 10000

type TaskResult struct {
Task *filetransfer.Task
Name string
}

// Prepare all file specs.
var fileSpecs []gql.CreateArtifactFileSpecInput
func (as *ArtifactSaver) uploadFiles(
artifactID string, manifest *Manifest, manifestID string, _ chan<- *service.Record,
) error {
// Prepare GQL input for files that (might) need to be uploaded.
fileSpecs := map[string]gql.CreateArtifactFileSpecInput{}
for name, entry := range manifest.Contents {
if entry.LocalPath == nil {
continue
Expand All @@ -144,99 +146,157 @@ func (as *ArtifactSaver) uploadFiles(artifactID string, manifest *Manifest, mani
Md5: entry.Digest,
ArtifactManifestID: &manifestID,
}
fileSpecs = append(fileSpecs, fileSpec)
fileSpecs[name] = fileSpec
}

// Upload in batches.
numInProgress, numDone := 0, 0
nameToScheduledTime := map[string]time.Time{}
taskResultsChan := make(chan TaskResult)
fileSpecsBatch := make([]gql.CreateArtifactFileSpecInput, 0, batchSize)
for numDone < len(fileSpecs) {
// Prepare a batch.
now := time.Now()
fileSpecsBatch = fileSpecsBatch[:0]
for _, fileSpec := range fileSpecs {
if _, ok := nameToScheduledTime[fileSpec.Name]; ok {
continue
}
nameToScheduledTime[fileSpec.Name] = now
fileSpecsBatch = append(fileSpecsBatch, fileSpec)
if len(fileSpecsBatch) >= batchSize {
break
}
var err error
for len(fileSpecs) > 0 {
numNeedUploading := len(fileSpecs)
if fileSpecs, err = as.processFiles(manifest, fileSpecs); err != nil {
return err

Check warning on line 156 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L156

Added line #L156 was not covered by tests
}
if len(fileSpecsBatch) > 0 {
// Fetch upload URLs.
response, err := gql.CreateArtifactFiles(
as.Ctx,
as.GraphqlClient,
fileSpecsBatch,
gql.ArtifactStorageLayoutV2,
// If more than half of the remaining files uploaded we'll keep retrying.
// We shouldn't ordinarily need to retry at all: our internal client handles
// retryable errors, and the only failure this retry loop is for is when signed
// urls expire before an upload was started (exceedingly rare).
// Still, as long as more than half of them succeed in each iteration this will
// eventually terminate, so we're generous with our retry policy.
if len(fileSpecs) > numNeedUploading/2 {
return fmt.Errorf(
"most remaining uploads (%d/%d) have failed, giving up",
len(fileSpecs), numNeedUploading,

Check warning on line 167 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L165-L167

Added lines #L165 - L167 were not covered by tests
)
if err != nil {
return err
}
if len(fileSpecsBatch) != len(response.CreateArtifactFiles.Files.Edges) {
return fmt.Errorf(
"expected %v upload URLs, got %v",
len(fileSpecsBatch),
len(response.CreateArtifactFiles.Files.Edges),
)
}
// Save birth artifact ids, schedule uploads.
for i, edge := range response.CreateArtifactFiles.Files.Edges {
name := fileSpecsBatch[i].Name
entry := manifest.Contents[name]
entry.BirthArtifactID = &edge.Node.Artifact.Id
manifest.Contents[name] = entry
if edge.Node.UploadUrl == nil {
numDone++
continue
}
numInProgress++
task := &filetransfer.Task{
FileKind: filetransfer.RunFileKindArtifact,
Type: filetransfer.UploadTask,
Path: *entry.LocalPath,
Url: *edge.Node.UploadUrl,
Headers: edge.Node.UploadHeaders,
}
task.SetCompletionCallback(
func(t *filetransfer.Task) {
taskResultsChan <- TaskResult{t, name}
},
)
as.FileTransferManager.AddTask(task)
}
}
// Wait for filetransfer to catch up. If there's nothing more to schedule, wait for all in progress tasks.
for numInProgress > maxBacklog || (len(fileSpecsBatch) == 0 && numInProgress > 0) {
numInProgress--
result := <-taskResultsChan
if result.Task.Err != nil {
// We want to retry when the signed URL expires. However, distinguishing that error from others is not
// trivial. As a heuristic, we retry if the request failed more than an hour after we fetched the URL.
if time.Since(nameToScheduledTime[result.Name]) < 1*time.Hour {
return result.Task.Err
}
delete(nameToScheduledTime, result.Name) // retry
if len(fileSpecs) > 0 {
slog.Warn("some files failed to upload, retrying", "count", len(fileSpecs))

Check warning on line 171 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L171

Added line #L171 was not covered by tests
}
}
return nil
}

func (as *ArtifactSaver) processFiles(
manifest *Manifest, fileSpecs map[string]gql.CreateArtifactFileSpecInput,
) (map[string]gql.CreateArtifactFileSpecInput, error) {

// Channels to get responses from the batch url retrievers.
readyChan := make(chan serverFileResponse, as.batchSize)
errorChan := make(chan error)

doneChan := make(chan *filetransfer.Task, as.batchSize)
mustRetry := map[string]gql.CreateArtifactFileSpecInput{}

numTotal := len(fileSpecs)
numActive := 0
numDone := 0

fileSpecBatches := batchify(fileSpecs, as.batchSize)

for numDone+len(mustRetry) < numTotal {
// If we have less than a batch worth of requests open, start a new batch.
if numActive <= as.batchSize && len(fileSpecBatches) > 0 {
batch := fileSpecBatches[0]
numActive += len(batch)
go as.batchFileDataRetriever(batch, readyChan, errorChan)
fileSpecBatches = fileSpecBatches[1:]
}
select {
// Start any uploads that are ready.
case fileInfo := <-readyChan:
entry := manifest.Contents[fileInfo.Name]
entry.BirthArtifactID = &fileInfo.BirthArtifactID
manifest.Contents[fileInfo.Name] = entry
cacheEntry(as.FileCache, entry)
if fileInfo.UploadUrl == nil {
// The server already has this file.
numActive--
numDone++
continue
}
numDone++
entry := manifest.Contents[result.Name]
if !entry.SkipCache {
digest := entry.Digest
go func() {
err := as.FileCache.AddFileAndCheckDigest(result.Task.Path, digest)
if err != nil {
slog.Error("error adding file to cache", "err", err)
}
}()
task := newUploadTask(fileInfo, *entry.LocalPath)
task.SetCompletionCallback(func(t *filetransfer.Task) { doneChan <- t })
as.FileTransferManager.AddTask(task)
// Listen for completed uploads, adding to the retry list if they failed.
case result := <-doneChan:
numActive--
if result.Err != nil {
mustRetry[result.Name] = fileSpecs[result.Name]

Check warning on line 222 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L222

Added line #L222 was not covered by tests
} else {
numDone++
}
// Check for errors.
case err := <-errorChan:
return nil, err

Check warning on line 228 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L227-L228

Added lines #L227 - L228 were not covered by tests
}
}
return nil
return mustRetry, nil
}

// batchFileDataRetriever takes a batch of file specs, requests upload URLs for each file,
// assembles the info needed for the next step, and feeds them into an output channel.
func (as *ArtifactSaver) batchFileDataRetriever(
batch []gql.CreateArtifactFileSpecInput,
resultChan chan<- serverFileResponse,
errorChan chan<- error,
) {
response, err := gql.CreateArtifactFiles(
as.Ctx, as.GraphqlClient, batch, gql.ArtifactStorageLayoutV2,
)
if err != nil {
errorChan <- fmt.Errorf("requesting upload URLs failed: %v", err)
return

Check warning on line 246 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L245-L246

Added lines #L245 - L246 were not covered by tests
}
batchDetails := response.CreateArtifactFiles.Files.Edges
if len(batch) != len(batchDetails) {
errorChan <- fmt.Errorf("expected %v upload URLs, got %v", len(batch), len(batchDetails))
return

Check warning on line 251 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L250-L251

Added lines #L250 - L251 were not covered by tests
}
for i, edge := range batchDetails {
// We block on this channel, so we won't request data on more batches until
// the next step has handled these responses.
resultChan <- serverFileResponse{
Name: batch[i].Name,
BirthArtifactID: edge.Node.Artifact.Id,
UploadUrl: edge.Node.UploadUrl,
UploadHeaders: edge.Node.UploadHeaders,
}
}
}

// batchify partitions fileSpecs into slices of at most batchSize elements.
// Every spec appears in exactly one batch; because map iteration order is
// random, which specs share a batch is nondeterministic.
func batchify(
	fileSpecs map[string]gql.CreateArtifactFileSpecInput, batchSize int,
) [][]gql.CreateArtifactFileSpecInput {
	// Pre-size the outer slice when the batch count is computable.
	numBatches := 0
	if batchSize > 0 {
		numBatches = (len(fileSpecs) + batchSize - 1) / batchSize
	}
	batches := make([][]gql.CreateArtifactFileSpecInput, 0, numBatches)
	for _, fileSpec := range fileSpecs {
		if len(batches) == 0 || len(batches[len(batches)-1]) >= batchSize {
			batches = append(batches, []gql.CreateArtifactFileSpecInput{})
		}
		batches[len(batches)-1] = append(batches[len(batches)-1], fileSpec)
	}
	return batches
}

// newUploadTask builds an artifact-file upload task from the server's
// response for one file and the local path of the data to send.
// The caller must ensure fileInfo.UploadUrl is non-nil.
func newUploadTask(fileInfo serverFileResponse, localPath string) *filetransfer.Task {
	task := &filetransfer.Task{
		FileKind: filetransfer.RunFileKindArtifact,
		Type:     filetransfer.UploadTask,
		Path:     localPath,
		Name:     fileInfo.Name,
		Url:      *fileInfo.UploadUrl,
		Headers:  fileInfo.UploadHeaders,
	}
	return task
}

func cacheEntry(cache Cache, entry ManifestEntry) {
if entry.SkipCache {
return
}
path := *entry.LocalPath
digest := entry.Digest
go func() {
if err := cache.AddFileAndCheckDigest(path, digest); err != nil {
slog.Error("error adding file to cache", "err", err)

Check warning on line 297 in core/pkg/artifacts/saver.go

View check run for this annotation

Codecov / codecov/patch

core/pkg/artifacts/saver.go#L297

Added line #L297 was not covered by tests
}
}()
}

func (as *ArtifactSaver) resolveClientIDReferences(manifest *Manifest) error {
Expand Down

0 comments on commit c7788db

Please sign in to comment.