TaskFlow / tests /performance /test_concurrency.py
BilalCode's picture
taskflow todo app
310260a
Raw
History Blame Contribute Delete
3.74 kB
"""
Concurrency Tests
Tests concurrent access and tool invocations:
- Multiple agents create tasks concurrently
- Multiple agents update same task concurrently
- Concurrent chat requests
"""
import pytest
import asyncio
from src.tools.create_task import create_task_internal
from src.tools.update_task import update_task_internal
from tests.utils.task_helpers import create_test_task, count_tasks, get_task_by_id
from tests.utils.agent_helpers import invoke_agent_with_message
@pytest.mark.performance
@pytest.mark.asyncio
async def test_multiple_agents_create_tasks_concurrently_without_conflicts(mock_mcp_context, test_session):
"""
Test: Multiple agents create tasks concurrently without conflicts
Verifies that concurrent task creation doesn't cause database conflicts.
"""
# Define concurrent operations
async def create_task(index):
return await create_task_internal(
ctx=mock_mcp_context,
title=f"Concurrent Task {index}"
)
# Execute 10 concurrent task creations
tasks = [create_task(i) for i in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Assert all succeeded
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.get("status") == "success")
assert success_count == 10
# Verify all tasks persisted
task_count = count_tasks(test_session, mock_mcp_context.user_id)
assert task_count == 10
@pytest.mark.performance
@pytest.mark.asyncio
async def test_multiple_agents_update_same_task_concurrently(mock_mcp_context, test_session):
"""
Test: Multiple agents update same task concurrently
Verifies that concurrent updates to same task are handled safely by database.
"""
# Setup: Create a task
task = create_test_task(test_session, mock_mcp_context.user_id, title="Original")
# Define concurrent update operations
async def update_task(suffix):
return await update_task_internal(
ctx=mock_mcp_context,
task_id=task.id,
title=f"Updated {suffix}"
)
# Execute 5 concurrent updates
updates = [update_task(i) for i in range(5)]
results = await asyncio.gather(*updates, return_exceptions=True)
# Assert at least some succeeded (database handles conflicts)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.get("status") == "success")
assert success_count > 0
# Verify task still exists and has one of the updated titles
updated_task = get_task_by_id(test_session, task.id)
assert updated_task is not None
assert "Updated" in updated_task.title
@pytest.mark.performance
@pytest.mark.slow
@pytest.mark.asyncio
async def test_100_concurrent_chat_requests_complete_successfully(test_user, test_session):
"""
Test: 100 concurrent chat requests complete successfully
Verifies that system handles high concurrent load.
"""
# Define concurrent chat operations
async def send_chat_message(index):
try:
return await invoke_agent_with_message(
user_id=test_user.id,
message=f"Add task number {index}"
)
except Exception as e:
return {"error": str(e)}
# Execute 100 concurrent requests
requests = [send_chat_message(i) for i in range(100)]
results = await asyncio.gather(*requests, return_exceptions=True)
# Assert majority succeeded (allow some failures due to rate limits)
success_count = sum(
1 for r in results
if not isinstance(r, Exception) and "error" not in r
)
# At least 80% should succeed
assert success_count >= 80, f"Only {success_count}/100 requests succeeded"