Skip to content

Commit 29823ca

Browse files
committed
DPL: combine resource offers to reduce processing latency
1 parent 109c166 commit 29823ca

3 files changed

Lines changed: 100 additions & 27 deletions

File tree

Framework/Core/src/ArrowSupport.cxx

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -120,31 +120,37 @@ struct ResourceStats {
120120
struct ResourceSpec {
121121
char const* name;
122122
char const* unit;
123-
char const* api; /// The callback to give resources to a device
124123
int64_t maxAvailable; /// Maximum available quantity for a resource
125124
int64_t maxQuantum; /// Largest offer which can be given
126125
int64_t minQuantum; /// Smallest offer which can be given
127126
int64_t metricOfferScaleFactor; /// The scale factor between the metric accounting and offers accounting
128127
};
128+
struct PendingResourceOffer {
129+
int64_t amount = 0;
130+
int64_t candidate = -1;
131+
};
129132

130-
auto offerResources(ResourceState& resourceState,
131-
ResourceSpec const& resourceSpec,
132-
ResourceStats& resourceStats,
133-
std::vector<DeviceSpec> const& specs,
134-
std::vector<DeviceInfo> const& infos,
135-
DevicesManager& manager,
136-
int64_t offerConsumedCurrentValue,
137-
int64_t offerExpiredCurrentValue,
138-
int64_t acquiredResourceCurrentValue,
139-
int64_t disposedResourceCurrentValue,
140-
size_t timestamp,
141-
DeviceMetricsInfo& driverMetrics,
142-
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
143-
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
144-
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
145-
void* signpostId) -> void
133+
/// Compute how much of a resource can be offered and to which device,
134+
/// update accounting, but do NOT send the message yet.
135+
/// Returns the candidate device index and offer amount.
136+
auto computeResourceOffer(ResourceState& resourceState,
137+
ResourceSpec const& resourceSpec,
138+
ResourceStats& resourceStats,
139+
std::vector<DeviceSpec> const& specs,
140+
std::vector<DeviceInfo> const& infos,
141+
int64_t offerConsumedCurrentValue,
142+
int64_t offerExpiredCurrentValue,
143+
int64_t acquiredResourceCurrentValue,
144+
int64_t disposedResourceCurrentValue,
145+
size_t timestamp,
146+
DeviceMetricsInfo& driverMetrics,
147+
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
148+
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
149+
std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
150+
void* signpostId) -> PendingResourceOffer
146151
{
147152
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
153+
PendingResourceOffer result;
148154
/// We loop over the devices, starting from where we stopped last time
149155
/// offering the minimum offer to each one
150156
int64_t lastCandidate = -1;
@@ -196,10 +202,11 @@ auto offerResources(ResourceState& resourceState,
196202
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
197203
"Offering %llu %{public}s out of %llu to %{public}s",
198204
possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
199-
manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
200205
resourceState.available -= possibleOffer;
201206
resourceState.offered += possibleOffer;
202207
lastCandidate = candidate;
208+
result.amount = possibleOffer;
209+
result.candidate = candidate;
203210
}
204211
// We had at least a valid candidate, so
205212
// next time we offer to the next device.
@@ -236,6 +243,7 @@ auto offerResources(ResourceState& resourceState,
236243
unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
237244

238245
offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
246+
return result;
239247
};
240248

241249
auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
@@ -472,7 +480,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
472480
static const ResourceSpec shmResourceSpec{
473481
.name = "shared memory",
474482
.unit = "MB",
475-
.api = "/shm-offer {}",
476483
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
477484
.maxQuantum = 100,
478485
.minQuantum = 50,
@@ -481,7 +488,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
481488
static const ResourceSpec timesliceResourceSpec{
482489
.name = "timeslice",
483490
.unit = "timeslices",
484-
.api = "/timeslice-offer {}",
485491
.maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
486492
.maxQuantum = 1,
487493
.minQuantum = 1,
@@ -502,17 +508,33 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
502508
.lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
503509
};
504510

505-
offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506-
specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
511+
auto timesliceOffer = computeResourceOffer(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
512+
specs, infos, totalTimeframesConsumed, totalTimeslicesExpired,
507513
totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508514
availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
509515
(void*)&sm);
510516

511-
offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512-
specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
517+
auto shmOffer = computeResourceOffer(shmResourceState, shmResourceSpec, shmResourceStats,
518+
specs, infos, shmOfferBytesConsumed, totalBytesExpired,
513519
totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514520
availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
515-
(void*)&sm); },
521+
(void*)&sm);
522+
523+
// Send a single combined offer so the reader gets both resources atomically
524+
if (timesliceOffer.candidate >= 0 && shmOffer.candidate >= 0) {
525+
O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, (void*)&sm);
526+
O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
527+
"Sending combined offer: %lld MB shm + %lld timeslices to %{public}s",
528+
shmOffer.amount, timesliceOffer.amount, specs[timesliceOffer.candidate].id.c_str());
529+
manager.queueMessage(specs[timesliceOffer.candidate].id.c_str(),
530+
fmt::format("/combined-offer {} {}", shmOffer.amount, timesliceOffer.amount).data());
531+
} else if (timesliceOffer.candidate >= 0) {
532+
manager.queueMessage(specs[timesliceOffer.candidate].id.c_str(),
533+
fmt::format("/timeslice-offer {}", timesliceOffer.amount).data());
534+
} else if (shmOffer.candidate >= 0) {
535+
manager.queueMessage(specs[shmOffer.candidate].id.c_str(),
536+
fmt::format("/shm-offer {}", shmOffer.amount).data());
537+
} },
516538
.postDispatching = [](ProcessingContext& ctx, void* service) {
517539
using DataHeader = o2::header::DataHeader;
518540
auto* arrow = reinterpret_cast<ArrowContext*>(service);

Framework/Core/src/ComputingQuotaEvaluator.cxx

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,18 @@ void ComputingQuotaEvaluator::dispose(int taskId)
240240
if (offer.valid == false) {
241241
continue;
242242
}
243-
if (offer.sharedMemory <= 0) {
243+
// Decrement timeslices after each use and track if a decrement happened.
244+
// - SHM-only offers (ts=0 from start): no decrement, kept alive while shm > 0
245+
// - Timeslice-only offers (shm=0, ts>0): ts decremented, invalidated when shm <= 0
246+
// - Combined offers (shm>0, ts>0): ts decremented, invalidated when ts reaches 0
247+
bool timesliceConsumed = false;
248+
if (offer.timeslices > 0) {
249+
offer.timeslices--;
250+
timesliceConsumed = true;
251+
}
252+
if (offer.sharedMemory <= 0 || (timesliceConsumed && offer.timeslices <= 0)) {
244253
O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi * 8));
245-
O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi);
254+
O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed (shm=%lli, ts=%lli).", oi, offer.sharedMemory, offer.timeslices);
246255
offer.valid = false;
247256
offer.score = OfferScore::Unneeded;
248257
}

