# abatch_as_completed does not respect max_concurrency config the same way that abatch does #29425

Open · 5 tasks done
keenanpepper opened this issue Jan 25, 2025 · 0 comments

Labels: 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature
### Checked other resources

- [x] I added a very descriptive title to this issue.
- [x] I searched the LangChain documentation with the integrated search.
- [x] I used the GitHub search to find a similar question and didn't find it.
- [x] I am sure that this is a bug in LangChain rather than my code.
- [x] The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

### Example Code

Test code illustrating the issue:

```python
import asyncio
import time

from langchain_core.runnables import RunnableConfig, RunnableLambda


async def test_concurrency():
    # Track currently running tasks
    running_tasks = 0
    max_running_tasks = 0
    lock = asyncio.Lock()

    async def tracked_function(x):
        nonlocal running_tasks, max_running_tasks
        async with lock:
            running_tasks += 1
            max_running_tasks = max(max_running_tasks, running_tasks)
            print(f"Starting task. Current running: {running_tasks}")

        # Simulate some work
        await asyncio.sleep(1)

        async with lock:
            running_tasks -= 1
            print(f"Finishing task. Current running: {running_tasks}")

        return f"Completed {x}"

    # Create runnable
    runnable = RunnableLambda(tracked_function)

    # Test parameters
    num_tasks = 10
    max_concurrency = 3

    # Test abatch_as_completed
    print(f"\nTesting abatch_as_completed with max_concurrency={max_concurrency}...")
    start_time = time.time()
    config = RunnableConfig(max_concurrency=max_concurrency)
    results = []

    async for idx, result in runnable.abatch_as_completed(
        range(num_tasks), config=config
    ):
        print(f"Got result {idx}: {result}")
        results.append(result)

    end_time = time.time()

    print("\nResults for abatch_as_completed:")
    print(f"Max concurrent tasks observed: {max_running_tasks}")
    print(f"Expected max concurrent tasks: {max_concurrency}")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"Number of results: {len(results)}")

    # Reset counters
    running_tasks = 0
    max_running_tasks = 0

    # Test abatch
    print(f"\nTesting abatch with max_concurrency={max_concurrency}...")
    start_time = time.time()
    results = await runnable.abatch(range(num_tasks), config=config)
    end_time = time.time()

    print("\nResults for abatch:")
    print(f"Max concurrent tasks observed: {max_running_tasks}")
    print(f"Expected max concurrent tasks: {max_concurrency}")
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"Number of results: {len(results)}")


# Run the test
if __name__ == "__main__":
    asyncio.run(test_concurrency())
```

### Error Message and Stack Trace (if applicable)

I expect the results for both abatch and abatch_as_completed to show at most 3 tasks running at any one time. Instead, abatch_as_completed runs all 10 tasks simultaneously. The timings confirm this: abatch_as_completed finishes in about 1 second (a single wave of 10 concurrent 1-second tasks), while abatch takes about 4 seconds (ceil(10/3) = 4 waves of at most 3 tasks). Results:

```
Testing abatch_as_completed with max_concurrency=3...
Starting task. Current running: 1
Starting task. Current running: 2
Starting task. Current running: 3
Starting task. Current running: 4
Starting task. Current running: 5
Starting task. Current running: 6
Starting task. Current running: 7
Starting task. Current running: 8
Starting task. Current running: 9
Starting task. Current running: 10
Finishing task. Current running: 9
Finishing task. Current running: 8
Finishing task. Current running: 7
Finishing task. Current running: 6
Finishing task. Current running: 5
Finishing task. Current running: 4
Finishing task. Current running: 3
Finishing task. Current running: 2
Finishing task. Current running: 1
Finishing task. Current running: 0
Got result 7: Completed 7
Got result 9: Completed 9
Got result 0: Completed 0
Got result 2: Completed 2
Got result 4: Completed 4
Got result 6: Completed 6
Got result 8: Completed 8
Got result 1: Completed 1
Got result 3: Completed 3
Got result 5: Completed 5

Results for abatch_as_completed:
Max concurrent tasks observed: 10
Expected max concurrent tasks: 3
Total time: 1.08 seconds
Number of results: 10

Testing abatch with max_concurrency=3...
Starting task. Current running: 1
Starting task. Current running: 2
Starting task. Current running: 3
Finishing task. Current running: 2
Finishing task. Current running: 1
Finishing task. Current running: 0
Starting task. Current running: 1
Starting task. Current running: 2
Starting task. Current running: 3
Finishing task. Current running: 2
Finishing task. Current running: 1
Finishing task. Current running: 0
Starting task. Current running: 1
Starting task. Current running: 2
Starting task. Current running: 3
Finishing task. Current running: 2
Finishing task. Current running: 1
Finishing task. Current running: 0
Starting task. Current running: 1
Finishing task. Current running: 0

Results for abatch:
Max concurrent tasks observed: 3
Expected max concurrent tasks: 3
Total time: 4.02 seconds
Number of results: 10
```

### Description

* I'm trying to use abatch_as_completed on a Runnable while limiting the maximum number of concurrently running tasks.
* I expect the "max_concurrency" config value to enforce this limit, and abatch does indeed behave that way.
* With abatch_as_completed, however, all of the tasks are started at once and the "max_concurrency" limit I specify is ignored. A semaphore-based workaround is sketched below.
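Until this is addressed in langchain-core, one possible workaround (my own sketch, not part of the LangChain API; `abatch_as_completed_throttled` is a hypothetical helper name) is to gate each invocation behind an `asyncio.Semaphore` and stream completions with `asyncio.as_completed`:

```python
import asyncio


async def abatch_as_completed_throttled(runnable, inputs, max_concurrency):
    # Hypothetical helper, not a LangChain API: limits concurrency with a
    # semaphore and yields (index, result) pairs in completion order.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(idx, inp):
        # Tasks are created eagerly, but each body blocks on the semaphore,
        # so at most max_concurrency invocations run at any one time.
        async with semaphore:
            return idx, await runnable.ainvoke(inp)

    tasks = [asyncio.create_task(run_one(i, inp)) for i, inp in enumerate(inputs)]
    for future in asyncio.as_completed(tasks):
        yield await future
```

Usage mirrors abatch_as_completed:

```python
async for idx, result in abatch_as_completed_throttled(runnable, range(10), 3):
    print(idx, result)
```

Note that this sketch bypasses the per-call config handling (callbacks, tags, etc.) that the real abatch_as_completed applies; it only demonstrates the throttling behavior I expected.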

### System Info

```
(.venv) keenanpepper@mac langchain-goodfire % python -m langchain_core.sys_info

System Information
------------------
> OS:  Darwin
> OS Version:  Darwin Kernel Version 24.2.0: Fri Dec  6 19:03:40 PST 2024; root:xnu-11215.61.5~2/RELEASE_ARM64_T6041
> Python Version:  3.12.8 (main, Dec  3 2024, 18:42:41) [Clang 16.0.0 (clang-1600.0.26.4)]

Package Information
-------------------
> langchain_core: 0.3.31
> langchain: 0.3.15
> langchain_community: 0.3.15
> langsmith: 0.3.1
> langchain_anthropic: 0.3.4
> langchain_openai: 0.3.2
> langchain_text_splitters: 0.3.5

Optional packages not installed
-------------------------------
> langserve

Other Dependencies
------------------
> aiohttp: 3.11.11
> anthropic: 0.45.0
> async-timeout: Installed. No version info available.
> dataclasses-json: 0.6.7
> defusedxml: 0.7.1
> httpx: 0.27.2
> httpx-sse: 0.4.0
> jsonpatch: 1.33
> langsmith-pyo3: Installed. No version info available.
> numpy: 2.2.2
> openai: 1.60.1
> orjson: 3.10.15
> packaging: 24.2
> pydantic: 2.10.6
> pydantic-settings: 2.7.1
> pytest: Installed. No version info available.
> PyYAML: 6.0.2
> requests: 2.32.3
> requests-toolbelt: 1.0.0
> rich: Installed. No version info available.
> SQLAlchemy: 2.0.37
> tenacity: 9.0.0
> tiktoken: 0.8.0
> typing-extensions: 4.12.2
> zstandard: 0.23.0
```
keenanpepper added a commit to keenanpepper/langchain that referenced this issue Jan 25, 2025