Commit c23e491

feat(merge): make merge asynchronous via runway
Rework the merge stage from a synchronous in-process pusher call into a runway round-trip, mirroring the merge-conflict check. The merge controller now builds a full runway MergeRequest from the batch's member requests (one MergeStep per request, in Contains order) and publishes it to the runway-owned merge queue, keyed by the batch id as the correlation id.

A new mergesignal controller consumes the MergeResult off merge-signal, transitions the batch to Succeeded/Failed, and fans out to conclude and speculate; a mergesignal DLQ reconciler fails the batch on an unprocessable result.

The in-process pusher extension is retired from the orchestrator wiring (left in-tree but unused, like mergechecker); removal is a follow-up.

workflow.md and extension-contract.md updated to reflect both the check and the merge crossing into runway over the shared MergeRequest/MergeResult contract.
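
As a rough illustration of the request-building step described in the commit message, the sketch below emits one step per member request in `Contains` order and keys the request by the batch id. The `Batch`, `MergeStep`, and `MergeRequest` types and their fields are hypothetical stand-ins invented for this example; the real runway messages and the merge controller may be shaped differently.

```go
package main

import "fmt"

// Batch, MergeStep and MergeRequest are hypothetical stand-ins for the
// orchestrator's batch entity and runway's merge messages.
type Batch struct {
	ID       string
	Contains []string // member request ids, in the order they should land
}

type MergeStep struct {
	RequestID string
}

type MergeRequest struct {
	ID    string // correlation id: the batch id, echoed back on the result
	Steps []MergeStep
}

// buildMergeRequest emits one step per member request, preserving Contains
// order, and uses the batch id as the correlation id.
func buildMergeRequest(b Batch) (MergeRequest, error) {
	if len(b.Contains) == 0 {
		return MergeRequest{}, fmt.Errorf("batch %q has no member requests", b.ID)
	}
	steps := make([]MergeStep, 0, len(b.Contains))
	for _, reqID := range b.Contains {
		steps = append(steps, MergeStep{RequestID: reqID})
	}
	return MergeRequest{ID: b.ID, Steps: steps}, nil
}

func main() {
	req, err := buildMergeRequest(Batch{ID: "batch-1", Contains: []string{"req-a", "req-b"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.ID, len(req.Steps), req.Steps[0].RequestID)
}
```

Carrying the batch id on the request is what lets the result consumer find the batch again without any extra lookup table.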
1 parent 9c029ed commit c23e491

13 files changed

Lines changed: 1000 additions & 525 deletions

doc/rfc/submitqueue/extension-contract.md

Lines changed: 3 additions & 3 deletions
@@ -37,12 +37,12 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and
 | `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider |
 | `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver |
 | `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider |
-| `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider |
+| `pusher.Pusher` *(removed)* | merge | | **moved out-of-process to runway** (`merge` / `merge-signal`); see the note below the table || |
 | `storage`, `changestore`, `queueconfig` || keys + entities | unchanged — resolution targets | entities ||

-**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes.
+**Outputs are unchanged.** This RFC moves the *input* toward identity; the four live return contracts — conflicts, score, change info, build id/status — are exactly what they are today. (The `pusher` row is not an in-process extension: merge runs out-of-process in runway, so its output is not part of this catalog — see the note below.) No other output shape changes.

-The validate-time mergeability check runs **asynchronously and out-of-process** in runway rather than as an in-process extension: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-check` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The in-process `mergechecker` package is unused on the validate path.
+The validate-time mergeability **check** and the **merge** itself both run **asynchronously and out-of-process** in runway rather than as in-process extensions, over the one shared `MergeRequest`/`MergeResult` contract — a check is a dry run of a merge. `validate` hands off directly to runway (→ `merge-conflict-check`, result back via `mergeconflictsignal`); `merge` hands the batch to runway (→ `merge`, result back via `mergesignal`) rather than calling an in-process `pusher`. See [workflow.md](workflow.md). The in-process `mergechecker` and `pusher` packages are unused on the pipeline path.

 Non-obvious points:
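
The return leg of the merge round-trip (result back via `mergesignal`) might look roughly like the sketch below: the result carries the batch id, the batch moves to `Succeeded` or `Failed`, and the controller fans out to `conclude` and `speculate`. Everything here is an assumption made for illustration: the `BatchState` values, the `MergeResult` fields, the in-memory state map, the same fan-out on both outcomes, and the no-op on a redelivered result are not taken from the real controller.

```go
package main

import "fmt"

// BatchState and MergeResult are hypothetical stand-ins, not the real types.
type BatchState string

const (
	StateMerging   BatchState = "Merging"
	StateSucceeded BatchState = "Succeeded"
	StateFailed    BatchState = "Failed"
)

type MergeResult struct {
	ID      string // batch id echoed back by runway (the correlation id)
	Success bool   // assumed outcome flag
}

// applyMergeResult moves a merging batch to its terminal state and returns the
// downstream topics to publish to.
func applyMergeResult(states map[string]BatchState, res MergeResult) ([]string, error) {
	cur, ok := states[res.ID]
	if !ok {
		return nil, fmt.Errorf("unknown batch %q", res.ID)
	}
	if cur != StateMerging {
		// Assumption: a redelivered result for a terminal batch is a no-op.
		return nil, nil
	}
	if res.Success {
		states[res.ID] = StateSucceeded
	} else {
		states[res.ID] = StateFailed
	}
	return []string{"conclude", "speculate"}, nil
}

func main() {
	states := map[string]BatchState{"batch-1": StateMerging}
	topics, err := applyMergeResult(states, MergeResult{ID: "batch-1", Success: true})
	if err != nil {
		panic(err)
	}
	fmt.Println(states["batch-1"], topics)
}
```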

doc/rfc/submitqueue/workflow.md

Lines changed: 78 additions & 73 deletions
Large diffs are not rendered by default.

example/submitqueue/orchestrator/server/BUILD.bazel

Lines changed: 2 additions & 4 deletions
@@ -35,9 +35,6 @@ go_library(
         "//submitqueue/extension/conflict/fake",
         "//submitqueue/extension/conflict/fileoverlap",
         "//submitqueue/extension/conflict/none",
-        "//submitqueue/extension/pusher",
-        "//submitqueue/extension/pusher/fake",
-        "//submitqueue/extension/pusher/git",
         "//submitqueue/extension/scorer",
         "//submitqueue/extension/scorer/composite",
         "//submitqueue/extension/scorer/fake",
@@ -53,6 +50,7 @@ go_library(
         "//submitqueue/orchestrator/controller/dlq",
         "//submitqueue/orchestrator/controller/merge",
         "//submitqueue/orchestrator/controller/mergeconflictsignal",
+        "//submitqueue/orchestrator/controller/mergesignal",
         "//submitqueue/orchestrator/controller/score",
         "//submitqueue/orchestrator/controller/speculate",
         "//submitqueue/orchestrator/controller/start",
@@ -67,7 +65,7 @@ go_library(
 )

 go_binary(
-    name = "orchestrator",
+    name = "server",
     embed = [":orchestrator_lib"],
     visibility = ["//visibility:public"],
 )

example/submitqueue/orchestrator/server/main.go

Lines changed: 32 additions & 42 deletions
@@ -54,9 +54,6 @@ import (
 	conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake"
 	"github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap"
 	"github.com/uber/submitqueue/submitqueue/extension/conflict/none"
-	"github.com/uber/submitqueue/submitqueue/extension/pusher"
-	pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake"
-	gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git"
 	"github.com/uber/submitqueue/submitqueue/extension/scorer"
 	"github.com/uber/submitqueue/submitqueue/extension/scorer/composite"
 	scorerfake "github.com/uber/submitqueue/submitqueue/extension/scorer/fake"
@@ -72,6 +69,7 @@ import (
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal"
+	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergesignal"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/start"
@@ -239,13 +237,12 @@ func run() error {

 	// Per-extension factories all resolve against the registry by queue name.
 	cpf := changeProviderFactory{queues}
-	pshf := pusherFactory{queues}
 	brf := buildRunnerFactory{queues}
 	scf := scorerFactory{queues}
 	cof := analyzerFactory{queues}

 	// Register controllers
-	primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store)
+	primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, brf, scf, cof, cnt, store)
 	if err != nil {
 		return err
 	}
@@ -382,6 +379,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
 		{topickey.TopicKeyBuild, "build", "orchestrator-build"},
 		{topickey.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"},
 		{topickey.TopicKeyMerge, "merge", "orchestrator-merge"},
+		{runwaymq.TopicKeyMergeSignal, "merge-signal", "orchestrator-mergesignal"},
 		{topickey.TopicKeyConclude, "conclude", "orchestrator-conclude"},
 	}

@@ -441,6 +439,17 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
 		Queue: q,
 	})

+	// Publish-only: the orchestrator hands merge requests to runway via the
+	// runway-owned merge queue. Runway is the sole consumer, so the
+	// orchestrator registers no consuming subscription (and no DLQ) here; the
+	// inbound result arrives on the separate merge-signal queue, which is a
+	// consumed primary topic above.
+	configs = append(configs, consumer.TopicConfig{
+		Key: runwaymq.TopicKeyMerge,
+		Name: "merge",
+		Queue: q,
+	})
+
 	return consumer.NewTopicRegistry(configs)
 }

@@ -473,11 +482,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
 //
 // queueExtensions is the full set of extension implementations for a single
 // queue. Grouping them per queue (rather than per extension) lets the wiring
-// read as "for this queue, here are its scorer, analyzer, pusher, …", and lets
+// read as "for this queue, here are its scorer, analyzer, change provider, …", and lets
 // a queue profile start from a baseline and override only what differs.
 type queueExtensions struct {
 	changeProvider changeprovider.ChangeProvider
-	pusher pusher.Pusher
 	buildRunner buildrunner.BuildRunner
 	scorer scorer.Scorer
 	analyzer conflict.Analyzer
@@ -508,12 +516,6 @@ func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.Ch
 	return f.reg.get(cfg.QueueName).changeProvider, nil
 }

-type pusherFactory struct{ reg queueRegistry }
-
-func (f pusherFactory) For(cfg pusher.Config) (pusher.Pusher, error) {
-	return f.reg.get(cfg.QueueName).pusher, nil
-}
-
 type buildRunnerFactory struct{ reg queueRegistry }

 func (f buildRunnerFactory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) {
@@ -532,7 +534,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) {
 	return f.reg.get(cfg.QueueName).analyzer, nil
 }

-func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
+func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
 	var count int
 	requestController := start.NewController(
 		logger,
@@ -663,7 +665,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
 		scope,
 		store,
 		registry,
-		pshf,
+		runwaymq.TopicKeyMerge,
 		topickey.TopicKeyMerge,
 		"orchestrator-merge",
 	)
@@ -672,6 +674,19 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
 	}
 	count++

+	mergesignalController := mergesignal.NewController(
+		logger,
+		scope,
+		store,
+		registry,
+		runwaymq.TopicKeyMergeSignal,
+		"orchestrator-mergesignal",
+	)
+	if err := c.Register(mergesignalController); err != nil {
+		return count, fmt.Errorf("failed to register mergesignal controller: %w", err)
+	}
+	count++
+
 	concludeController := conclude.NewController(
 		logger,
 		scope,
@@ -708,6 +723,7 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop
 		{"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuild), "orchestrator-build-dlq")},
 		{"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")},
 		{"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq")},
+		{"mergesignal_dlq", dlq.NewDLQMergeSignalController(logger, dlqScope, store, dlq.TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq")},
 		{"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyConclude), "orchestrator-conclude-dlq")},
 	}
 	var count int
@@ -768,29 +784,8 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch
 	}), nil
 }

-// newPusher creates a git-backed Pusher bound to the configured checkout path,
-// remote, and target branch (PUSHER_CHECKOUT_PATH, PUSHER_REMOTE default
-// "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it
-// returns the fake pusher (commits succeed unless a change URI carries a failure
-// marker, see pusher/fake), keeping the example runnable without a git checkout.
-func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) {
-	checkout := os.Getenv("PUSHER_CHECKOUT_PATH")
-	if checkout == "" {
-		logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)")
-		return pushfake.New(resolver), nil
-	}
-	return gitpusher.NewPusher(gitpusher.Params{
-		CheckoutPath: checkout,
-		Remote: getEnv("PUSHER_REMOTE", "origin"),
-		Target: getEnv("PUSHER_TARGET", "main"),
-		Resolver: resolver,
-		Logger: logger.Sugar(),
-		MetricsScope: scope.SubScope("pusher"),
-	}), nil
-}
-
 // newQueueRegistry builds the per-queue extension profiles for the example.
-// Edge integrations (merge checker, change provider, pusher) and the build
+// Edge integrations (change provider) and the build
 // runner form a shared baseline; each per-queue profile starts from that
 // baseline and overrides only the extensions that differ — here the scorer and
 // conflict analyzer. Queues without an explicit profile fall back to the
@@ -801,10 +796,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
 	if err != nil {
 		return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err)
 	}
-	psh, err := newPusher(logger, scope, resolver)
-	if err != nil {
-		return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err)
-	}

 	// batchLines buckets a batch by total lines changed across all its changes —
 	// larger batches are likelier to fail to land.
@@ -826,7 +817,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
 	// below does.
 	base := queueExtensions{
 		changeProvider: cp,
-		pusher: psh,
 		buildRunner: buildfake.New(resolver),
 		scorer: scorerfake.New(resolver, heuristic.New(
 			resolver,
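
The publish-only registration that this file adds in `newTopicRegistry` can be pictured with a small self-contained sketch: a topic configured without a subscription can be published to but not consumed, while the signal topic carries a subscription. The `TopicConfig`, `Registry`, and `CanConsume` names below are hypothetical and only loosely modelled on the diff; the real `consumer.TopicRegistry` API is not shown on this page and may differ.

```go
package main

import "fmt"

// TopicConfig is a hypothetical stand-in for the registry's topic entry.
type TopicConfig struct {
	Key          string
	Name         string
	Subscription string // empty means publish-only: nothing is consumed here
}

// Registry indexes topic configs by key.
type Registry struct {
	byKey map[string]TopicConfig
}

// NewRegistry rejects duplicate keys so each topic is registered exactly once.
func NewRegistry(configs []TopicConfig) (*Registry, error) {
	byKey := make(map[string]TopicConfig, len(configs))
	for _, c := range configs {
		if _, dup := byKey[c.Key]; dup {
			return nil, fmt.Errorf("duplicate topic key %q", c.Key)
		}
		byKey[c.Key] = c
	}
	return &Registry{byKey: byKey}, nil
}

// CanConsume reports whether the topic is known and has a subscription.
func (r *Registry) CanConsume(key string) bool {
	c, ok := r.byKey[key]
	return ok && c.Subscription != ""
}

func main() {
	reg, err := NewRegistry([]TopicConfig{
		// Inbound results: consumed by the orchestrator.
		{Key: "merge-signal", Name: "merge-signal", Subscription: "orchestrator-mergesignal"},
		// Outbound requests: the other side is the sole consumer.
		{Key: "runway-merge", Name: "merge"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(reg.CanConsume("merge-signal"), reg.CanConsume("runway-merge"))
}
```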

submitqueue/orchestrator/controller/dlq/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,7 @@ go_library(
         "dlq.go",
         "log.go",
         "mergeconflictsignal.go",
+        "mergesignal.go",
         "request.go",
     ],
     importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq",
@@ -31,6 +32,7 @@ go_test(
         "dlq_test.go",
         "log_test.go",
         "mergeconflictsignal_test.go",
+        "mergesignal_test.go",
         "request_test.go",
     ],
     embed = [":dlq"],
Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+// Copyright (c) 2025 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dlq
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/uber-go/tally"
+	runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
+	"github.com/uber/submitqueue/platform/consumer"
+	"github.com/uber/submitqueue/platform/metrics"
+	"github.com/uber/submitqueue/submitqueue/extension/storage"
+	"go.uber.org/zap"
+)
+
+// mergeSignalController is the DLQ reconciler for the mergesignal topic. Its
+// payload carries a runway MergeResult whose id is the batch id echoed back, so
+// reconciliation fails that batch directly via failBatch (which also fans out
+// to the member requests).
+type mergeSignalController struct {
+	logger *zap.SugaredLogger
+	metricsScope tally.Scope
+	store storage.Storage
+	topicKey consumer.TopicKey
+	consumerGroup string
+}
+
+// Verify mergeSignalController implements consumer.Controller at compile time.
+var _ consumer.Controller = (*mergeSignalController)(nil)
+
+// NewDLQMergeSignalController builds a DLQ controller for the mergesignal topic.
+func NewDLQMergeSignalController(
+	logger *zap.SugaredLogger,
+	scope tally.Scope,
+	store storage.Storage,
+	topicKey consumer.TopicKey,
+	consumerGroup string,
+) consumer.Controller {
+	name := string(topicKey) + "_controller"
+	return &mergeSignalController{
+		logger: logger.Named(name),
+		metricsScope: scope.SubScope(name),
+		store: store,
+		topicKey: topicKey,
+		consumerGroup: consumerGroup,
+	}
+}
+
+// Process reconciles a single DLQ delivery for the mergesignal topic.
+func (c *mergeSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
+	const opName = "process"
+
+	op := metrics.Begin(c.metricsScope, opName)
+	defer func() { op.Complete(retErr) }()
+
+	msg := delivery.Message()
+
+	result, err := runwaymq.MergeResultFromBytes(msg.Payload)
+	if err != nil {
+		metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
+		return fmt.Errorf("failed to decode merge result from dlq payload: %w", err)
+	}
+
+	dmeta := delivery.Metadata()
+	c.logger.Warnw("dlq message received",
+		"batch_id", result.Id,
+		"attempt", delivery.Attempt(),
+		"dlq_original_topic", dmeta["dlq.original_topic"],
+		"dlq_failure_count", dmeta["dlq.failure_count"],
+		"dlq_last_error", dmeta["dlq.last_error"],
+	)
+
+	if err := failBatch(ctx, c.store, c.logger, result.Id); err != nil {
+		metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1)
+		return err
+	}
+
+	metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1)
+	return nil
+}
+
+// Name returns the controller name for logging and metrics.
+func (c *mergeSignalController) Name() string {
+	return string(c.topicKey)
+}
+
+// TopicKey returns the topic key this controller subscribes to.
+func (c *mergeSignalController) TopicKey() consumer.TopicKey {
+	return c.topicKey
+}
+
+// ConsumerGroup returns the consumer group for offset tracking.
+func (c *mergeSignalController) ConsumerGroup() string {
+	return c.consumerGroup
+}
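
The core of the reconciler above is decode, then fail the batch named by the echoed id. The self-contained sketch below mimics that flow with hypothetical pieces: the payload is assumed to be JSON (the real `runwaymq.MergeResultFromBytes` encoding is not shown in this diff), and `batchStore` is an in-memory map standing in for `storage.Storage` and `failBatch`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeResult is a hypothetical stand-in; only the echoed batch id matters.
type mergeResult struct {
	ID string `json:"id"`
}

// batchStore is an in-memory stand-in for the real storage: batch id -> state.
type batchStore map[string]string

// reconcile decodes a dead-lettered payload and fails the batch it names.
func reconcile(store batchStore, payload []byte) error {
	var res mergeResult
	if err := json.Unmarshal(payload, &res); err != nil {
		return fmt.Errorf("failed to decode merge result from dlq payload: %w", err)
	}
	if _, ok := store[res.ID]; !ok {
		return fmt.Errorf("batch %q not found", res.ID)
	}
	store[res.ID] = "Failed"
	return nil
}

func main() {
	store := batchStore{"batch-1": "Merging"}
	if err := reconcile(store, []byte(`{"id":"batch-1"}`)); err != nil {
		panic(err)
	}
	fmt.Println(store["batch-1"])
}
```

Returning an error on an undecodable payload, as the real controller also does, leaves the retry decision to whatever delivers the DLQ message rather than dropping it silently.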
