Source code for agent_framework_evaluator.runtime.debug_subscriber

from __future__ import annotations

from collections import defaultdict, deque

from agent_framework.tracing import TraceEvent


[docs] class DebuggerSubscriber: def __init__(self) -> None: self._events: dict[str, deque[TraceEvent]] = defaultdict(deque)
[docs] def consume(self, event: TraceEvent) -> None: session_id = event.context.session_id or "global" self._events[session_id].append(event)
[docs] def drain(self, session_id: str) -> list[TraceEvent]: queue = self._events[session_id] items = list(queue) queue.clear() return items