Skip to content

engine

TraceEngine

TraceEngine(
    filename: str, mode: str = "write", strict: bool = True
)

Bases: TraceEngineBase

The engine used to record the trace of a program execution.

Parameters:

  • filename (str) –

    The filename storing the trace.

  • mode (str, default: 'write' ) –

    The mode of the trace, "write" or "read". Defaults to "write".

  • strict (bool, default: True ) –

    Whether to match strictly when used as a cache. Defaults to True.

    • True: matching according to the generation id, prompts, and parameters. The cache stops working as soon as a match fails.
    • False: only matching prompts and parameters.
Source code in src/appl/tracing/engine.py
def __init__(self, filename: str, mode: str = "write", strict: bool = True) -> None:
    """Initialize the TraceEngine.

    Args:
        filename: The filename storing the trace.
        mode: The mode of the trace, "write" or "read". Defaults to "write".
        strict:
            Whether to match strictly when used as a cache. Defaults to True.

            - True: matching according to the generation id, prompts, and
                parameters. The cache stops working as soon as a match fails.
            - False: only matching prompts and parameters.

    Raises:
        FileNotFoundError: In "read" mode, if ``filename`` does not exist.
        ValueError: If ``mode`` is neither "write" nor "read".
    """
    self._mode = mode
    self._strict = strict
    self._events: List[TraceEventBase] = []  # events read from the file
    self._trace_nodes: Dict[str, TraceNode] = {}
    self._gen_cache: Dict[str, List[Any]] = {}
    self._lock = Lock()

    if mode == "write":
        if os.path.exists(filename):
            logger.warning(f"Trace file {filename} already exists, overwriting")
        # A bare filename (e.g. "trace.pkl") has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError, so only create the parent
        # directory when the path actually contains one.
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._file = open(filename, "wb+")
    elif mode == "read":
        if not os.path.exists(filename):
            raise FileNotFoundError(f"Trace file {filename} not found")
        self._file = open(filename, "rb+")
        # Load the events already stored in the file.
        self._read()
    else:
        raise ValueError(f"Invalid mode {mode}, only 'write' or 'read' allowed.")

events property

The list of events in the trace.

min_timestamp cached property

min_timestamp: float

The minimum time stamp of the events in the trace.

trace_nodes property

trace_nodes: Dict[str, TraceNode]

The dictionary of trace nodes.

append

append(event: TraceEventBase) -> None

Append an event to the trace.

