Support free-fragment recycling in shared-segment. Add fingerprint object mgmt

The main change with this commit is support for free-fragment lists and
recycling of small fragments in shared memory. This was a major limitation
of the support added in previous commits. Another driving factor for
implementing some free-list support was that earlier multi-user concurrent
insert performance benchmarking was not functional beyond a point: we would
frequently run into shared-memory out-of-memory conditions (OOMs), even with
shmem sizes > 8 GiB (which had worked in a prior dev/perf-test cycle).

The main design changes to manage small fragments are as follows:

Managing memory allocation / free using platform_memfrag{} fragments:

- Allocation and free of memory are dealt with in terms of "memory
  fragments", a small structure that holds the fragment's {addr, size}.
  All memory requests (as before) are aligned to the cacheline.

  - Allocation: All clients of memory allocation now "hand in"
    an opaque platform_memfrag{} handle, which is returned populated
    with the memory address and, more importantly, the size of the
    fragment that was used to satisfy the memory request.

  - Free: Clients now have to safely keep a handle to this returned
    platform_memfrag{} and hand it back to the free() method.
    free() relies entirely on the size specified in this input
    fragment handle, and the freed memory fragment is returned to the
    corresponding free-list bucket. (See the sketch below.)

- Upon free(), the freed fragment is tracked in one of a few free-lists,
  bucketed by the size of the freed fragment. For now, we support 4 buckets,
  for sizes <= 64, <= 128, <= 256 and <= 512 bytes. (These sizes are
  sufficient for current benchmarking requirements.) A freed fragment is
  hung off of the corresponding list, threading the free fragments through
  the fragments' own memory. The new struct free_frag_hdr{} provides the
  threading structure; it tracks the current fragment's size and a
  free_frag_next pointer. The 'size' provided to the free() call is
  recorded as the freed fragment's size, as sketched below.
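
  A minimal sketch of the threading described above (the struct and field
  names come from this message; the in-tree definition may differ):

      typedef struct free_frag_hdr {
         struct free_frag_hdr *free_frag_next; // next free fragment in this bucket
         size_t                size;           // size recorded at free() time
      } free_frag_hdr;

      // On free(): the header is laid over the freed memory itself and
      // the fragment is pushed onto its bucket's list.
      static void
      free_frag_push(free_frag_hdr **bucket_head, void *addr, size_t size)
      {
         free_frag_hdr *frag  = (free_frag_hdr *)addr;
         frag->size           = size;
         frag->free_frag_next = *bucket_head;
         *bucket_head         = frag;
      }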

- Subsequently, a new alloc() request is first satisfied by searching the
  free-list corresponding to the memory request. For example, a request
  from a client for 150 bytes will be rounded up to a cacheline boundary,
  i.e. 192 bytes. The free-list for the 256-byte bucket will be searched
  to find the first free fragment of the right size. If no free fragment
  is found in the target list, a new fragment is allocated.
  The returned fragment will have a size of 256 bytes (for an original
  request of 150 bytes). See the sketch below.
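
  For illustration, a hypothetical helper showing the rounding and bucket
  selection described above (assuming a 64-byte cacheline; the real code
  may organize this differently):

      #define CACHELINE_SIZE 64
      static size_t
      round_up_to_cacheline(size_t nbytes)
      {
         return (nbytes + CACHELINE_SIZE - 1) & ~((size_t)CACHELINE_SIZE - 1);
      }
      // round_up_to_cacheline(150) == 192, which maps to the <= 256-byte
      // bucket; if that bucket holds a free fragment, the caller gets a
      // 256-byte fragment back.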

- An immediate consequence of this approach is a small, but significant,
  change in the allocation and free APIs; i.e. TYPED_MALLOC(),
  TYPED_ARRAY_MALLOC() and TYPED_FLEXIBLE_STRUCT_MALLOC(), and their 'Z'
  equivalents, which return zeroed-out memory.

- All existing clients of the various TYPED*() memory allocation
  calls have been updated to declare an on-stack platform_memfrag{}
  handle, which is passed back to platform_free().

  - In some places memory is allocated to initialize sub-systems and
    then torn down during deinit(). In a few such places, existing
    structures are extended to track an additional 'size' field. The size
    of the memory fragment allocated during init() is recorded there and
    then used to invoke platform_free() as part of the deinit() method.
    An example is clockcache_init(), which records the 'size' of each
    fragment and passes it down to clockcache_deinit(), where the memory
    fragment is then freed with the right 'size'. This pattern now appears
    in many such init()/deinit() methods of different sub-systems;
    e.g. pcq_alloc(), pcq_free(), ... (Condensed sketch below.)
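
    Condensed from the clockcache changes in this commit (see the
    clockcache.c diff below); names are as in that diff:

        // init(): remember the size of the fragment actually allocated
        platform_memfrag memfrag_cc_lookup;
        cc->lookup      = TYPED_ARRAY_MALLOC_MF(cc->heap_id, cc->lookup,
                                                allocator_page_capacity,
                                                &memfrag_cc_lookup);
        cc->lookup_size = memfrag_size(&memfrag_cc_lookup);

        // deinit(): hand the recorded size back to the allocator
        platform_free_mem(cc->heap_id, cc->lookup, cc->lookup_size);
        cc->lookup = NULL;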

- Cautionary Note:

  If the 'ptr' handed to platform_free() is not of type platform_memfrag{} *,
  it is treated as a generic <struct> *, and its sizeof() will be used
  as the 'size' of the fragment to free. This works in most cases, except
  for some lapsed cases where, when allocating a structure, the allocator
  ended up selecting a "larger" fragment that just happened to be
  available in the free-list. The consequence is that we might end up
  freeing a larger fragment to a smaller-sized free-list. Or, even if
  we do free it to the right-sized bucket, we still end up marking the
  free fragment's size as smaller than what it really is. Over time, this
  may add up to a small memory leak, but it hasn't been found to be
  crippling in current runs. (There is definitely no issue here with
  overwriting memory due to incorrect sizes.)
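
  An illustration of the lapsed case (hypothetical struct name; the sizes
  are only examples of the behavior described above):

      thing *t = TYPED_MALLOC(hid, t);  // allocator may hand back a recycled 256-byte fragment
      // ...
      platform_free(hid, t);            // no platform_memfrag{} handle: free() falls back to
                                        // sizeof(*t), e.g. 64 bytes, so the 256-byte fragment
                                        // is book-kept as 64 bytes -- a small accounting leak,
                                        // but never a memory overwrite.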

- Copious debug and platform asserts have been added to the shmem alloc/free
  methods to cross-check, to some extent, for illegal calls.

Fingerprint Object Management:

  Managing memory for fingerprint arrays was particularly problematic.
  This was the case even in a previous commit, before the introduction
  of the memfrag{} approach. Managing fingerprint memory was found to
  be especially cantankerous due to the way filter-building and compaction
  tasks are queued and asynchronously processed by some other
  thread / process.

  The requirements from the new interfaces are handled as follows:

   - Added a new fingerprint{} object, struct fp_hdr{}, which embeds
     a platform_memfrag{} at its head. A few other short fields are
     added for tracking fingerprint memory-mgmt gyrations.

   - Various accessor methods are added to manage memory for fingerprint
     arrays through this object. E.g.,

     - fingerprint_init()   - Allocates the fingerprint array for 'ntuples'.
     - fingerprint_deinit() - Dismantles the object and frees the memory.
     - fingerprint_start()  - Returns the start of the fingerprint array's memory.
     - fingerprint_nth()    - Returns the n'th element of the fingerprint array.

  Packaging the handling of fingerprint arrays through this object and
  its interfaces helped greatly to stabilize the memory histrionics; see
  the usage sketch below.
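
  A short usage sketch (names as they appear in the btree.h / btree.c
  changes below; treat the exact signatures as illustrative):

      fp_hdr fp;
      fingerprint_init(&fp, hid, ntuples);       // allocates the fingerprint array
      if (!fingerprint_is_empty(&fp)) {
         uint32 *arr = fingerprint_start(&fp);   // base of the array
         arr[i] = hash(key_data(tuple_key), key_length(tuple_key), seed);
         // ...
         fingerprint_deinit(hid, &fp);           // frees the memory
      }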

- When SplinterDB is closed, the shared-memory dismantling routine will tag
  any large fragments that are still found "in-use". This is percolated
  all the way back to splinterdb_close(), unmount() and
  platform_heap_destroy() as a failure return code (see the caller-side
  sketch below). Tests will fail if they have left some un-freed large
  fragments. (A similar approach was considered to book-keep all small
  fragments used/freed, but due to some rounding errors it cannot be a
  reliable check at this time, so it hasn't been done.)
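
  A minimal caller-side sketch (splinterdb_close() now returns an int, per
  the splinterdb.h change below; the message text is illustrative):

      int rc = splinterdb_close(&kvs);
      platform_assert(rc == 0,
                      "splinterdb_close() reported leaked shared-memory fragments");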

Test changes:
 - Extended tests to exercise core memory allocation / free APIs,
   fingerprint object mgmt, and writable_buffer interfaces:
    - platform_apis_test
    - splinter_shmem_test.c: Adds specific test-cases to verify that
      free-list mgmt is happening correctly.
 - Added hooks to cross-check multiple frees of fragments, and testing
   hooks to verify that a freed fragment is relocated to the right free-list.

Miscellaneous:
 - Elaborate and illustrative tracing added to track the memory mgmt done
   for fingerprint arrays, especially when they are bounced around
   queued / re-queued tasks. (This was a very problematic debugging issue.)
 - Enhanced various diagnostics, asserts, and tracing.
 - Improved memory-usage stats gathering and reporting.
 - Added a diagram for large free-fragment tracking.
gapisback committed Jan 25, 2024
1 parent 23f5e4f commit 33a95f2
Showing 64 changed files with 5,357 additions and 1,727 deletions.
6 changes: 4 additions & 2 deletions Makefile
@@ -476,6 +476,10 @@ $(BINDIR)/$(UNITDIR)/splinter_shmem_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_shmem_oom_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_ipc_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ)

@@ -495,8 +499,6 @@ $(BINDIR)/$(UNITDIR)/splinterdb_heap_id_mgmt_test: $(COMMON_TESTOBJ) \
$(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \
$(LIBDIR)/libsplinterdb.so



########################################
# Convenience mini unit-test targets
unit/util_test: $(BINDIR)/$(UNITDIR)/util_test
2 changes: 1 addition & 1 deletion include/splinterdb/splinterdb.h
@@ -175,7 +175,7 @@ splinterdb_open(const splinterdb_config *cfg, splinterdb **kvs);
// Close a splinterdb
//
// This will flush all data to disk and release all resources
void
int
splinterdb_close(splinterdb **kvs);

// Register the current thread so that it can be used with splinterdb.
1 change: 0 additions & 1 deletion src/PackedArray.c
@@ -380,7 +380,6 @@ void PACKEDARRAY_JOIN(__PackedArray_unpack_, PACKEDARRAY_IMPL_BITS_PER_ITEM)(con
#include "poison.h"

#define PACKEDARRAY_MALLOC(size) platform_malloc(size)
#define PACKEDARRAY_FREE(p) platform_free(p)

void PackedArray_pack(uint32* a, const uint32 offset, const uint32* in, uint32 count, size_t bitsPerItem)
{
18 changes: 9 additions & 9 deletions src/btree.c
@@ -3103,6 +3103,12 @@ btree_pack_link_extent(btree_pack_req *req,
req->num_edges[height] = 0;
}

static inline bool
btree_pack_can_fit_tuple(btree_pack_req *req)
{
return req->num_tuples < req->max_tuples;
}

static inline btree_node *
btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot)
{
@@ -3167,8 +3173,8 @@ btree_pack_loop(btree_pack_req *req, // IN/OUT
log_trace_key(tuple_key, "btree_pack_loop (bottom)");

if (req->hash) {
platform_assert(req->num_tuples < req->max_tuples);
req->fingerprint_arr[req->num_tuples] =
platform_assert(btree_pack_can_fit_tuple(req));
fingerprint_start(&req->fingerprint)[req->num_tuples] =
req->hash(key_data(tuple_key), key_length(tuple_key), req->seed);
}

@@ -3216,12 +3222,6 @@ btree_pack_post_loop(btree_pack_req *req, key last_key)
mini_release(&req->mini, last_key);
}

static bool32
btree_pack_can_fit_tuple(btree_pack_req *req, key tuple_key, message data)
{
return req->num_tuples < req->max_tuples;
}

static void
btree_pack_abort(btree_pack_req *req)
{
@@ -3259,7 +3259,7 @@ btree_pack(btree_pack_req *req)

while (iterator_can_next(req->itor)) {
iterator_curr(req->itor, &tuple_key, &data);
if (!btree_pack_can_fit_tuple(req, tuple_key, data)) {
if (!btree_pack_can_fit_tuple(req)) {
platform_error_log("%s(): req->num_tuples=%lu exceeded output size "
"limit, req->max_tuples=%lu\n",
__func__,
24 changes: 15 additions & 9 deletions src/btree.h
@@ -151,9 +151,9 @@ typedef struct btree_pack_req {
btree_config *cfg;
iterator *itor; // the itor which is being packed
uint64 max_tuples;
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
uint32 *fingerprint_arr; // IN/OUT: hashes of the keys in the tree
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
fp_hdr fingerprint; // IN/OUT: hashes of the keys in the tree

// internal data
uint16 height;
@@ -168,6 +168,7 @@ typedef struct btree_pack_req {
uint64 num_tuples; // no. of tuples in the output tree
uint64 key_bytes; // total size of keys in tuples of the output tree
uint64 message_bytes; // total size of msgs in tuples of the output tree
uint64 line; // Caller's line #
} btree_pack_req;

struct btree_async_ctxt;
@@ -325,6 +326,10 @@ btree_iterator_init(cache *cc,
void
btree_iterator_deinit(btree_iterator *itor);

/*
* Initialize BTree Pack request structure. May allocate memory for fingerprint
* array.
*/
static inline platform_status
btree_pack_req_init(btree_pack_req *req,
cache *cc,
@@ -343,26 +348,27 @@ btree_pack_req_init(btree_pack_req *req,
req->hash = hash;
req->seed = seed;
if (hash != NULL && max_tuples > 0) {
req->fingerprint_arr =
TYPED_ARRAY_ZALLOC(hid, req->fingerprint_arr, max_tuples);

fingerprint_init(&req->fingerprint, hid, max_tuples); // Allocates memory

// When we run with shared-memory configured, we expect that it is sized
// big-enough to not get OOMs from here. Hence, only a debug_assert().
debug_assert(req->fingerprint_arr,
debug_assert(!fingerprint_is_empty(&req->fingerprint),
"Unable to allocate memory for %lu tuples",
max_tuples);
if (!req->fingerprint_arr) {
if (fingerprint_is_empty(&req->fingerprint)) {
return STATUS_NO_MEMORY;
}
}
return STATUS_OK;
}

// Free memory if any was allocated for fingerprint array.
static inline void
btree_pack_req_deinit(btree_pack_req *req, platform_heap_id hid)
{
if (req->fingerprint_arr) {
platform_free(hid, req->fingerprint_arr);
if (!fingerprint_is_empty(&req->fingerprint)) {
fingerprint_deinit(hid, &req->fingerprint);
}
}

41 changes: 28 additions & 13 deletions src/clockcache.c
@@ -1818,20 +1818,25 @@ clockcache_init(clockcache *cc, // OUT
cc->heap_id = hid;

/* lookup maps addrs to entries, entry contains the entries themselves */
cc->lookup =
TYPED_ARRAY_MALLOC(cc->heap_id, cc->lookup, allocator_page_capacity);
platform_memfrag memfrag_cc_lookup;
cc->lookup = TYPED_ARRAY_MALLOC_MF(
cc->heap_id, cc->lookup, allocator_page_capacity, &memfrag_cc_lookup);
if (!cc->lookup) {
goto alloc_error;
}
cc->lookup_size = memfrag_size(&memfrag_cc_lookup);

for (i = 0; i < allocator_page_capacity; i++) {
cc->lookup[i] = CC_UNMAPPED_ENTRY;
}

cc->entry =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->entry, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_entry;
cc->entry = TYPED_ARRAY_ZALLOC_MF(
cc->heap_id, cc->entry, cc->cfg->page_capacity, &memfrag_cc_entry);
if (!cc->entry) {
goto alloc_error;
}
cc->entry_size = memfrag_size(&memfrag_cc_entry);

platform_status rc = STATUS_NO_MEMORY;

@@ -1860,11 +1865,13 @@ clockcache_init(clockcache *cc, // OUT
cc->refcount = platform_buffer_getaddr(&cc->rc_bh);

/* Separate ref counts for pins */
cc->pincount =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->pincount, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_pincount;
cc->pincount = TYPED_ARRAY_ZALLOC_MF(
cc->heap_id, cc->pincount, cc->cfg->page_capacity, &memfrag_cc_pincount);
if (!cc->pincount) {
goto alloc_error;
}
cc->pincount_size = memfrag_size(&memfrag_cc_pincount);

/* The hands and associated page */
cc->free_hand = 0;
@@ -1873,13 +1880,16 @@ clockcache_init(clockcache *cc, // OUT
cc->per_thread[thr_i].free_hand = CC_UNMAPPED_ENTRY;
cc->per_thread[thr_i].enable_sync_get = TRUE;
}
platform_memfrag memfrag_cc_batch_busy;
cc->batch_busy =
TYPED_ARRAY_ZALLOC(cc->heap_id,
cc->batch_busy,
cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH);
TYPED_ARRAY_ZALLOC_MF(cc->heap_id,
cc->batch_busy,
(cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH),
&memfrag_cc_batch_busy);
if (!cc->batch_busy) {
goto alloc_error;
}
cc->batch_busy_size = memfrag_size(&memfrag_cc_batch_busy);

return STATUS_OK;

@@ -1907,10 +1917,12 @@ clockcache_deinit(clockcache *cc) // IN/OUT
}

if (cc->lookup) {
platform_free(cc->heap_id, cc->lookup);
platform_free_mem(cc->heap_id, cc->lookup, cc->lookup_size);
cc->lookup = NULL;
}
if (cc->entry) {
platform_free(cc->heap_id, cc->entry);
platform_free_mem(cc->heap_id, cc->entry, cc->entry_size);
cc->entry = NULL;
}

debug_only platform_status rc = STATUS_TEST_FAILED;
@@ -1929,11 +1941,14 @@ clockcache_deinit(clockcache *cc) // IN/OUT
cc->refcount = NULL;
}

platform_memfrag mf = {0};
if (cc->pincount) {
platform_free_volatile(cc->heap_id, cc->pincount);
memfrag_init(&mf, (void *)cc->pincount, cc->pincount_size);
platform_free_volatile(cc->heap_id, &mf);
}
if (cc->batch_busy) {
platform_free_volatile(cc->heap_id, cc->batch_busy);
memfrag_init(&mf, (void *)cc->batch_busy, cc->batch_busy_size);
platform_free_volatile(cc->heap_id, &mf);
}
}

6 changes: 4 additions & 2 deletions src/clockcache.h
@@ -139,15 +139,17 @@ struct clockcache {

// Stats
cache_stats stats[MAX_THREADS];
size_t lookup_size;
size_t entry_size;
size_t pincount_size;
size_t batch_busy_size;
};


/*
*-----------------------------------------------------------------------------
* Function declarations
*-----------------------------------------------------------------------------
*/

void
clockcache_config_init(clockcache_config *cache_config,
io_config *io_cfg,
13 changes: 8 additions & 5 deletions src/memtable.c
@@ -307,15 +307,19 @@ memtable_context_create(platform_heap_id hid,
process_fn process,
void *process_ctxt)
{
platform_memfrag memfrag_ctxt = {0};
memtable_context *ctxt =
TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, ctxt, mt, cfg->max_memtables);
ctxt->cc = cc;
ctxt->mf_size = memfrag_size(&memfrag_ctxt);
ctxt->cc = cc;
memmove(&ctxt->cfg, cfg, sizeof(ctxt->cfg));

platform_mutex_init(
&ctxt->incorporation_mutex, platform_get_module_id(), hid);
ctxt->rwlock = TYPED_MALLOC(hid, ctxt->rwlock);
platform_memfrag memfrag_rwlock = {0};
ctxt->rwlock = TYPED_MALLOC_MF(hid, ctxt->rwlock, &memfrag_rwlock);
platform_batch_rwlock_init(ctxt->rwlock);
ctxt->rwlock_mf_size = memfrag_size(&memfrag_rwlock);

for (uint64 mt_no = 0; mt_no < cfg->max_memtables; mt_no++) {
uint64 generation = mt_no;
@@ -343,9 +347,8 @@ memtable_context_destroy(platform_heap_id hid, memtable_context *ctxt)
}

platform_mutex_destroy(&ctxt->incorporation_mutex);
platform_free(hid, ctxt->rwlock);

platform_free(hid, ctxt);
platform_free_mem(hid, ctxt->rwlock, ctxt->rwlock_mf_size);
platform_free_mem(hid, ctxt, ctxt->mf_size);
}

void
4 changes: 3 additions & 1 deletion src/memtable.h
@@ -131,7 +131,9 @@ typedef struct memtable_context {
// read lock to read and write lock to modify.
volatile uint64 generation_retired;

bool32 is_empty;
bool is_empty;
size_t mf_size; // # of bytes of memory allocated to this struct
size_t rwlock_mf_size; // # of bytes of memory allocated to rwlock

// Effectively thread local, no locking at all:
btree_scratch scratch[MAX_THREADS];
1 change: 1 addition & 0 deletions src/merge.c
@@ -545,6 +545,7 @@ merge_iterator_create(platform_heap_id hid,
== ARRAY_SIZE(merge_itor->ordered_iterators),
"size mismatch");

platform_memfrag memfrag_merge_itor;
merge_itor = TYPED_ZALLOC(PROCESS_PRIVATE_HEAP_ID, merge_itor);
if (merge_itor == NULL) {
return STATUS_NO_MEMORY;
14 changes: 9 additions & 5 deletions src/pcq.h
@@ -14,8 +14,9 @@

typedef struct {
uint32 num_elems;
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
size_t mf_size; // of memory fragment allocated for this struct
void *elems[];
} pcq;

@@ -28,9 +29,11 @@ pcq_alloc(platform_heap_id hid, size_t num_elems)
{
pcq *q;

platform_memfrag memfrag_q;
q = TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, q, elems, num_elems);
if (q != NULL) {
q->num_elems = num_elems;
q->mf_size = memfrag_size(&memfrag_q);
}

return q;
@@ -61,11 +64,12 @@ pcq_is_full(const pcq *q)
return pcq_count(q) == q->num_elems;
}

// Deallocate a PCQ
// Deallocate a PCQ, and NULL out input handle
static inline void
pcq_free(platform_heap_id hid, pcq *q)
pcq_free(platform_heap_id hid, pcq **q)
{
platform_free(hid, q);
platform_free_mem(hid, *q, (*q)->mf_size);
*q = NULL;
}

// Enqueue an elem to a PCQ. Element must not be NULL
10 changes: 8 additions & 2 deletions src/platform_linux/laio.c
@@ -228,16 +228,20 @@ io_handle_init(laio_handle *io, io_config *cfg, platform_heap_id hid)
* structures. Each request struct nests within it async_max_pages
* pages on which IO can be outstanding.
*/
platform_memfrag memfrag_io_req;
req_size =
sizeof(io_async_req) + cfg->async_max_pages * sizeof(struct iovec);
total_req_size = req_size * cfg->async_queue_size;
io->req = TYPED_MANUAL_ZALLOC(io->heap_id, io->req, total_req_size);
io->req = TYPED_MANUAL_ZALLOC(
io->heap_id, io->req, total_req_size, &memfrag_io_req);
platform_assert((io->req != NULL),
"Failed to allocate memory for array of %lu Async IO"
" request structures, for %ld outstanding IOs on pages.",
cfg->async_queue_size,
cfg->async_max_pages);

io->req_mf_size = memfrag_size(&memfrag_io_req);

// Initialize each Async IO request structure
for (int i = 0; i < cfg->async_queue_size; i++) {
req = laio_get_kth_req(io, i);
@@ -276,7 +280,9 @@ io_handle_deinit(laio_handle *io)
}
platform_assert(status == 0);

platform_free(io->heap_id, io->req);
platform_free_mem(io->heap_id, io->req, io->req_mf_size);
io->req = NULL;
io->req_mf_size = 0;
}

/*
