# Async Programming

Concurrent operations with asyncio and Tokio, focusing on race condition prevention, resource safety, and performance.

Install: `npx skill4agent add martinholovsky/claude-skills-generator async-programming`

| Situation | Approach |
|---|---|
| Shared mutable state | Use asyncio.Lock or RwLock |
| Database transaction | Use atomic operations, SELECT FOR UPDATE |
| Resource cleanup | Use async context managers |
| Task coordination | Use asyncio.Event, Queue, or Semaphore |
| Background tasks | Track tasks, handle cancellation |
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Counter must not lose updates under concurrent increments."""
    counter = SafeCounter()  # Not implemented yet - will fail

    async def hammer():
        for _ in range(100):
            await counter.increment()

    # 10 workers x 100 increments each, all running concurrently.
    await asyncio.gather(*(hammer() for _ in range(10)))

    # Exactly 1000 proves no read-modify-write interleaving lost an update.
    assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Test resources are cleaned up even when task is cancelled."""
    cleanup_called = False

    async def task_with_resource():
        nonlocal cleanup_called
        try:
            async with managed_resource() as resource:  # Not implemented yet
                await asyncio.sleep(10)  # Long operation, cancelled mid-flight
        finally:
            # BUG FIX: the flag was previously set *after* the sleep, a line
            # cancellation makes unreachable, so the assert below could never
            # pass. A finally runs even on CancelledError, mirroring the
            # cleanup path this test is meant to verify.
            cleanup_called = True

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)  # Let the task start and enter the context
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert cleanup_called  # Cleanup must happen
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
    """Async-safe integer counter guarded by an asyncio.Lock."""

    def __init__(self):
        # The lock serializes every read-modify-write on _value.
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        """Atomically add one and return the post-increment value."""
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        """Read the current value under the lock for a consistent snapshot."""
        async with self._lock:
            return self._value
@asynccontextmanager
async def managed_resource():
    """Acquire a resource and guarantee its release.

    The finally clause runs on normal exit, on exceptions, and on task
    cancellation alike, so the resource can never leak.
    """
    res = await acquire_resource()
    try:
        yield res
    finally:
        await release_resource(res)  # Always runs
# Run async tests
pytest tests/ -v --asyncio-mode=auto
# Check for blocking calls
python -m asyncio debug
# Run with concurrency stress test
pytest tests/ -v -n auto --asyncio-mode=auto
# BAD - Sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    """Fetch each URL one at a time (anti-pattern: no overlap)."""
    results: list[str] = []
    for url in urls:
        # Each await parks this coroutine until the fetch completes,
        # so total latency is the SUM of all request times.
        page = await fetch(url)
        results.append(page)
    return results  # Total time: sum of all fetches
# GOOD - Concurrent execution
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    """Fetch every URL at once; gather preserves input order."""
    requests = [fetch(url) for url in urls]
    return await asyncio.gather(*requests)
# Total time: max of all fetches
# BAD - Unbounded concurrency (may overwhelm server)
async def fetch_many(urls: list[str]):
    """Anti-pattern: launches every fetch at once, no concurrency cap."""
    return await asyncio.gather(*(fetch(u) for u in urls))
# GOOD - Bounded concurrency with semaphore
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
    """Fetch all URLs with at most max_concurrent requests in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def bounded(url: str):
        # The semaphore admits at most max_concurrent coroutines at once.
        async with gate:
            return await fetch(url)

    return await asyncio.gather(*(bounded(u) for u in urls))
# BAD - Manual task tracking
async def process_items_manual(items):
    """Anti-pattern: hand-rolled task list with no failure cleanup."""
    pending = [asyncio.create_task(process(item)) for item in items]
    return await asyncio.gather(*pending)
# GOOD - Task groups with automatic cleanup
async def process_items_taskgroup(items):
    """Run every item under a TaskGroup; any failure cancels the rest."""
    async with asyncio.TaskGroup() as tg:
        handles = [tg.create_task(process(item)) for item in items]
    # The group has exited, so every task is guaranteed finished here.
    return [h.result() for h in handles]
# Automatic cancellation on any failure
# BAD - Creating new event loop each time
def run_async_bad():
    """Anti-pattern: hand-managed event loop lifecycle."""
    fresh_loop = asyncio.new_event_loop()
    try:
        return fresh_loop.run_until_complete(main())
    finally:
        fresh_loop.close()
# GOOD - Reuse running loop or use asyncio.run
def run_async_good():
    """Delegate loop creation, execution, and teardown to asyncio.run."""
    return asyncio.run(main())  # Handles loop lifecycle
# GOOD - For library code, get existing loop
async def library_function():
    """Obtain the caller's already-running loop instead of creating one."""
    current = asyncio.get_running_loop()
    pending = current.create_future()
    # Use the existing loop