Framework/Core/src/WSDriverClient.cxx

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,48 @@ void on_connect(uv_connect_t* connection, int status)
120120

121121
state.pendingOffers.push_back(offer);
122122
});
123+
client->observe("/combined-offer", [ref = context->ref](std::string_view cmd) {
124+
O2_SIGNPOST_ID_GENERATE(wid, ws_client);
125+
O2_SIGNPOST_START(ws_client, wid, "combined-offer", "Received combined offer.");
126+
auto& state = ref.get<DeviceState>();
127+
static constexpr int prefixSize = std::string_view{"/combined-offer "}.size();
128+
if (prefixSize > cmd.size()) {
129+
O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "combined-offer", "Malformed combined offer");
130+
return;
131+
}
132+
cmd.remove_prefix(prefixSize);
133+
// Parse "<shm_mb> <timeslices>"
134+
size_t shmMB;
135+
auto shmError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), shmMB);
136+
if (shmError.ec != std::errc() || shmError.ptr >= cmd.data() + cmd.size()) {
137+
O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "combined-offer", "Malformed combined offer (shm)");
138+
return;
139+
}
140+
// Skip space
141+
auto remaining = std::string_view(shmError.ptr, cmd.data() + cmd.size() - shmError.ptr);
142+
if (remaining.empty() || remaining[0] != ' ') {
143+
O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "combined-offer", "Malformed combined offer (separator)");
144+
return;
145+
}
146+
remaining.remove_prefix(1);
147+
int64_t timeslices;
148+
auto tsError = std::from_chars(remaining.data(), remaining.data() + remaining.size(), timeslices);
149+
if (tsError.ec != std::errc()) {
150+
O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "combined-offer", "Malformed combined offer (timeslices)");
151+
return;
152+
}
153+
ComputingQuotaOffer offer{
154+
.cpu = 0,
155+
.memory = 0,
156+
.sharedMemory = (int64_t)(shmMB * 1000000),
157+
.timeslices = timeslices,
158+
.runtime = 10000,
159+
.user = -1,
160+
.valid = true};
161+
state.pendingOffers.push_back(offer);
162+
O2_SIGNPOST_END(ws_client, wid, "combined-offer", "Received combined offer: %zu MB shm + %lli timeslices. Total pending %zu.",
163+
shmMB, timeslices, state.pendingOffers.size());
164+
});
123165
client->observe("/timeslice-offer", [ref = context->ref](std::string_view cmd) {
124166
O2_SIGNPOST_ID_GENERATE(wid, ws_client);
125167
O2_SIGNPOST_START(ws_client, wid, "timeslice-offer", "Received timeslice offer.");

0 commit comments

Comments
 (0)