
CNDB-9104: Port over chunk cache improvements from DSE #1495

Open
wants to merge 19 commits into base: main

Conversation


@blambov commented Jan 9, 2025

What is the issue

Using buffers of different size in the chunk cache causes fragmentation, which in turn results in excessive memory use and lack of pooling for a large fraction of the buffers used by the chunk cache.

What does this PR fix and why was it fixed

Ports over single-size chunk cache buffers (DB-2904), caching memory addresses (parts of DB-2509) and file cache ids (DB-2489) from DSE.

This does not port any of the BufferPool refactoring in DSE. As C* already has distinct buffer pools for short vs. longer-term buffers, we should already be receiving similar benefits.

The per-entry on-heap overhead of the chunk cache is reduced from ~350 bytes to ~220. As part of this reduction, the patch drops the per-file lists of keys and replaces them with the ability to drop a file id for invalidation: this makes a file's entries in the cache unreachable, and they are reclaimed with some delay through the normal cache eviction.

Before this patch the cache could use on-heap memory if that was the preference of the compressor in use (e.g. Deflate specifies an ON_HEAP preference). This was highly unexpected and put very low limits on the usable cache size. The cache is now changed to always store data off heap.

Also changes the source of some temporary buffers to the short-lived "networking" pool.
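
As a rough illustration of the invalidation-by-file-id approach (a sketch only; the names follow the fileIdMap snippet further down in the diff, and the actual patch may differ):

// Sketch: cache keys embed a per-file id, so invalidating a file only requires dropping that id.
// Entries keyed by the old id can no longer be looked up and are reclaimed, with some delay,
// by the cache's normal eviction.
public void invalidateFile(File file)
{
    fileIdMap.remove(file); // subsequent reads of a file with this name get a fresh id
}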

Checklist before you submit for review

  • Make sure there is a PR in the CNDB project updating the Converged Cassandra version
  • Use NoSpamLogger for log lines that may appear frequently in the logs
  • Verify test results on Butler
  • Test coverage for new/modified code is > 80%
  • Proper code formatting
  • Proper title for each commit starting with the project-issue number, like CNDB-1234
  • Each commit has a meaningful description
  • Each commit is not very long and contains related changes
  • Renames, moves and reformatting are in distinct commits

Using CachingRebuffererTest.calculateMemoryOverhead with 1.5M entries. Cache size set at 4096 MiB.

Bytes on heap per entry: 320
Saves at least 40 bytes per cache entry (12.5%) and 20% of the insertion time.

Bytes on heap per entry: 280
With fileIDs; this has no effect on the performance of the cache.

Bytes on heap per entry: 222
This reverts commit be317b2.

Quality Gate failed

Failed conditions
57.8% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

private final ConcurrentHashMap<File, Long> fileIdMap = new ConcurrentHashMap<>();
private final AtomicLong nextFileId = new AtomicLong(0);

// number of bits required to store the log2 of the chunk size (highestOneBit(highestOneBit(Integer.MAX_VALUE)))

Nit: I don't think you meant to repeat highestOneBit twice in that comment (and highestOneBit(highestOneBit(Integer.MAX_VALUE)) is not 5).

private final static int CHUNK_SIZE_LOG2_BITS = 5;

// number of bits required to store the ready type
private final static int READER_TYPE_BITS = Integer.highestOneBit(ChunkReader.ReaderType.COUNT - 1);

Slightly confused by the logic of this. It works here because ChunkReader.ReaderType.COUNT is 2, but say it was 5: then highestOneBit(4) is 4, but in theory you only need 3 bits for 5 values. Did you mean to put the -1 after the call to highestOneBit?
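
(For illustration only, not part of the patch: one standard way to compute the number of bits needed to hold values 0..COUNT-1, which seems to be what this constant is meant to be, is the following.)

// Sketch only: bits needed to represent values 0..COUNT-1 (assumes COUNT >= 2).
// E.g. COUNT = 2 -> 1 bit, COUNT = 5 -> 3 bits, COUNT = 9 -> 4 bits.
private final static int READER_TYPE_BITS = 32 - Integer.numberOfLeadingZeros(ChunkReader.ReaderType.COUNT - 1);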

* are occupied by log 2 of the chunk size (we assume the chunk size is a power of 2), and the rest of the bits
* are occupied by the fileId counter, which is incremented for each unseen file name.
*/
protected long fileIdFor(File file, ChunkReader.ReaderType type, int chunkSize)

Nit: there is a tad of overloading of "fileId": the fileId returned here is not quite the same thing as what assignFileId returns, technically. Not a big deal, but having slightly separate naming could prevent some future error (some new code using the result of fileIdMap directly).
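
(For context, a rough sketch of how the three values might be packed into the returned id; the shifts and the use of ordinal() here are assumptions for illustration, not necessarily what the patch does. fileIdMap, nextFileId and the *_BITS constants are from the snippets above.)

protected long fileIdFor(File file, ChunkReader.ReaderType type, int chunkSize)
{
    // per-file counter value, assigned on first sight of this file name
    long perFileId = fileIdMap.computeIfAbsent(file, f -> nextFileId.getAndIncrement());
    int chunkSizeLog2 = Integer.numberOfTrailingZeros(chunkSize); // chunk size assumed to be a power of 2
    // lowest bits: log2 of chunk size; next: reader type; remaining bits: per-file counter
    return (((perFileId << READER_TYPE_BITS) | type.ordinal()) << CHUNK_SIZE_LOG2_BITS) | chunkSizeLog2;
}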

if (buffers.length > 1)
return new MultiRegionChunk(position, buffers);
else
return new SingleRegionChunk(position, buffers[0]);
}

public ChunkCache(BufferPool pool, int cacheSizeInMB, Function<ChunkCache, ChunkCacheMetrics> createMetrics)

Nit: I personally find it a tad inconvenient to have the ctor so deep within the class; I would prefer moving it to the beginning (not new to these changes admittedly, but there is a lot of code before this now).

* @param chunkSize the amount of data to read
* @param pages an array of page-sized memory allocations whose total capacity needs to be >= chunkSize
*/
int readScattering(ChunkReader file, long position, int chunkSize, long[] pages)

Any reason this isn't part of MultiRegionChunk (even maybe just inlined into MultiRegionChunk.read)? I would have expected to see it there.

@@ -124,6 +125,9 @@ private static FileChannel openChannel(File file)
try { channel.close(); }
catch (Throwable t2) { t.addSuppressed(t2); }
}

// Invalidate any cache entries that may exist for a previous file with the same name.
ChunkCache.instance.invalidateFile(file);

Most importantly, ChunkCache.instance can be null, which would break here.

Also, I don't have a super good alternative in mind at the moment, but it does feel rather fragile to me that we have to call this everywhere we write files that may be used by the chunk cache. If some code, especially in cndb, decided to write files by other means in some special case, this would be really easy to forget/get wrong (side note: I'm not saying it's the case right now, I don't think it is, but I'm also really not 100% sure).

Admittedly a half-baked idea, but couldn't we, say, shove the file modification time into the cache fileId so we don't need to do this?
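
(A minimal null-safe variant of the call above, sketched for illustration only:)

ChunkCache cache = ChunkCache.instance; // may be null, e.g. when the chunk cache is disabled
if (cache != null)
    cache.invalidateFile(file);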

@@ -130,6 +131,7 @@ public class BufferPool
private static final Logger logger = LoggerFactory.getLogger(BufferPool.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private static boolean DISABLE_COMBINED_ALLOCATION = Boolean.getBoolean("cassandra.bufferpool.disable_combined_allocation");

Nit: can be final?

}

/**
* A chunk with a single memory region. This can be used for reading chunks of up to PageAware.PAGE_SIZE but note

The "This can be used for reading chunks of up to PageAware.PAGE_SIZE" sentence here is a bit confusing, given we use this for reading larger chunks when we're able to get contiguous regions.

if (chunkSize < PageAware.PAGE_SIZE)
return new SingleRegionChunk(position, bufferPool.get(PageAware.PAGE_SIZE, BufferType.OFF_HEAP).limit(chunkSize).slice());

ByteBuffer[] buffers = bufferPool.getMultiple(chunkSize, PageAware.PAGE_SIZE, BufferType.OFF_HEAP);

This code relies on BufferPool.getMultiple behaving in ways that do not seem to be guaranteed by the javadoc of that method. That is, the javadoc of getMultiple is:

    /**
     * Allocate the given amount of memory, where the caller can accept the space to be split into multiple buffers.
     *
     * @param totalSize the total size to be allocated
     * @param chunkSize the minimum size of each buffer returned
     *
     * @return an array of allocated buffers
     */

but that only says that chunkSize is a minimum buffer size and that the allocation may or may not be split into multiple buffers. But here, the code relies on getMultiple behaving in only one of two ways:

  1. it returns a single buffer with the full allocation.
  2. it returns buffers that are all exactly of size chunkSize.

I'll note in particular that the 2nd point is relied upon by MultiRegionChunk.getBuffer, which expects each buffer to be exactly PageAware.PAGE_SIZE.

Now, I'm not 100% sure that getMultiple always behaves as we rely on here, because I don't quite get the difference between BufferPool.get and BufferPool.getAtLeast, but at a minimum, I think we should clarify the stronger specification of getMultiple that we actually rely on.
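
(Sketch only, not part of the patch: one way to make that stronger, assumed contract explicit at the call site; buffers and PageAware.PAGE_SIZE are the ones from the snippet above.)

if (buffers.length > 1)
{
    // the multi-region path assumes every split buffer is exactly one page
    for (ByteBuffer buffer : buffers)
        assert buffer.capacity() == PageAware.PAGE_SIZE
             : "getMultiple is relied upon to split only into exact page-sized buffers";
}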


while (remainingSize >= chunkSize)
{
buffers[idx] = pool.getAtLeast(chunkSize);

For my own curiosity, what is the rationale for when to use getAtLeast versus get? It seems that getAtLeast still returns a buffer whose capacity has been set to chunkSize (or does it not?), and if so, why do we use get instead of getAtLeast in the last call below?

@cassci-bot

❌ Build ds-cassandra-pr-gate/PR-1495 rejected by Butler


341 new test failure(s) in 5 builds
See build details here


Found 341 new test failures

Showing only first 15 new test failures

Test Explanation Branch history Upstream history
...buted.test.IncrementalRepairCoordinatorFastTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
....test.PreviewRepairCoordinatorNeighbourDownTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...ibuted.test.PreviewRepairCoordinatorTimeoutTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...rg.apache.cassandra.distributed.test.RepairTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...distributed.test.UnableToParseClientMessageTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...ributed.test.sai.datamodels.QueryTimeToLiveTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...ted.test.sai.datamodels.QueryWriteLifecycleTest regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
r.TestReadRepairGuarantees.test_atomic_writes[n... regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
r.TestAllowFiltering.test_update_on_collection regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...ControllerConfigTest.testVectorControllerConfig regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...roupByTest.groupByWithDeletesAndSrpOnPartitions regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...sRepairStreamingTest.testWithCompressionEnabled regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...ToolEnableDisableBinaryTest.testMaybeChangeDocs regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
o.a.c.d.t.OptimiseStreamsRepairTest.testBasic regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...: provision strategy=MultipleNetworkInterfaces] regression 🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵

Found 36 known test failures
