
Memory leak in unbounded channels #51

Open
rakbladsvalsen opened this issue Dec 18, 2024 · 2 comments

Comments

@rakbladsvalsen

Hey guys, I've already reported this in flume (another popular channel implementation), and noticed kanal suffers from the same issue.
Basically, when an unbounded channel gets "saturated" for whatever reason (usually a slow consumer), more memory is allocated for the buffer, but that memory won't be freed even after the queue has been fully consumed. I'm 100% sure any channel implementation built on VecDeque has the same issue, because that's VecDeque's default behavior.

However, VecDeque provides the means to query its allocated capacity (kanal similarly exposes its length), as well as methods to manually free that extra allocated memory. Currently, kanal doesn't allow users to reclaim that memory manually, and this is a problem in long-running applications where load fluctuations are common. A proxy method to the internal VecDeque's shrink_to_fit (and probably shrink_to as well) is needed to at least let users keep memory usage from going nuts.

PoC (look at memory usage after "Capacity: " reaches 0):

use std::{hint::black_box, thread::park, time::Duration};

use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (tx, rx) = kanal::unbounded();

    // Fill the queue first so its internal buffer grows to its peak size.
    for _ in 0..1_000_000u64 {
        let waste = black_box([10u8; 100]);
        tx.try_send(waste).ok();
    }

    let mut cont: u64 = 0;
    tokio::spawn(async move {
        // kanal's try_recv returns Ok(None) on an empty channel, so match
        // on Ok(Some(..)) to stop once the queue has been drained.
        while let Ok(Some(_val)) = rx.try_recv() {
            if cont % 1000 == 0 {
                println!("Capacity: {}", rx.len());
                sleep(Duration::from_millis(1)).await;
                // rx.shrink_to_fit();
            }
            cont += 1;
        }
        println!("cont: {cont:?}");
    });
    park();
}
@fereidani
Owner
Hey, thanks for your report.

Wikipedia defines a memory leak as follows:

In computer science, a memory leak is a type of resource leak that occurs when a computer program incorrectly manages memory allocations[1] in a way that memory which is no longer needed is not released. A memory leak may also happen when an object is stored in memory but cannot be accessed by the running code (i.e. unreachable memory).

I consider this to be a warmup phase. The memory is accessible and can be freed, so it's not technically a leak; it's the expected behavior of an unbounded channel. If the user finds this behavior unsuitable, they might prefer a bounded channel with a predefined size.

I'm undecided about adding shrink_to_fit. It gives the user more control, but I believe it's an anti-pattern: if this problem is happening, the user should consider switching to a bounded channel instead of an unbounded one, imho.

I need some time to think more about it.

@rakbladsvalsen
Author

rakbladsvalsen commented Dec 22, 2024

Whether it's a warmup phase (or not) is, IMO, debatable. It's absolutely true that the memory was allocated as a result of the queue filling up, but that could have been caused by a very rare, one-off event, such as a network failure or temporary CPU starvation. Note that this happens not only with unbounded channels but also with bounded ones (both use a VecDeque internally).

This problem comes from the fact that kanal (and most channel implementations) use a VecDeque. All Rust standard library collections exhibit this behavior (over-allocating and never freeing that memory automatically), but they also provide the means to free that "allocated, but unused" memory. kanal does not.

I fail to see the anti-pattern here, considering this problem happens with both flavors of channels and leads to memory that is allocated but completely wasted. Consider the following Python example, which behaves properly and frees the queue's allocated memory:

import asyncio

async def main():
    rx = asyncio.Queue()

    consumer_done = asyncio.Event()

    async def consumer():
        cont = 0
        while True:
            try:
                _val = rx.get_nowait()
            except asyncio.QueueEmpty:
                break
            
            if cont % 1000 == 0:
                print(f"Queue length: {rx.qsize()}")
                await asyncio.sleep(0.001) 

            cont += 1
        consumer_done.set()

    async def producer():
        for _ in range(3000000):
            waste = bytearray([10] * 100)
            try:
                rx.put_nowait(waste)
            except asyncio.QueueFull:
                pass

    await producer()
    consumer_task = asyncio.create_task(consumer())

    await consumer_done.wait()
    print("done")
    input()

if __name__ == "__main__":
    asyncio.run(main())

I would support the idea that a shrink method is an anti-pattern if Rust were a GC'd language, but it isn't, and users currently have no way to recover that unused memory in long-running programs.

If this is how kanal is meant to behave, then it's important to point this out in the README: don't use unbounded channels, or use small bounded channels, to keep memory usage from growing under slow consumers, because the library provides no means to recover from that.