# BAD - Blocks event loop
async def process_file_bad(path: str):
    """Hash a file's contents (anti-pattern: blocks the event loop).

    Returns the hex SHA-256 digest of the file at ``path``. Kept as the
    BAD example for blocking I/O, but with one genuine bug fixed: the
    file must be opened in binary mode — hashlib.sha256() raises
    TypeError when given a str from a text-mode read.
    """
    with open(path, 'rb') as f:  # Blocking I/O - stalls every other task
        data = f.read()
    result = hashlib.sha256(data).hexdigest()  # CPU-bound blocks loop
    return result
# GOOD - Non-blocking with aiofiles and executor
import aiofiles
async def process_file_good(path: str):
async with aiofiles.open(path, 'rb') as f:
data = await f.read()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, lambda: hashlib.sha256(data).hexdigest()
)
return result| Component | Version | Notes |
|---|---|---|
| Python | 3.11+ | asyncio improvements, TaskGroup |
| Rust | 1.75+ | Stable async |
| Tokio | 1.35+ | Async runtime |
| aioredis | Use redis-py | Better maintenance |
# Python async ecosystem
asyncio # Core async
aiohttp # HTTP client
asyncpg # PostgreSQL
aiofiles # File I/O
pytest-asyncio # Testing
import asyncio
class SafeCounter:
    """Thread-safe counter for async contexts."""

    def __init__(self):
        # A single lock serializes all access to the shared value.
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        """Add one atomically; return the new value."""
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        """Read the value under the lock for a consistent view."""
        async with self._lock:
            return self._value
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
    """Atomically move `amount` between two accounts using row locks.

    Raises ValueError when the source balance is insufficient; the
    transaction context manager rolls everything back on any exception.
    """
    async with db.begin():
        stmt = (
            select(Account)
            .where(Account.id.in_([from_id, to_id]))
            # Deterministic lock order (ascending id) prevents deadlock
            # when two transfers touch the same pair of accounts in
            # opposite directions — the mitigation the risk table above
            # prescribes ("Deadlocks | Lock ordering").
            .order_by(Account.id)
            .with_for_update()  # Lock rows
        )
        accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
        if accounts[from_id].balance < amount:
            raise ValueError("Insufficient funds")
        accounts[from_id].balance -= amount
        accounts[to_id].balance += amount
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
    """Check a connection out of the pool and always return it."""
    connection = await pool.acquire()
    try:
        yield connection
    finally:
        # Runs on normal exit, on errors, and on cancellation alike.
        await pool.release(connection)
import asyncio, signal
class GracefulApp:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.tasks: set[asyncio.Task] = set()
async def run(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self.shutdown_event.set)
self.tasks.add(asyncio.create_task(self.worker()))
await self.shutdown_event.wait()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)| Issue | Severity | Mitigation |
|---|---|---|
| Race Conditions | HIGH | Use locks or atomic ops |
| TOCTOU | HIGH | Atomic DB operations |
| Resource Leaks | MEDIUM | Context managers |
| CVE-2024-12254 | HIGH | Upgrade Python |
| Deadlocks | MEDIUM | Lock ordering, timeouts |
# RACE CONDITION - read/await/write pattern
class UserSession:
    async def update(self, key, value):
        """Anti-pattern: the await between read and write yields control,
        so another task can change self.data and this write goes stale."""
        snapshot = self.data.get(key, 0)  # Read
        await validate(value)  # Await = context switch
        self.data[key] = snapshot + value  # Write stale value
# FIXED - validate outside lock, atomic update inside
class SafeUserSession:
    async def update(self, key, value):
        """Validate first, then do the read-modify-write atomically."""
        await validate(value)
        async with self._lock:
            # No await between read and write -> no lost update.
            self.data[key] = self.data.get(key, 0) + value
# NEVER - race condition on cache
async def get_or_fetch(self, key):
    """Anti-pattern: check-then-act on the cache races with other tasks."""
    if key not in self.data:
        # Another task can populate (or evict) this key during the await.
        self.data[key] = await fetch(key)
    return self.data[key]
# ALWAYS - lock protection
async def get_or_fetch(self, key):
    """Cache lookup with check and fill performed under one lock."""
    async with self._lock:
        if key not in self.data:
            self.data[key] = await fetch(key)
        return self.data[key]
# NEVER - task may be garbage collected
asyncio.create_task(background_work())
# ALWAYS - track tasks
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)# NEVER - blocks all async tasks
time.sleep(5)
# ALWAYS - use async
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)pytest --asyncio-mode=autoasyncio.gatherrun_in_executor