SubmitQueue is a distributed system for managing code submission workflows. It follows clean architecture with interface-driven extensibility.
Immutability and Eventual Consistency:
- Immutable entities — once created, don't modify in place. Create new versions with updated fields.
- Eventual consistency — handle stale reads, idempotent operations, and convergence over time.
- Event sourcing — store events (what happened) rather than just current state for critical changes.
- Optimistic locking — use version numbers instead of pessimistic locks. Avoid transactions; prefer optimistic concurrency and retries. Version arithmetic lives in the controller, not the storage layer. Update methods take both `oldVersion` (the where-clause guard) and `newVersion` (the value to write); the store performs a pure conditional write. Controllers compute `newVersion = oldVersion + 1`, call the store, and assign `entity.Version = newVersion` only after the call succeeds. Pre-incrementing in memory before the call is a bug pattern: on error, the in-memory version drifts ahead of the database. See submitqueue/extension/storage/README.md.
- Idempotency keys — include unique request IDs and check for duplicates before executing.
```go
// Immutable entity pattern
type Request struct {
	ID        string // Unique request ID
	Version   int    // For optimistic locking
	Status    Status // Current status
	CreatedAt int64  // Creation time, Unix milliseconds
	UpdatedAt int64  // Last update time, Unix milliseconds
}

// Controller pattern — version arithmetic outside storage, assigned only on success
newVersion := request.Version + 1
if err := store.UpdateStatus(ctx, request.ID, request.Version, newVersion, newStatus); err != nil {
	return err // request.Version is untouched, so it still matches the database
}
request.Version = newVersion
```
Project Layout:
```
submitqueue/ # repo root (Go module github.com/uber/submitqueue)
├── api/ # Published wire contracts (cross-domain/external)
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto)
│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/
│ └── runway/messagequeue/ # external queue contracts (proto + protojson)
├── platform/ # SHARED cross-domain packages — no domain deps
│ ├── errs/, metrics/, consumer/, http/
│ ├── base/ # SHARED entities (change/, messagequeue/, …)
│ └── extension/ # SHARED extension contracts + backends (counter/, messagequeue/, …)
├── submitqueue/ # SubmitQueue domain
│ ├── gateway/ # Gateway service (port 8081) - entry point
│ ├── orchestrator/ # Orchestrator service (port 8082) - coordinates jobs
│ ├── entity/ # SubmitQueue-specific domain entities
│ ├── extension/ # SubmitQueue-specific extension impls (storage, counter, mergechecker, …)
│ └── core/ # SubmitQueue-internal shared infra (consumer wiring, request, topickey, …)
├── stovepipe/ # Stovepipe domain
│ ├── gateway/ # Gateway service: commit deployment verification entry point
│ ├── orchestrator/ # Orchestrator service: commit verification pipeline
│ ├── entity/ # Stovepipe-specific domain entities
│ ├── extension/ # Stovepipe-specific extension impls
│ └── core/ # Stovepipe-internal shared infra (placeholder; mirrors submitqueue/core)
├── tool/ # Development and CI tooling
├── example/
│ ├── submitqueue/ # Runnable SubmitQueue servers/clients + Docker Compose
│ └── stovepipe/ # Runnable Stovepipe servers/clients
├── test/
│ ├── e2e/submitqueue/ # End-to-end tests (full stack)
│ ├── integration/ # Integration tests (platform/, submitqueue/, stovepipe/, …)
│ └── testutil/ # Test utilities (ComposeStack, MySQL helpers)
└── doc/ # Documentation
```
The platform/ tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each domain (submitqueue/, stovepipe/, …) keeps the same internal layout (gateway/, orchestrator/, entity/, extension/, core/); a domain's own core/ (e.g. submitqueue/core/) holds infra shared only between that domain's services.
The api/ tree holds published wire contracts, meaning contracts that code outside the owning domain depends on. RPC contracts live at api/{domain}/{service}/ (proto/ for .proto sources, protopb/ for committed generated Go); a service package may hold multiple .proto files, all generating into the same protopb/. External message-queue contracts live at api/{domain}/messagequeue/ (see Message Queue Contracts below). Internal queue contracts do not go here; they live under {domain}/core/messagequeue/.
- Import path `github.com/uber/submitqueue/platform/http` uses Go package name `http` and aliases the standard library as `nethttp` inside the package. Source files that also import `net/http` should import the platform package with a distinct alias (for example `phttp "github.com/uber/submitqueue/platform/http"`) and call `phttp.NewClient`, `phttp.BaseURLTransport`, etc.
- `platform/base` is the shared entity root; subpackages (`change`, `messagequeue`, …) hold concrete types. The root `base` package is documentation-only.
Each service follows the same layout:
```
<service>/
└── controller/ # Business logic (pure, transport-agnostic)
    ├── {method}.go # RPC controllers (e.g., land.go, ping.go)
    ├── {method}_test.go
    └── {step}/ # Queue message controllers (e.g., request/)
        ├── {step}.go
        └── {step}_test.go
```
Wire contracts for a service live separately under api/{domain}/{service}/ (see Project Layout): proto/ holds .proto sources and protopb/ holds the committed generated stubs.
Two types, both containing pure business logic independent of infrastructure:
RPC Controllers — in `{service}/controller/`, accept protobuf types:
```go
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error)
```
Queue Message Controllers — in `{service}/controller/{step}/`, implement `consumer.Controller`:
```go
// Return nil to ack, error to nack. Consumer handles ack/nack automatically.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error
```
Controllers receive `consumer.Delivery` (a subset interface without `Ack`/`Nack`) to enforce separation of business logic from infrastructure.
Queue payloads: IDs within a boundary, full payloads across one. When producer and consumer share a store (same service — e.g. build→buildsignal, validate→mergeconflict), put only the entity ID on the queue and reload from storage (the store is the source of truth, messages stay small, redelivery is idempotent). When a queue crosses a service boundary (the consumer cannot read the producer's store — e.g. orchestrator→runway), publish the full payload the consumer needs, and have the client own the correlation ID so it can match the async result back to the work it is tracking. The queue's owner defines the wire contract and topic keys (in its own domain package); the other side imports them.
Domain objects live under each domain's entity/ tree, or under platform/base/ when shared across domains. Guidelines:
- Pure and framework-agnostic — no external dependencies
- Value types, not references
- `int64` milliseconds for timestamps (`CreatedAt int64`) and durations (`TimeoutMs int64`)
- Every field must have a comment
- Reference other entities by ID (string or int), not directly
- String enums with sentinel values (`""` for unknown)
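An entity written to these guidelines might look like the following sketch. `Batch`, `BatchStatus`, and the field names are invented for illustration and are not taken from `submitqueue/entity`.

```go
package main

import "fmt"

// BatchStatus is a string enum; the zero value "" means unknown.
type BatchStatus string

const (
	BatchStatusUnknown   BatchStatus = ""          // Sentinel for unset or unrecognized values
	BatchStatusPending   BatchStatus = "pending"   // Waiting to be built
	BatchStatusSucceeded BatchStatus = "succeeded" // Build and checks passed
)

// Batch groups requests that are built and landed together.
type Batch struct {
	ID         string      // Unique batch ID
	RequestIDs []string    // Member requests, referenced by ID rather than embedded
	Status     BatchStatus // Current lifecycle status
	CreatedAt  int64       // Creation time, Unix milliseconds
	TimeoutMs  int64       // Build timeout, milliseconds
}

// DeadlineMs returns the Unix-millisecond time at which the batch times out.
func (b Batch) DeadlineMs() int64 { return b.CreatedAt + b.TimeoutMs }

func main() {
	var b Batch // the zero value is valid: Status is BatchStatusUnknown
	fmt.Println(b.Status == BatchStatusUnknown, Batch{CreatedAt: 1000, TimeoutMs: 500}.DeadlineMs()) // true 1500
}
```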
Vendor-agnostic, pluggable interfaces with implementations in subdirectories:
- Shared across domains — define interfaces at `platform/extension/{ext}/`, implementations at `platform/extension/{ext}/{impl}/`.
- Domain-specific — define at `{domain}/extension/{ext}/`, implementations at `{domain}/extension/{ext}/{impl}/`.
- Factory interface for dependency injection and lifecycle management. Factory implementations are constructed in the wiring layer, not inside `platform/extension` or `{domain}/extension` packages.
Extensions hold contracts and implementations only — not factories or routing.
A {domain}/extension/{ext} or platform/extension/{ext} package contains the behavioral interface, its Config, the Factory interface, and impl constructors New(...) that return the interface. It must not contain Factory implementations (NewFactory() constructors or factory structs) or any queue-selection logic.
Why: an impl package (e.g. scorer/heuristic) can't know the queue topology or the other impls, so a "which impl for which queue" decision doesn't belong there. Per-queue routing — and the small adapters that wrap a New(...) impl in the Factory interface — live in the wiring layer (e.g. example/{domain}/{service}/server/main.go), the one place that knows the full queue set. That's where you route on Config.QueueName.
Rule of thumb: if you're about to add a NewFactory() or a map[queue]impl under {domain}/extension/ or platform/extension/, it belongs in the wiring layer instead.
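A sketch of the split, assuming a hypothetical `Scorer` extension with a `Config{QueueName}`, a `Factory` interface, and two impl constructors. The queue names and impls are made up. The extension package exports only the contract and the `New...` constructors. The adapter and the per-queue switch sit in the wiring layer, because that is the only place that knows the full queue set.

```go
package main

import "fmt"

// --- would live in {domain}/extension/scorer ---

// Config is passed to the factory for each queue.
type Config struct {
	QueueName string // Queue this scorer instance serves
}

// Scorer is the behavioral interface.
type Scorer interface {
	Score(requestID string) int
}

// Factory builds a Scorer per queue; it is implemented in wiring, not here.
type Factory interface {
	New(cfg Config) (Scorer, error)
}

type constant struct{ v int }

func (c constant) Score(string) int { return c.v }

// NewHeuristic and NewFIFO stand in for impl constructors in subpackages.
func NewHeuristic() Scorer { return constant{v: 10} }
func NewFIFO() Scorer      { return constant{v: 0} }

// --- would live in example/{domain}/{service}/server/main.go ---

// factoryFunc adapts a plain function to the Factory interface.
type factoryFunc func(Config) (Scorer, error)

func (f factoryFunc) New(cfg Config) (Scorer, error) { return f(cfg) }

// scorerFactory routes on Config.QueueName.
var scorerFactory Factory = factoryFunc(func(cfg Config) (Scorer, error) {
	switch cfg.QueueName {
	case "monorepo-main":
		return NewHeuristic(), nil
	case "docs":
		return NewFIFO(), nil
	default:
		return nil, fmt.Errorf("no scorer configured for queue %q", cfg.QueueName)
	}
})

func main() {
	s, err := scorerFactory.New(Config{QueueName: "monorepo-main"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(s.Score("r1")) // 10
}
```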
Design interfaces for the technology space, not the implementation in front of you. The interface is a contract every backend will have to satisfy — SQL, key-value (DynamoDB, Bigtable), document, message queue, search, RPC, in-memory, mocks. If the contract assumes a capability that some plausible backend can't provide cheaply, you've baked the current impl's strengths into the API.
Common over-constraints to avoid:
- Batch atomicity (multi-row inserts as one transaction) — many KV stores can't do this. Prefer single-record primitives + caller loops + idempotency-on-retry.
- Multi-key queries (`WHERE x IN (...)`) — fine in SQL, awkward elsewhere. Prefer per-key reads.
- Server-side filters (joins, sub-queries, complex predicates) — push filtering and aggregation to the caller; keep the store responsible only for "get/put by key" semantics.
- Transactions across entities — virtually no distributed store offers this. Use eventual consistency + idempotency.
- Strict ordering / exactly-once in messaging — most queues are at-least-once with best-effort ordering. Make consumers idempotent.
- Synchronous, low-latency calls for things that may run remotely — design for retry/backoff and timeouts, not assumed-fast.
The cost of "callers loop over a small batch" is usually negligible. The cost of forcing every future backend to fake a capability the API demanded is permanent.
When in doubt, ask: "If the next implementation were DynamoDB / Kafka / Bigtable / a remote RPC service / an in-memory map, could it satisfy this signature without contortion?" If the answer is no, simplify the contract.
Input contract — identity in, resolve internally. A decision/action extension takes the orchestrator's thin reference entity at its pipeline-stage granularity — entity.Request (request stage) or entity.Batch / []entity.Batch (batch stage) — never controller-pre-resolved data. It resolves the granular content it needs (changes, diffs, targets) through dependencies injected at its Factory (e.g. a request store, a change provider), not a global aggregator. Stores (storage, changestore) and config (queueconfig) are the exception — they are the resolution targets and stay key/value-shaped per the rule above. conflict.Analyzer is the reference shape; every new extension or signature change must follow it. See doc/rfc/submitqueue/extension-contract.md.
Paths follow the directory layout: shared packages live under platform/ at the repo root; domain code nests under submitqueue/, stovepipe/, and other domain folders.
- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`)
- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}`
- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb`
- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue`
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`)
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
- Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`)
- Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`)
- Cross-domain infra: `github.com/uber/submitqueue/platform/{pkg}` (e.g. `.../platform/errs`, `.../platform/metrics`, `.../platform/http`)
Bazel with Bzlmod (NOT WORKSPACE).
- Dependencies: `MODULE.bazel` + `go.mod` (both must be updated)
- Bazel wrapper: `./tool/bazel` (Bazelisk). With direnv (`.envrc`), use `bazel` directly.
- BUILD files: Every Go package needs `BUILD.bazel`. Run `make gazelle` after adding or removing Go files.
- CI enforces that BUILD files are in sync — always run `make gazelle` before committing.
Generated proto files are committed. When modifying .proto files:
- Edit in `api/{domain}/{service}/proto/` (e.g. `api/submitqueue/gateway/proto/`)
- Run `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`)
- Commit all generated files
To add a new .proto to a service, drop it in the service's api/{domain}/{service}/proto/ dir, add it to that package's srcs in api/{domain}/{service}/proto/BUILD.bazel and its exports_files, then make proto && make gazelle. The codegen and make proto copy loop already handle multiple .proto files per package.
Queue payloads are defined in proto3 (.proto under proto/, generated Go in protopb/ as the binding) and serialized as protobuf JSON (protojson) so the queue keeps storing self-describing JSON. Location follows audience: external/cross-domain contracts go under api/{domain}/messagequeue/; internal contracts (used only within the owning domain) go under {domain}/core/messagequeue/. Bazel visibility enforces the split — internal targets are domain-scoped, api/ targets are public. See doc/rfc/messagequeue-contract.md.
The message types are generated; the contract package adds only generic protojson glue — Marshal(m) / Unmarshal[T](b, m) — owning the wire conventions: UseProtoNames (snake_case fields), UPPER_SNAKE enum values, int64-as-string, unknown fields discarded on read (additive evolution). The topic key(s) carrying a message are declared on the message via the topic_keys proto option — a google.protobuf.MessageOptions extension defined in api/base/messagequeue. A topic key is a stable logical name, not a concrete wire topic; each implementer maps it to its backend's topic name, and a TopicKeys(msg) reflection helper reads the option back. It is contract metadata, not the hot path — publish/consume still routes on consumer.TopicKey + TopicRegistry. The contract package owns both halves: the proto payload and the TopicKey constants for its topic keys. A contract test round-trips the payloads and asserts every topic key is bound to exactly one message. Shared field types (Change, Strategy) are shared protos under api/base/{change,mergestrategy}. api/runway/messagequeue/ is the reference example.
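The contract test's "every topic key is bound to exactly one message" check reduces to inverting a map. This sketch assumes the bindings have already been collected into a map from message name to topic keys. In the real contract package, those bindings would come from the `TopicKeys(msg)` reflection helper. `checkBindings` and the message and key names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// checkBindings returns an error if any topic key is claimed by more than one
// message, or if any expected key is not declared by any message.
func checkBindings(bindings map[string][]string, expected []string) error {
	owner := map[string]string{} // topic key -> message name
	msgs := make([]string, 0, len(bindings))
	for m := range bindings {
		msgs = append(msgs, m)
	}
	sort.Strings(msgs) // deterministic iteration for stable error messages
	for _, m := range msgs {
		for _, k := range bindings[m] {
			if prev, ok := owner[k]; ok {
				return fmt.Errorf("topic key %q bound to both %s and %s", k, prev, m)
			}
			owner[k] = m
		}
	}
	for _, k := range expected {
		if _, ok := owner[k]; !ok {
			return fmt.Errorf("topic key %q is not bound to any message", k)
		}
	}
	return nil
}

func main() {
	bindings := map[string][]string{
		"MessageA": {"topic-a"},
		"MessageB": {"topic-b"},
	}
	fmt.Println(checkBindings(bindings, []string{"topic-a", "topic-b"})) // <nil>
}
```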
- Directories: singular (`mock/`, `entity/`, not `mocks/`, `entities/`)
- Files: `{method}.go`, `{entity}.go`, `{file}_test.go`, `BUILD.bazel`
- Proto files: `{service}.proto`
- Test compose contexts: the `testContext` passed to `NewComposeStack` (and thus the `sq-test-{context}-…` Docker project/container names) must be domain-qualified — `{category}-{domain}-{name}`, where `{category}` is `svc`/`ext`/`core`/`e2e` and `{domain}` is `submitqueue`/`stovepipe`/… (omit the domain only for shared/cross-domain suites, e.g. `ext-messagequeue-sql`). This keeps containers unambiguous and lets suites run in parallel. See doc/howto/TESTING.md.
- README files: Do not duplicate interface or type definitions as code blocks in READMEs. Describe behavior in prose and let readers navigate to the source. Only include code samples when explicitly instructed.
- Markdown prose width: Do not hard-wrap prose in Markdown docs (RFCs under `doc/`, READMEs). Write one line per paragraph and one line per list item, and let the editor soft-wrap — hard wrapping at a fixed column renders as a narrow fixed-width column regardless of window size. Code blocks, tables, and ASCII diagrams keep their own line breaks.
Makefile targets are sorted alphabetically. Each target has a `## Description` comment used for auto-generated help and shell completion:
```makefile
integration-test: build-all-linux ## Run all integration tests (auto-builds binaries)
	@$(BAZEL) test //test/integration/... --test_output=streamed
```
Common commands:
```bash
make build # Build all services
make test # Run unit tests
make lint # Run all linters (fmt + YAML)
make fmt # Format Go and YAML code
make check-mocks # Check mock files are up to date
make check-tidy # Check go.mod and MODULE.bazel are tidy
make check-gazelle # Check BUILD.bazel files are up to date
make tidy # Run go mod tidy + bazel mod tidy
make gazelle # Update BUILD.bazel files
make mocks # Generate mock files using mockgen
make integration-test # Run all integration tests (Docker-based)
make e2e-test # Run end-to-end tests
make proto # Regenerate proto files
make local-submitqueue-start # Start full stack with Docker Compose
make local-submitqueue-ps # Show running containers and ports
make local-submitqueue-logs # View logs from all services
make local-stop # Stop all services
make clean # Clean Bazel cache
```
Add new RPC method:
- Edit `api/{domain}/{service}/proto/*.proto` → `make proto`
- Add controller in `{domain}/{service}/controller/`
- Wire up in `example/{domain}/{service}/server/main.go`
Add new queue message controller:
- Create `{domain}/{service}/controller/{step}/` implementing `consumer.Controller`
- Wire up in `example/{domain}/{service}/server/main.go`
Add new extension:
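- Create the extension under `{domain}/extension/{ext}/{impl}/` (domain-specific, e.g. `submitqueue/extension/...`) or `platform/extension/{ext}/{impl}/` (shared across domains). Put the behavioral interface and the `Factory` interface in the parent `{ext}/` package; `Factory` implementations and per-queue routing belong in the wiring layer.
- Add `BUILD.bazel`, tests, and README.md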
Add new entity:
- Create `{domain}/entity/{entity}.go` (domain-specific) or add packages under `platform/base/` (shared), with a test file and `BUILD.bazel`
Add gomock for an extension interface:
Mocks are checked-in files generated by mockgen. Run make mocks to regenerate, then make gazelle to update BUILD files. See submitqueue/extension/storage/mock/ for the canonical example.
To add a mock for a new interface file in an existing mock package (e.g., submitqueue/extension/storage/new_store.go):
- Add a `//go:generate` directive to the interface file: `//go:generate mockgen -source=new_store.go -destination=mock/new_store_mock.go -package=mock`
- Run `make mocks` to generate the mock file.
- Run `make gazelle` to update `BUILD.bazel` files.
- Commit the generated mock file.
To create a mock package for a new extension (e.g., submitqueue/extension/newext/mock/):
- Add `//go:generate` directives to each interface file (same pattern as above).
- Create the `mock/` directory: `mkdir submitqueue/extension/newext/mock/`.
- Run `make mocks` to generate mock files into the new directory.
- Run `make gazelle` to create the `BUILD.bazel` file automatically.
For inline mocks (mock in the same package, e.g., platform/extension/messagequeue/mysql/mock_stores.go):
- Add a `//go:generate` directive with `-package=mypkg` and `-destination=mock_file.go`.
- Run `make mocks` and `make gazelle`.
Using mocks in tests:
```go
import (
	"go.uber.org/mock/gomock"
	storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock"
)

// Inside a test function (t *testing.T):
ctrl := gomock.NewController(t)
mockStore := storagemock.NewMockRequestStore(ctrl)
mockStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
```
Test BUILD.bazel deps:
```starlark
deps = [
    "//submitqueue/extension/storage/mock",
    "@org_uber_go_mock//gomock",
]
```
- Table-driven tests — prefer table-driven tests with `t.Run` subtests over individual test functions.
- Avoid asserting on error messages — assert on the error type or identity (e.g. `require.ErrorIs`, `require.ErrorAs`), or check only that an error occurred with `require.Error`; do not use `assert.Contains(t, err.Error(), message)`.
- No change detector tests — don't assert on default values, internal structure, or implementation details that can change without affecting behavior. Test what the code does, not how it's constructed.
- No `time.Sleep` for synchronization — use channels, callbacks, or condition variables.
- Use testify — `assert`/`require` instead of `t.Fatal()`.
Integration tests use Docker Compose via testutil.ComposeStack:
- Package naming: folder name as package (NOT `*_test` suffix)
- Bazel: add `tags = ["integration"]` and `data = [...]` for compose/schema files
- Use `testutil.NewComposeStack()` with a meaningful, domain-qualified context (e.g. `"ext-submitqueue-storage-mysql"`)
See doc/howto/TESTING.md for full testing guide.
CI runs on every PR and enforces all checks via a required-checks gate. Before committing, validate locally:
- `make fmt` — format Go and YAML code (CI will reject unformatted code)
- `make lint` — run all linters (formatting check)
- `make check-tidy` — ensure `go.mod` and `MODULE.bazel` are tidy
- `make check-gazelle` — ensure `BUILD.bazel` files are up to date
- Structured logging — `zap.SugaredLogger` with `Debugw`/`Infow`/`Errorw(msg, key, val, ...)`. Never use the unstructured methods (e.g. `Info`, `Infof`).
- Interfaces for behavior, structs for data — use interfaces for behavioral contracts (Consumer, Controller, Storage). Use structs for data containers, configs, and registries (TopicRegistry, SubscriptionConfig).
- Value types over pointers — prefer value types for structs, configs, and return values. Use `(T, bool)` to signal absence instead of `*T`. Use pointers only when mutation or shared ownership is needed.
- Errors for failures, not control flow — reserve `error` returns for unexpected or infrastructure failures. Use result types (structs, bools) for expected outcomes, e.g. `(Result, error)` or `(T, bool)`. Avoid sentinel errors that represent non-failure states.
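A small illustration of the last two conventions with hypothetical types. A lookup returns `(Request, bool)` for absence instead of `*Request` or a not-found error. An expected negative outcome, a request that is not mergeable, comes back as a result struct, leaving `error` for cases where the check itself cannot run.

```go
package main

import (
	"errors"
	"fmt"
)

// Request is a value type.
type Request struct {
	ID     string // Request ID
	Target string // Target branch
}

// MergeResult describes an expected outcome; "not mergeable" is not an error.
type MergeResult struct {
	Mergeable bool   // Whether the request can be merged
	Reason    string // Why not, when Mergeable is false
}

// registry is a hypothetical in-memory lookup plus merge policy.
type registry struct {
	byID   map[string]Request // Known requests by ID
	frozen map[string]bool    // Target branches closed for merges
	down   bool               // Simulates a backend outage
}

// Lookup signals absence with ok=false rather than a nil pointer.
func (r registry) Lookup(id string) (Request, bool) {
	req, ok := r.byID[id]
	return req, ok
}

// CheckMerge returns an error only when the check itself cannot run.
func (r registry) CheckMerge(req Request) (MergeResult, error) {
	if r.down {
		return MergeResult{}, errors.New("merge backend unavailable")
	}
	if r.frozen[req.Target] {
		return MergeResult{Mergeable: false, Reason: "target branch is frozen"}, nil
	}
	return MergeResult{Mergeable: true}, nil
}

func main() {
	r := registry{
		byID:   map[string]Request{"r1": {ID: "r1", Target: "release"}},
		frozen: map[string]bool{"release": true},
	}
	if req, ok := r.Lookup("r1"); ok {
		res, err := r.CheckMerge(req)
		fmt.Println(res.Mergeable, res.Reason, err) // false target branch is frozen <nil>
	}
	_, ok := r.Lookup("missing")
	fmt.Println(ok) // false
}
```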
Errors are classified by origin (user vs infra) and retryability. The framework lives in platform/errs/. See platform/errs/README.md for full details.
Key rules:
- Non-retryable by default — a plain `fmt.Errorf(...)` is non-retryable. Wrap with `errs.NewRetryableError(...)` to opt in to retry.
- Infra by default — any error not wrapped with `NewUserError` is infra. There is no `NewInfraError`.
- Extensions return plain errors — extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values with their own domain sentinels (e.g. `storage.ErrNotFound`). They do NOT classify errors as user or infra.
- Controllers classify errors — the service controller that calls an extension decides whether the failure is user-caused or infrastructure-caused. The same extension error may be classified differently depending on context.
- Error chain works end-to-end — extensions wrap custom errors, controllers wrap with `errs.New*Error`, and `errors.Is`/`errors.As` walk the full chain.
- Default classifiers — primary pipeline consumers compose one or more classifiers (each owning a focused concern such as transport-level signals or a specific driver/library's errors) into `errs.NewClassifierProcessor(...)`. Pick classifiers that match the failure surfaces the consumer actually touches; add a new classifier package when a backend introduces error shapes that no existing one understands, rather than teaching an unrelated classifier about them. DLQ reconciliation consumers use `errs.AlwaysRetryableProcessor` instead so any failure is redelivered rather than dropped.