Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16930 pool: Share map bulk resources #15780

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ struct ds_pool_svc;
/* age of an entry in svc_ops KVS before it may be evicted */
#define DEFAULT_SVC_OPS_ENTRY_AGE_SEC_MAX 300ULL

/*
 * Pool map buffer cache
 *
 * Bundles a pool map buffer with a bulk handle so that handlers can obtain
 * both through ds_pool_lookup_map_bc() and release them with
 * ds_pool_put_map_bc() instead of reading and freeing a private map buffer
 * for every request. A pointer to the current entry is kept in
 * ds_pool::sp_map_bc.
 */
struct ds_pool_map_bc {
/* The pool map buffer. */
struct pool_buf *pmc_buf;
/* Bulk handle; presumably registered over pmc_buf -- TODO confirm in the creator. */
crt_bulk_t pmc_bulk;
/*
 * NOTE(review): looks like a reference count taken by the lookup and
 * dropped by the put -- confirm against their implementations.
 */
uint32_t pmc_ref;
};

/*
* Pool object
*
Expand All @@ -49,7 +56,8 @@ struct ds_pool {
uuid_t sp_uuid; /* pool UUID */
d_list_t sp_hdls;
ABT_rwlock sp_lock;
struct pool_map *sp_map;
struct pool_map *sp_map;
struct ds_pool_map_bc *sp_map_bc;
uint32_t sp_map_version; /* temporary */
uint32_t sp_ec_cell_sz;
uint64_t sp_reclaim;
Expand Down
9 changes: 9 additions & 0 deletions src/pool/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

bool ec_agg_disabled;
/*
 * NOTE(review): initialised with -1, which is UINT32_MAX for a uint32_t;
 * presumably a "not configured" sentinel that init() replaces with
 * PW_RF_DEFAULT -- confirm against the full init() body.
 */
uint32_t pw_rf = -1; /* pool wise redundancy factor */
/*
 * May be overridden by the DAOS_POOL_SPACE_CACHE_INTVL environment variable
 * in init(); values above 20 are rejected with a warning and reset to 2.
 * The pool space query path only consults the cache when this is non-zero,
 * so 0 turns the cache off.
 */
uint32_t ps_cache_intvl = 2; /* pool space cache expiration time, in seconds */
/* Default and bounds for pw_rf. */
#define PW_RF_DEFAULT (2)
#define PW_RF_MIN (0)
#define PW_RF_MAX (4)
Expand Down Expand Up @@ -77,6 +78,14 @@ init(void)
pw_rf = PW_RF_DEFAULT;
D_INFO("pool redundancy factor %d\n", pw_rf);

d_getenv_uint32_t("DAOS_POOL_SPACE_CACHE_INTVL", &ps_cache_intvl);
if (ps_cache_intvl > 20) {
D_WARN("pool space cache expiration time %u is too large, use default value\n",
ps_cache_intvl);
ps_cache_intvl = 2;
}
D_INFO("pool space cache expiration time set to %u seconds\n", ps_cache_intvl);

ds_pool_rsvc_class_register();

bio_register_ract_ops(&nvme_reaction_ops);
Expand Down
15 changes: 11 additions & 4 deletions src/pool/srv_internal.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -17,6 +18,7 @@
#include <gurt/telemetry_common.h>

extern uint32_t pw_rf;
extern uint32_t ps_cache_intvl;

/**
* Global pool metrics
Expand Down Expand Up @@ -223,7 +225,12 @@ int ds_pool_tgt_connect(struct ds_pool *pool, struct pool_iv_conn *pic);
void ds_pool_tgt_query_map_handler(crt_rpc_t *rpc);
void ds_pool_tgt_discard_handler(crt_rpc_t *rpc);
void
ds_pool_tgt_warmup_handler(crt_rpc_t *rpc);
ds_pool_tgt_warmup_handler(crt_rpc_t *rpc);
/*
 * Look up the pool map buffer cache entry of \a pool.
 *
 * On success (return value 0) *map_bc_out holds the entry and
 * *map_version_out the matching pool map version. The callers in srv_pool.c
 * release the entry with ds_pool_put_map_bc() once the map transfer is done.
 *
 * NOTE(review): \a ctx is passed as rpc->cr_ctx by the callers; presumably
 * the bulk handle is created for that context -- confirm in the definition.
 */
int
ds_pool_lookup_map_bc(struct ds_pool *pool, crt_context_t ctx, struct ds_pool_map_bc **map_bc_out,
uint32_t *map_version_out);
/* Release an entry obtained from ds_pool_lookup_map_bc(). */
void
ds_pool_put_map_bc(struct ds_pool_map_bc *map_bc);

