Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
#endif

/* Max in-flight transfer size per xstream */
/* Set the total in-flight size to be 50% of MAX DMA size for
/* Set the total in-flight size to be 1/3 of MAX DMA size for
* the moment, will adjust it later if needed.
*/
#define MIGR_TGT_INF_DATA (1 << 29)
#define MIGR_TGT_INF_DATA (300 << 20)

/* Threshold for very large transfers.
* This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation.
Expand All @@ -51,10 +51,10 @@

/* Number of migration ULTs per target */
#define MIGR_TGT_ULTS_MIN 100
#define MIGR_TGT_ULTS_DEF 500
#define MIGR_TGT_ULTS_MAX 2000
#define MIGR_TGT_ULTS_DEF 300
#define MIGR_TGT_ULTS_MAX 1000

/* 1/3 object ults, 2/3 key ULTs */
/* ~1/3 object ULTs (~100), ~2/3 key ULTs (~200) with the default of 300 ULTs */
#define MIGR_OBJ_ULT_PERCENT 33

#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100)
Expand Down Expand Up @@ -715,16 +715,16 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_
*/
DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), DP_UOID(mrone->mo_oid));
if (rc == -DER_NOMEM) {
/* sleep 10 seconds before retry, give other layers a chance to
/* sleep a randomized 10-29 seconds before retry, to give other layers a chance to
* release resources.
*/
dss_sleep(10 * 1000);
dss_sleep((10 + rand() % 20) * 1000);
if (waited != 0 && waited % 3600 == 0) {
DL_ERROR(rc, DF_RB ": waited memory for %d hour(s)",
DP_RB_MRO(mrone), waited / 3600);
}
}
waited += 10;
waited += 20;
D_GOTO(retry, rc);
}

Expand Down Expand Up @@ -1945,12 +1945,16 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y
*yielded = waited;

/* per-pool counters for rebuild status tracking */
if (res_type == MIGR_OBJ)
if (res_type == MIGR_OBJ) {
tls->mpt_tgt_obj_ult_cnt++;
else if (res_type == MIGR_KEY)
} else if (res_type == MIGR_KEY) {
tls->mpt_tgt_dkey_ult_cnt++;
else
} else {
tls->mpt_inflight_size += units;
/* remaining resource may be sufficient for more waiters */
if (waited && res->res_units < res->res_limit)
ABT_cond_signal(res->res_cond);
}

D_DEBUG(DB_REBUILD,
"res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB
Expand Down Expand Up @@ -2010,6 +2014,7 @@ migrate_one_ult(void *arg)
struct migrate_one *mrone = arg;
struct migrate_pool_tls *tls;
daos_size_t data_size;
daos_size_t degraded_size = 0;
int rc = 0;

while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG))
Expand All @@ -2022,21 +2027,35 @@ migrate_one_ult(void *arg)
}

data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num);
data_size += daos_iods_len(mrone->mo_iods_from_parity,
mrone->mo_iods_num_from_parity);
data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity);
if (daos_oclass_is_ec(&mrone->mo_oca)) {
/* NB: this is a workaround for EC object:
* The fetch buffer is taken from a pre-registered (R)DMA buffer;
* however, a degraded EC read will allocate and register an extra
* buffer to recover data.
*
* Currently, the resource manager cannot control this extra allocation,
* which can lead to increased memory consumption.
*
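* While this workaround does not prevent dynamic buffer allocation and
* registration, it additionally holds data_size * MIN(16, obj_ec_data_tgt_nr())
* of the MIGR_DATA resource for the duration of the migration, which provides
* relatively precise control over the resources consumed by degraded EC reads.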
*/
degraded_size = data_size * MIN(16, obj_ec_data_tgt_nr(&mrone->mo_oca));
}

D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone,
data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity);

D_ASSERT(data_size != (daos_size_t)-1);

rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL);
rc = migrate_res_hold(tls, MIGR_DATA, data_size + degraded_size, NULL);
if (rc)
D_GOTO(out, rc);

rc = migrate_dkey(tls, mrone, data_size);

migrate_res_release(tls, MIGR_DATA, data_size);
migrate_res_release(tls, MIGR_DATA, data_size + degraded_size);

D_DEBUG(DB_REBUILD,
DF_RB ": " DF_UOID " layout %u migrate dkey " DF_KEY " inflight_size " DF_U64
Expand Down
Loading