Source code for agent_framework_evaluator.runtime.debug_subscriber
from __future__ import annotations
from collections import defaultdict, deque
from agent_framework.tracing import TraceEvent
[docs]
class DebuggerSubscriber:
    """Buffer trace events in memory, grouped by session id.

    Events are stored by :meth:`consume` and handed back, in the order they
    were consumed, by :meth:`drain`. Events whose context has no session id
    (``None`` or any other falsy value) are filed under the ``"global"`` key.
    """

    def __init__(self) -> None:
        """Create a subscriber with no buffered events."""
        # A defaultdict lets consume() append without first checking whether
        # the session already has a queue.
        self._events: dict[str, deque[TraceEvent]] = defaultdict(deque)

    def consume(self, event: TraceEvent) -> None:
        """Append *event* to the buffer of its session.

        Args:
            event: The trace event to store. ``event.context.session_id``
                selects the buffer; a falsy value selects ``"global"``.
        """
        session_id = event.context.session_id or "global"
        self._events[session_id].append(event)

    def drain(self, session_id: str) -> list[TraceEvent]:
        """Remove and return every buffered event for *session_id*.

        Args:
            session_id: Key of the buffer to empty. Use ``"global"`` for
                events that were consumed without a session id.

        Returns:
            The buffered events in the order they were consumed, or an empty
            list when nothing is buffered for *session_id*.
        """
        # pop() instead of indexing: indexing the defaultdict would insert an
        # empty deque for ids that never received an event, and clearing the
        # deque in place would keep every drained session key alive forever.
        # consume() re-creates the queue on demand.
        queue = self._events.pop(session_id, None)
        return [] if queue is None else list(queue)