/*
* srv_util.c
Expand All @@ -232,9 +239,9 @@ bool ds_pool_map_rank_up(struct pool_map *map, d_rank_t rank);
int ds_pool_plan_svc_reconfs(int svc_rf, struct pool_map *map, d_rank_list_t *replicas,
d_rank_t self, bool filter_only, d_rank_list_t **to_add_out,
d_rank_list_t **to_remove_out);
int ds_pool_transfer_map_buf(struct pool_buf *map_buf, uint32_t map_version,
crt_rpc_t *rpc, crt_bulk_t remote_bulk,
uint32_t *required_buf_size);
int
ds_pool_transfer_map_buf(struct ds_pool_map_bc *map_bc, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
uint32_t *required_buf_size);
extern struct bio_reaction_ops nvme_reaction_ops;

/*
Expand Down
83 changes: 62 additions & 21 deletions src/pool/srv_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ sched_cancel_and_wait(struct pool_svc_sched *sched)
sched_wait(sched);
}

/*
 * Cached result of the most recent pool space query broadcast.
 *
 * pool_space_query_bcast() answers from this cache while the cached result
 * is younger than ps_cache_intvl, and refreshes it after a successful
 * broadcast. The cache is bypassed entirely when ps_cache_intvl is 0.
 */
struct pool_space_cache {
/* Space information copied to the caller on a cache hit. */
struct daos_pool_space psc_space;
/* Cached mem file size, returned when the caller asks for it. */
uint64_t psc_memfile_bytes;
/* daos_gettime_coarse() value recorded when the cache was last refreshed. */
uint64_t psc_timestamp;
/*
 * Protects the fields above. Created in pool_svc_alloc_cb() and held by
 * pool_space_query_bcast() across both the cache check and the refreshing
 * broadcast.
 */
ABT_mutex psc_lock;
};

/* Pool service */
struct pool_svc {
struct ds_rsvc ps_rsvc;
Expand All @@ -216,6 +223,7 @@ struct pool_svc {
rdb_path_t ps_ops; /* metadata ops KVS */
int ps_error; /* in DB data (see pool_svc_lookup_leader) */
struct pool_svc_events ps_events;
struct pool_space_cache ps_space_cache;
uint32_t ps_global_version;
int ps_svc_rf;
bool ps_force_notify; /* MS of PS membership */
Expand Down Expand Up @@ -1249,9 +1257,16 @@ pool_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **rsvc)
goto err_pool;
}

rc = ABT_mutex_create(&svc->ps_space_cache.psc_lock);
if (rc != ABT_SUCCESS) {
D_ERROR("failed to create psc_lock: %d\n", rc);
rc = dss_abterr2der(rc);
goto err_lock;
}

rc = rdb_path_init(&svc->ps_root);
if (rc != 0)
goto err_lock;
goto err_psc_lock;
rc = rdb_path_push(&svc->ps_root, &rdb_path_root_key);
if (rc != 0)
goto err_root;
Expand Down Expand Up @@ -1320,6 +1335,8 @@ pool_svc_alloc_cb(d_iov_t *id, struct ds_rsvc **rsvc)
rdb_path_fini(&svc->ps_handles);
err_root:
rdb_path_fini(&svc->ps_root);
err_psc_lock:
ABT_mutex_free(&svc->ps_space_cache.psc_lock);
err_lock:
ABT_rwlock_free(&svc->ps_lock);
err_pool:
Expand Down Expand Up @@ -3879,9 +3896,7 @@ pool_connect_handler(crt_rpc_t *rpc, int handler_version)
{
struct pool_connect_in *in = crt_req_get(rpc);
struct pool_connect_out *out = crt_reply_get(rpc);
struct pool_svc *svc;
struct pool_buf *map_buf = NULL;
uint32_t map_version;
struct pool_svc *svc;
uint32_t connectable;
uint32_t global_ver;
uint32_t obj_layout_ver;
Expand Down Expand Up @@ -4102,12 +4117,6 @@ pool_connect_handler(crt_rpc_t *rpc, int handler_version)
goto out_map_version;
}

rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version);
if (rc != 0) {
D_ERROR(DF_UUID": failed to read pool map: "DF_RC"\n",
DP_UUID(svc->ps_uuid), DP_RC(rc));
D_GOTO(out_map_version, rc);
}
transfer_map = true;
if (skip_update)
D_GOTO(out_map_version, rc = 0);
Expand Down Expand Up @@ -4216,13 +4225,20 @@ pool_connect_handler(crt_rpc_t *rpc, int handler_version)
ABT_rwlock_unlock(svc->ps_lock);
rdb_tx_end(&tx);
if (rc == 0 && transfer_map) {
rc = ds_pool_transfer_map_buf(map_buf, map_version, rpc, bulk,
&out->pco_map_buf_size);
struct ds_pool_map_bc *map_bc;
uint32_t map_version;

rc = ds_pool_lookup_map_bc(svc->ps_pool, rpc->cr_ctx, &map_bc, &map_version);
if (rc == 0) {
rc = ds_pool_transfer_map_buf(map_bc, rpc, bulk, &out->pco_map_buf_size);
ds_pool_put_map_bc(map_bc);
/* Ensure the map version matches the map buffer. */
out->pco_op.po_map_version = map_version;
}
/** TODO: roll back tx if transfer fails? Perhaps rdb_tx_discard()? */
}
if (rc == 0)
rc = op_val.ov_rc;
D_FREE(map_buf);
D_FREE(hdl);
D_FREE(machine);
if (prop)
Expand Down Expand Up @@ -4480,8 +4496,25 @@ pool_space_query_bcast(crt_context_t ctx, struct pool_svc *svc, uuid_t pool_hdl,
struct pool_tgt_query_in *in;
struct pool_tgt_query_out *out;
crt_rpc_t *rpc;
struct pool_space_cache *cache = &svc->ps_space_cache;
uint64_t cur_time = 0;
bool unlock = false;
int rc;

if (ps_cache_intvl > 0) {
ABT_mutex_lock(cache->psc_lock);

cur_time = daos_gettime_coarse();
if (cur_time < cache->psc_timestamp + ps_cache_intvl) {
*ps = cache->psc_space;
if (mem_file_bytes != NULL)
*mem_file_bytes = cache->psc_memfile_bytes;
ABT_mutex_unlock(cache->psc_lock);
return 0;
}
unlock = true;
}

D_DEBUG(DB_MD, DF_UUID": bcasting\n", DP_UUID(svc->ps_uuid));

rc = bcast_create(ctx, svc, POOL_TGT_QUERY, NULL, &rpc);
Expand All @@ -4507,6 +4540,13 @@ pool_space_query_bcast(crt_context_t ctx, struct pool_svc *svc, uuid_t pool_hdl,
*ps = out->tqo_space;
if (mem_file_bytes != NULL)
*mem_file_bytes = out->tqo_mem_file_bytes;

if (ps_cache_intvl > 0 && cur_time > cache->psc_timestamp) {
cache->psc_timestamp = cur_time;
cache->psc_space = *ps;
if (mem_file_bytes != NULL)
cache->psc_memfile_bytes = *mem_file_bytes;
}
} else {
D_ERROR(DF_UUID ": failed to query from targets: " DF_RC "\n",
DP_UUID(svc->ps_uuid), DP_RC(rc));
Expand All @@ -4515,6 +4555,9 @@ pool_space_query_bcast(crt_context_t ctx, struct pool_svc *svc, uuid_t pool_hdl,
out_rpc:
crt_req_decref(rpc);
out:
if (unlock)
ABT_mutex_unlock(cache->psc_lock);

D_DEBUG(DB_MD, DF_UUID": bcasted: "DF_RC"\n", DP_UUID(svc->ps_uuid),
DP_RC(rc));
return rc;
Expand Down Expand Up @@ -4961,7 +5004,7 @@ pool_query_handler(crt_rpc_t *rpc, int handler_version)
struct pool_query_in *in = crt_req_get(rpc);
struct pool_query_out *out = crt_reply_get(rpc);
daos_prop_t *prop = NULL;
struct pool_buf *map_buf;
struct ds_pool_map_bc *map_bc;
uint32_t map_version = 0;
struct pool_svc *svc;
struct pool_metrics *metrics;
Expand Down Expand Up @@ -5126,19 +5169,17 @@ pool_query_handler(crt_rpc_t *rpc, int handler_version)
}
}

rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version);
if (rc != 0)
D_ERROR(DF_UUID": failed to read pool map: "DF_RC"\n",
DP_UUID(svc->ps_uuid), DP_RC(rc));

out_lock:
ABT_rwlock_unlock(svc->ps_lock);
rdb_tx_end(&tx);
if (rc != 0)
goto out_svc;

rc = ds_pool_transfer_map_buf(map_buf, map_version, rpc, bulk, &out->pqo_map_buf_size);
D_FREE(map_buf);
rc = ds_pool_lookup_map_bc(svc->ps_pool, rpc->cr_ctx, &map_bc, &map_version);
if (rc != 0)
goto out_svc;
rc = ds_pool_transfer_map_buf(map_bc, rpc, bulk, &out->pqo_map_buf_size);
ds_pool_put_map_bc(map_bc);
if (rc != 0)
goto out_svc;

Expand Down
Loading
Loading