CapstoneFastAPI / app /utils /streaming.py
AlekhyaC2005's picture
first commit
46fb1fc
import asyncio
import random
from functools import partial
from app.memory.qdrant_memory import add_memory
async def _store_memory(user_id, user_query, full_response) -> None:
    """Persist one query/response exchange without blocking the event loop.

    ``add_memory`` is called in the event loop's default executor, so this
    coroutine merely suspends while the call runs in a worker thread.

    Args:
        user_id: Forwarded to ``add_memory`` as its first positional argument.
        user_query: Forwarded to ``add_memory`` as its second argument.
        full_response: Forwarded to ``add_memory`` as its third argument.

    Storage is best-effort: any ``Exception`` raised while scheduling or
    running ``add_memory`` is reported on stdout and is not re-raised.
    """
    try:
        # Inside a coroutine a loop is always running; get_running_loop() is
        # the documented way to obtain it and never creates a loop implicitly.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,  # uses default ThreadPoolExecutor
            partial(add_memory, user_id, user_query, full_response),
        )
    except Exception as e:
        # Deliberately swallowed: a failed memory write must not break callers.
        print(f"[MEMORY STORE ERROR]: {e}")
# Strong references to in-flight memory-store tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references could be garbage
# collected before it finishes; each task removes itself from here when done.
_pending_memory_tasks = set()


async def stream_response(response, user_id, user_query):
    """Re-emit a streamed completion one character at a time.

    Iterates ``response`` asynchronously, reads ``choices[0].delta.content``
    from every chunk and yields that text character by character, sleeping a
    random 10-20 ms after each character. Once the stream ends (normally,
    through an error, or because the consumer closed the generator), the text
    emitted so far is handed to ``_store_memory`` in a background task.

    Args:
        response: Async iterable of chunks exposing ``choices[0].delta.content``.
        user_id: Passed through to ``_store_memory``.
        user_query: Passed through to ``_store_memory``.

    Yields:
        Single characters of the response text. If an ``Exception`` occurs
        while streaming, one final ``"\\n[ERROR]: <message>"`` string is
        yielded instead of the exception being raised; that string is not
        part of the stored response.
    """
    emitted = []  # characters handed to the consumer, joined once at the end
    try:
        async for chunk in response:
            # A chunk with an empty ``choices`` would make ``choices[0]`` raise
            # IndexError, which the handler below would report as an error and
            # end the stream early; skip such chunks instead.
            choices = chunk.choices
            if not choices:
                continue
            delta = choices[0].delta
            if not (delta and delta.content):
                continue
            for char in delta.content:
                # Record before yielding so a consumer that stops here still
                # has this character included in the stored response.
                emitted.append(char)
                yield char
                await asyncio.sleep(random.uniform(0.01, 0.02))
    except Exception as e:
        yield f"\n[ERROR]: {str(e)}"
    finally:
        full_response = "".join(emitted)
        if full_response:
            # Fire-and-forget, but keep a reference until the task completes.
            task = asyncio.create_task(
                _store_memory(user_id, user_query, full_response)
            )
            _pending_memory_tasks.add(task)
            task.add_done_callback(_pending_memory_tasks.discard)