Source code in src/appl/tracing/engine.py
def append(self, event: TraceEventBase) -> None:
    """Record ``event`` in the trace and update the derived state.

    The event's ``args`` (when it has any) are first passed through
    ``args_to_json``. In "write" mode a parent function is attached to
    function-call, generation-init and completion-request events, and the
    event is pickled to the trace file while holding the engine lock.
    Afterwards the trace node matching the event is created or updated, and
    completion responses are additionally stored in the generation cache.
    """
    if hasattr(event, "args"):
        event.args = self.args_to_json(event.args)

    self._events.append(event)
    event_name = event.name
    stamp = event.time_stamp

    if self._mode == "write":
        if isinstance(event, (FunctionCallEvent, GenerationInitEvent)):
            event.parent_func = self._last_func
        elif isinstance(event, CompletionRequestEvent):
            # The parent is the part of the name preceding "_raw_<digits>".
            raw_match = re.match(r"(.+)_raw_\d+", event.name)
            if raw_match is None:
                event.parent_func = self._last_func
                logger.warning(
                    f"Unusual completion request name: {event.name}. "
                    "Using last function as parent event"
                )
            else:
                event.parent_func = raw_match.group(1)

        # Persist the event; dump and flush happen under the lock.
        with self._lock:
            logger.trace(f"add to trace {event}")
            pickle.dump(event, self._file)
            self._file.flush()

    assert event_name is not None

    def _combine(base: Optional[Dict], extra: Optional[Dict]) -> Optional[Dict]:
        # Merge two optional dicts; keys from ``extra`` win on conflict.
        if base is not None and extra is not None:
            return {**base, **extra}
        return extra if base is None else base

    if isinstance(event, FunctionCallEvent):
        created = self._add_node(event_name, event.parent_func, type="func")
        created.start_time = stamp
        created.args = event.args
        created.metadata = event.metadata
        return

    if isinstance(event, FunctionReturnEvent):
        existing = self._get_node(event_name)
        if existing:
            existing.ret = event.ret
            existing.end_time = stamp
            existing.metadata = _combine(existing.metadata, event.metadata)
        return

    if isinstance(event, GenerationInitEvent):
        created = self._add_node(event_name, event.parent_func, type="gen")
        created.start_time = stamp
        created.metadata = event.metadata
        return

    if isinstance(event, GenerationResponseEvent):
        existing = self._get_node(event_name)
        if existing:
            existing.end_time = stamp
            existing.args = event.args
            existing.ret = event.ret
            existing.metadata = _combine(existing.metadata, event.metadata)
        return

    if isinstance(event, CompletionRequestEvent):
        created = self._add_node(event_name, event.parent_func, type="raw_llm")
        created.start_time = stamp
        created.metadata = event.metadata
        return

    if isinstance(event, CompletionResponseEvent):
        existing = self._get_node(event_name)
        if existing:
            existing.end_time = stamp
            existing.args = event.args
            existing.ret = event.ret
            existing.info["cost"] = event.cost
            existing.metadata = _combine(existing.metadata, event.metadata)

        # Remember the raw completion response, whether or not a node exists.
        cache_key = self._cache_key(event_name, event.args)
        self._gen_cache.setdefault(cache_key, []).append(event.ret)

args_to_json classmethod

args_to_json(args: Dict) -> Dict

Serialize the values of the arguments to JSON format.

Source code in src/appl/tracing/engine.py
@classmethod
def args_to_json(cls, args: Dict) -> Dict:
    """Return a copy of ``args`` with pydantic model classes turned into schemas.

    Values that are classes deriving from ``BaseModel`` are replaced by the
    result of ``convert_pydantic_class_to_schema``; every other value is
    carried over unchanged.
    """
    # TODO: decide whether the remaining values should be serialized as well.
    return {
        key: (
            cls.convert_pydantic_class_to_schema(value)
            if isinstance(value, type) and issubclass(value, BaseModel)
            else value
        )
        for key, value in args.items()
    }

convert_pydantic_class_to_schema classmethod

convert_pydantic_class_to_schema(class_: Type) -> Dict

Convert a class to a schema.

Parameters:

  • class_ (Type) –

    The class to convert

Source code in src/appl/tracing/engine.py
@classmethod
def convert_pydantic_class_to_schema(cls, class_: Type) -> Dict:
    """Return the JSON schema of a pydantic model class.

    Args:
        class_: The class to convert; it must be a subclass of ``BaseModel``.

    Returns:
        The value of ``class_.model_json_schema()``.

    Raises:
        ValueError: If ``class_`` is not a subclass of ``BaseModel``.
    """
    if not issubclass(class_, BaseModel):
        raise ValueError(f"Cannot convert class {class_} to schema")
    return class_.model_json_schema()

find_cache

find_cache(name: str, args: Dict) -> Any

Find a cached response for a generation request.

Parameters:

  • name (str) –

    The name of the generation request.

  • args (Dict) –

    The arguments of the generation request.

Source code in src/appl/tracing/engine.py
def find_cache(self, name: str, args: Dict) -> Any:
    """Pop the oldest cached response recorded for a generation request.

    Args:
        name: The name of the generation request.
        args: The arguments of the generation request.

    Returns:
        The first remaining cached entry stored under the key built from
        ``name`` and the converted ``args``, or None when no entry is left
        for that key.
    """
    json_args = self.args_to_json(args)
    with self._lock:
        cached = self._gen_cache.get(self._cache_key(name, json_args))
        if cached:
            # Entries are consumed in the order they were recorded.
            return cached.pop(0)
        return None