Skip to content
4 changes: 2 additions & 2 deletions packages/orchestrator/pkg/factories/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ func run(config cfg.Config, opts Options) (success bool) {
if err != nil {
logger.L().Fatal(ctx, "failed to create orchestrator server", zap.Error(err))
}
closers = append(closers, closer{"orchestrator server", func(context.Context) error {
return orchestratorService.Close()
closers = append(closers, closer{"orchestrator server", func(ctx context.Context) error {
return orchestratorService.Close(ctx)
}})

// template manager sandbox logger
Expand Down
15 changes: 10 additions & 5 deletions packages/orchestrator/pkg/sandbox/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ var (
)

const (
futureTTL = 1 * time.Hour
// futureTTL must outlive a parent upload's full retry window so a child's
// in-memory Wait still finds the parent's future. Keep >= the upload retry
// budget (server.uploadTotalBudget, 2h).
futureTTL = 3 * time.Hour

// refreshHeaderBudget bounds how long an upload Wait polls remote storage
// for a parent's V4 header. Crosses orchestrators: A may still be uploading
// on a remote orch when B's runV4 calls Wait(A) here. Matches the
// per-upload bound in server.uploadTimeout — anything longer means the
// parent's upload is itself stuck and would have failed on its own.
refreshHeaderBudget = 20 * time.Minute
// on a remote orch when B's runV4 calls Wait(A) here. It must be >= the
// parent's full retry window (server.uploadTotalBudget, 2h); otherwise the
// poll's budget expiry returns a non-retryable "object does not exist" and
// the child gives up while the parent is still retrying. The per-attempt
// context (server.uploadTimeout) bounds the actual poll duration.
refreshHeaderBudget = 2 * time.Hour

// uploadDoneChannelPrefix is the Redis pub/sub channel prefix for per-build
// upload-finished signals. Empty payload = success; non-empty = upload error.
Expand Down
65 changes: 64 additions & 1 deletion packages/orchestrator/pkg/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/jellydator/ttlcache/v3"
Expand Down Expand Up @@ -38,6 +39,10 @@ const uploadedBuildsTTL = 1 * time.Hour
// MaxStartingInstancesPerNode feature flag and resize the semaphore.
const startingSandboxesLimitRefreshInterval = 30 * time.Second

// uploadDrainLogInterval is how often Close logs progress while waiting for
// in-flight snapshot uploads to finish during shutdown.
const uploadDrainLogInterval = 10 * time.Second

type Server struct {
orchestrator.UnimplementedSandboxServiceServer
orchestrator.UnimplementedChunkServiceServer
Expand All @@ -58,6 +63,13 @@ type Server struct {
uploads *sandbox.Uploads
sandboxCreateDuration metric.Int64Histogram
sandboxKilledCounter metric.Int64Counter
uploadFailedCounter metric.Int64Counter

// uploadsWG tracks in-flight async snapshot uploads so a graceful shutdown
// can wait for them to finish instead of dropping them. uploadsInFlight is
// the live count, used to log drain progress during shutdown.
uploadsWG sync.WaitGroup
uploadsInFlight atomic.Int64

done chan struct{}
closeOnce sync.Once
Expand Down Expand Up @@ -123,6 +135,12 @@ func New(ctx context.Context, cfg ServiceConfig) (*Server, error) {
}
server.sandboxKilledCounter = sandboxKilledCounter

uploadFailedCounter, err := telemetry.GetCounter(meter, telemetry.OrchestratorSnapshotUploadFailedCounterName)
if err != nil {
return nil, fmt.Errorf("failed to register snapshot upload failed counter: %w", err)
}
server.uploadFailedCounter = uploadFailedCounter

_, err = telemetry.GetObservableUpDownCounter(meter, telemetry.OrchestratorSandboxCountMeterName, func(_ context.Context, observer metric.Int64Observer) error {
observer.Observe(int64(server.sandboxFactory.Sandboxes.Count()))

Expand Down Expand Up @@ -156,16 +174,61 @@ func New(ctx context.Context, cfg ServiceConfig) (*Server, error) {
return server, nil
}

func (s *Server) Close() error {
func (s *Server) Close(ctx context.Context) error {
s.closeOnce.Do(func() {
close(s.done)
})

// Wait for in-flight snapshot uploads to finish so a graceful shutdown
// doesn't drop a snapshot that is still uploading. ctx is cancelled on a
// forced stop, in which case we stop waiting and let the process exit.
uploadsDone := make(chan struct{})
go func() {
s.uploadsWG.Wait()
close(uploadsDone)
}()

s.drainUploads(ctx, uploadsDone)

s.uploadedBuilds.Stop()

return nil
}

// drainUploads blocks until uploadsDone is closed or ctx is cancelled,
// whichever happens first, emitting a progress log line every
// uploadDrainLogInterval in the meantime.
//
// It returns immediately, without logging, when the in-flight counter reads
// zero on entry. Cancellation of ctx is treated as a forced stop: a warning
// carrying the remaining upload count and the cancellation cause is logged and
// the function returns without waiting any further.
func (s *Server) drainUploads(ctx context.Context, uploadsDone <-chan struct{}) {
	pending := s.uploadsInFlight.Load()
	if pending == 0 {
		return
	}

	logger.L().Info(ctx, "waiting for in-flight snapshot uploads to finish", zap.Int64("uploads", pending))

	// remaining re-reads the live counter so every progress line reports the
	// count at the time it is written rather than the count seen on entry.
	remaining := func() zap.Field {
		return zap.Int64("uploads", s.uploadsInFlight.Load())
	}

	progress := time.NewTicker(uploadDrainLogInterval)
	defer progress.Stop()

	for {
		select {
		case <-progress.C:
			logger.L().Info(ctx, "still waiting for in-flight snapshot uploads",
				remaining(),
			)
		case <-ctx.Done():
			logger.L().Warn(ctx, "shutting down with snapshot uploads still in flight",
				remaining(),
				zap.Error(context.Cause(ctx)),
			)

			return
		case <-uploadsDone:
			logger.L().Info(ctx, "all in-flight snapshot uploads finished")

			return
		}
	}
}

func (s *Server) refreshStartingSandboxesLimit(ctx context.Context) {
ticker := time.NewTicker(startingSandboxesLimitRefreshInterval)
defer ticker.Stop()
Expand Down
66 changes: 51 additions & 15 deletions packages/orchestrator/pkg/server/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox"
"github.com/e2b-dev/infra/packages/shared/pkg/retry"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
Expand All @@ -45,12 +46,25 @@ const (
// acquireTimeout is the max time to wait for a semaphore for resuming sandboxes snapshot.
acquireTimeout = 15 * time.Second

// uploadTimeout is the max time allowed for uploading snapshot files to
// remote storage.
// uploadTimeout is the max time allowed for a single upload attempt to
// remote storage. The overall retry window is uploadTotalBudget.
uploadTimeout = 20 * time.Minute
// redisPeerKeyTTL is slightly longer than uploadTimeout so the key is still
// valid for the entire upload window before being cleaned up.
redisPeerKeyTTL = uploadTimeout + 2*time.Minute
// uploadTotalBudget bounds how long a snapshot upload is retried before it
// is given up. Covers a long GCS outage without retrying forever.
uploadTotalBudget = 2 * time.Hour
// redisPeerKeyTTL keeps the peer routing key valid across the whole retry
// window so a long retry doesn't drop peer routing mid-upload. It is
// unregistered promptly once the upload finishes (success or give-up).
redisPeerKeyTTL = uploadTotalBudget + 2*time.Minute
Comment thread
cursor[bot] marked this conversation as resolved.

// uploadRetryInitialBackoff is the wait before the first retry; it grows
// exponentially up to uploadRetryMaxBackoff.
uploadRetryInitialBackoff = 5 * time.Second
// uploadRetryMaxBackoff caps the backoff between attempts.
uploadRetryMaxBackoff = 2 * time.Minute
// uploadRetryBackoffMultiplier is the exponential growth factor between
// retry attempts.
uploadRetryBackoffMultiplier = 2

// executionEventDataKey is the key used in webhook event data for sandbox execution metrics.
executionEventDataKey = "execution"
Expand Down Expand Up @@ -860,7 +874,12 @@ func (s *Server) snapshotAndCacheSandbox(
return
}

s.uploadedBuilds.Set(meta.Template.BuildID, struct{}{}, ttlcache.DefaultTTL)
// Only advertise the build as fully uploaded when it actually landed.
// On abandon/failure the bytes are not in storage, so marking it would
// make chunk-serving falsely report "already uploaded".
if uploadErr == nil {
s.uploadedBuilds.Set(meta.Template.BuildID, struct{}{}, ttlcache.DefaultTTL)
}

if err := s.peerRegistry.Unregister(ctx, meta.Template.BuildID); err != nil {
logger.L().Warn(ctx, "failed to unregister peer address from routing", zap.String("build_id", meta.Template.BuildID), zap.Error(err))
Expand All @@ -885,23 +904,40 @@ func (s *Server) snapshotAndCacheSandbox(
// background and cleans up the Redis peer key once done. Used by the Pause
// handler where no prefetch data is available.
func (s *Server) uploadSnapshotAsync(ctx context.Context, sbx *sandbox.Sandbox, res *snapshotResult) {
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), uploadTimeout)
// Detach from the request: the upload retries for up to uploadTotalBudget.
// A graceful shutdown waits for it to finish (see Server.Close via uploadsWG)
// rather than cancelling, so an in-flight snapshot isn't dropped on restart.
uploadCtx := context.WithoutCancel(ctx)

go func() {
defer cancel()
s.uploadsInFlight.Add(1)
s.uploadsWG.Go(func() {
defer s.uploadsInFlight.Add(-1)

ctx, span := tracer.Start(ctx, "upload snapshot")
spanCtx, span := tracer.Start(uploadCtx, "upload snapshot")
defer span.End()

err := res.upload.Run(ctx)
err := retry.Do(
spanCtx,
defaultUploadRetryPolicy(),
isRetryableUploadErr,
res.upload.Run,
func(attempt int, backoff time.Duration, err error) {
sbxlogger.I(sbx).Warn(spanCtx, "snapshot upload attempt failed, retrying",
zap.Int("attempt", attempt),
zap.Duration("backoff", backoff),
zap.Error(err),
)
},
)
if err != nil {
sbxlogger.I(sbx).Error(ctx, "error uploading snapshot files", zap.Error(err))
sbxlogger.I(sbx).Error(spanCtx, "snapshot upload did not durably land", zap.Error(err))
s.uploadFailedCounter.Add(spanCtx, 1)
} else {
sbxlogger.I(sbx).Info(ctx, "snapshot finished uploading successfully")
sbxlogger.I(sbx).Info(spanCtx, "snapshot finished uploading successfully")
}

res.completeUpload(ctx, err)
}()
res.completeUpload(spanCtx, err)
})
}

// setupSandboxLifecycle sets up the cleanup goroutine for a sandbox.
Expand Down
44 changes: 44 additions & 0 deletions packages/orchestrator/pkg/server/upload_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build linux

package server

import (
"context"
"errors"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/build"
"github.com/e2b-dev/infra/packages/shared/pkg/retry"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
)

// defaultUploadRetryPolicy builds the retry.Policy used for pause-snapshot
// uploads. Every field is taken from the package-level upload constants:
// the total budget, the per-attempt timeout, and the initial backoff, maximum
// backoff and multiplier that shape the exponential backoff between attempts.
func defaultUploadRetryPolicy() retry.Policy {
	var policy retry.Policy

	// Time bounds: overall retry window and the limit for a single attempt.
	policy.TotalBudget = uploadTotalBudget
	policy.AttemptTimeout = uploadTimeout

	// Backoff shape between attempts.
	policy.InitialBackoff = uploadRetryInitialBackoff
	policy.MaxBackoff = uploadRetryMaxBackoff
	policy.Multiplier = uploadRetryBackoffMultiplier

	return policy
}

// isRetryableUploadErr reports whether a failed upload attempt should be
// retried. Retrying is the default: a lost snapshot is unrecoverable and
// cascades to descendants, so a wasted retry is far cheaper than dropping a
// recoverable build. Only the terminal conditions listed below return false.
func isRetryableUploadErr(err error) bool {
	// Nothing to upload.
	if errors.Is(err, build.NoDiffError{}) {
		return false
	}

	// Source vanished; retry cannot recover it.
	if errors.Is(err, storage.ErrObjectNotExist) {
		return false
	}

	// Parent cancelled (shutdown).
	if errors.Is(err, context.Canceled) {
		return false
	}

	// Everything else, including per-attempt context.DeadlineExceeded, GCS
	// 401/503, rate limiting, and unknown errors, is worth retrying within
	// the budget.
	return true
}
40 changes: 40 additions & 0 deletions packages/orchestrator/pkg/server/upload_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build linux

package server

import (
"context"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/build"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
)

// TestIsRetryableUploadErr checks the retry classification for each terminal
// error (bare and wrapped) and for a sample of errors that must stay retryable.
func TestIsRetryableUploadErr(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name string
		err  error
		want bool
	}

	cases := []testCase{
		// Terminal: retrying cannot help.
		{name: "no diff", err: build.NoDiffError{}, want: false},
		{name: "object not exist", err: storage.ErrObjectNotExist, want: false},
		{name: "object not exist wrapped", err: fmt.Errorf("load: %w", storage.ErrObjectNotExist), want: false},
		{name: "parent cancelled", err: context.Canceled, want: false},
		// Retryable: transient or unknown failures.
		{name: "per-attempt deadline", err: context.DeadlineExceeded, want: true},
		{name: "gcs 503", err: errors.New("server error (503)"), want: true},
		{name: "unknown", err: errors.New("boom"), want: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got := isRetryableUploadErr(tc.err)
			assert.Equal(t, tc.want, got)
		})
	}
}
Loading
Loading