response

CompletionResponse

Bases: BaseModel

A class wrapping the response from the LLM.

For a streaming response, it tracks the chunks of the response and builds the complete response once streaming finishes.
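
A minimal usage sketch (hedged: gen stands in for whatever APPL call actually produces a CompletionResponse; only attributes documented on this page are used):

response = gen("Write a haiku about autumn.")  # hypothetical producer of a CompletionResponse
print(response.complete_response)  # blocks until the (possibly streamed) response finishes
print(response.message)            # the top-choice message
print(response.cost)               # the cost of the completion, if available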

chunks class-attribute instance-attribute

chunks: List[Union[ModelResponse, ChatCompletionChunk]] = (
    Field(
        [],
        description="The chunks of the response when streaming",
    )
)

The chunks of the response when streaming.

complete_response property

complete_response: Union[ModelResponse, ChatCompletion]

The complete response from the model. This will block until the response is finished.

cost class-attribute instance-attribute

cost: Optional[float] = Field(
    None, description="The cost of the completion"
)

The cost of the completion.

finish_reason class-attribute instance-attribute

finish_reason: Optional[str] = Field(
    None,
    description="The reason why the completion is finished for the top-choice",
)

The reason why the completion finished for the top choice.

is_finished class-attribute instance-attribute

is_finished: bool = Field(
    False,
    description="Whether the response stream is finished",
)

Whether the response stream is finished.

is_stream class-attribute instance-attribute

is_stream: bool = Field(
    False, description="Whether the response is a stream"
)

Whether the response is a stream.

message class-attribute instance-attribute

message: Optional[str] = Field(
    None,
    description="The top-choice message from the completion",
)

The top-choice message from the completion.

num_raw_completions class-attribute instance-attribute

num_raw_completions: int = Field(
    1, description="The number of raw completions"
)

The number of raw completions.

post_finish_callbacks class-attribute instance-attribute

post_finish_callbacks: List[Callable] = Field(
    [], description="The post finish callbacks"
)

The post-finish callbacks.

raw_response class-attribute instance-attribute

raw_response: Any = Field(
    None, description="The raw response from the model"
)

The raw response from the model.

response_model class-attribute instance-attribute

response_model: Any = Field(
    None,
    description="The BaseModel's subclass specifying the response format.",
)

The subclass of BaseModel specifying the response format.

response_obj class-attribute instance-attribute

response_obj: Any = Field(
    None,
    description="The response object of response model, could be a stream",
)

The response object parsed according to the response model; it may be a stream.

results property

results: Any

The results of the response, depending on its type.

Returns:

  • message ( str ) –

    The message if the response is a text completion.

  • tool_calls ( List[ToolCall] ) –

    The tool calls if the response contains tool calls.

  • response_obj ( Any ) –

    The response object if the response is an object (e.g., parsed via the response model).
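
A hedged sketch of branching on results for a finished response; response is hypothetical and the import path for ResponseType (the enum referenced by update below) is an assumption:

from appl.core.types import ResponseType  # assumed import path for the enum

result = response.results
if response.type == ResponseType.TEXT:
    text = result    # the top-choice message (str)
elif response.tool_calls:
    calls = result   # the tool calls (List[ToolCall])
else:
    obj = result     # the parsed response object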

tool_calls class-attribute instance-attribute

tool_calls: List[ToolCall] = Field(
    [], description="The tool calls"
)

The tool calls.

type property

type: ResponseType

The type of the response.

usage class-attribute instance-attribute

usage: Optional[CompletionUsage] = Field(
    None, description="The usage of the completion"
)

The usage of the completion.

format_stream

format_stream()

Format the stream response as a text generator.

Source code in src/appl/core/response.py
def format_stream(self):
    """Format the stream response as a text generator."""
    suffix = ""
    for chunk in iter(self):
        # chunk: Union[ModelResponse, ChatCompletionChunk]
        delta: Union[Delta, ChoiceDelta] = chunk.choices[0].delta  # type: ignore

        if delta is not None:
            if delta.content is not None:
                yield delta.content
            elif getattr(delta, "tool_calls", None):
                f: Union[Function, ChoiceDeltaToolCallFunction] = delta.tool_calls[
                    0
                ].function  # type: ignore
                if f.name is not None:
                    if suffix:
                        yield f"{suffix}, "
                    yield f"{f.name}("
                    suffix = ")"
                if f.arguments is not None:
                    yield f.arguments
    yield suffix
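
For example, the generator can be drained to print text as it streams, with tool calls rendered as name(arguments); response is a hypothetical streaming CompletionResponse:

for piece in response.format_stream():
    print(piece, end="", flush=True)
print()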

register_post_finish_callback

register_post_finish_callback(
    callback: Callable,
    order: Literal["first", "last"] = "last",
) -> None

Register a post-finish callback.

The callback will be called after the response is finished.

Source code in src/appl/core/response.py
def register_post_finish_callback(
    self,
    callback: Callable,
    order: Literal["first", "last"] = "last",
) -> None:
    """Register a post finish callback.

    The callback will be called after the response is finished.
    """
    if self.is_finished:
        callback(self)
    else:
        if order not in ["first", "last"]:
            raise ValueError(
                f"Unknown order argument: {order}, only 'first' and 'last' are supported"
            )
        if order == "last":
            self.post_finish_callbacks.append(callback)
        else:
            self.post_finish_callbacks.insert(0, callback)
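
A hedged usage sketch; note from the source above that a callback registered on an already-finished response fires immediately, in which case the order argument is not validated:

def log_cost(resp):
    # The callback receives the CompletionResponse itself.
    print(f"completion cost: {resp.cost}")

response.register_post_finish_callback(log_cost)              # appended, runs last
response.register_post_finish_callback(print, order="first")  # prepended, runs first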

set_response_obj

set_response_obj(response_obj: Any) -> None

Set the response object.

Source code in src/appl/core/response.py
def set_response_obj(self, response_obj: Any) -> None:
    """Set the response object."""
    self.response_obj = response_obj

streaming

streaming(
    display: Optional[str] = None,
    title: str = "APPL Streaming",
    display_prefix_content: str = "",
    live: Optional[Live] = None,
) -> CompletionResponse

Stream the response object and finish the response.

Source code in src/appl/core/response.py
def streaming(
    self,
    display: Optional[str] = None,
    title: str = "APPL Streaming",
    display_prefix_content: str = "",
    live: Optional[Live] = None,
) -> "CompletionResponse":
    """Stream the response object and finish the response."""
    if not self.is_stream:
        raise ValueError("Cannot iterate over non-streaming response")
    if self.is_finished:
        return self

    if self.response_obj is not None:
        target = self.response_obj
    else:
        target = self.format_stream()

    streaming_display_mode = (
        display or global_vars.configs.settings.logging.display.streaming_mode
    )
    if streaming_display_mode == "live":
        start_time = time.time()

        def panel(
            content: str, iter_index: Optional[int] = None, truncate: bool = False
        ) -> Panel:
            style = "magenta"
            display_title = title
            if iter_index is not None:
                time_elapsed = time.time() - start_time
                avg_iters_per_sec = (iter_index + 1) / time_elapsed
                stream_info = (
                    f"[{time_elapsed:.3f}s ({avg_iters_per_sec:.2f} it/s)]"
                )
                display_title += f" - {stream_info}"
            return make_panel(
                content, title=display_title, style=style, truncate=truncate
            )

        if live is None:
            live = get_live()
            need_stop = True
        else:
            need_stop = False
        content = display_prefix_content
        for i, chunk in enumerate(iter(target)):
            if isinstance(chunk, BaseModel):
                content = json.dumps(chunk.model_dump(), indent=2)
            else:
                content += str(chunk)
            live.update(panel(content, i, truncate=True))
            # live.refresh()  # might be too frequent
        # display untruncated content at the end
        live.update(panel(content, i))
        live.refresh()
        if need_stop:
            stop_live()
    elif streaming_display_mode == "print":
        last_content = ""

        def eprint(content: str, color: Optional[Color] = None) -> None:
            print(colored(content, color) if color else content, end="")
            sys.stdout.flush()

        eprint("\n===== START APPL STREAMING =====\n", color="magenta")
        self.register_post_finish_callback(
            lambda _: eprint("\n===== END APPL STREAMING =====\n", color="magenta"),
            order="first",
        )
        eprint(display_prefix_content, color="grey")
        for chunk in iter(target):
            if isinstance(chunk, BaseModel):
                content = json.dumps(chunk.model_dump(), indent=2)
                if last_content in content:
                    eprint(
                        content[content.index(last_content) :], color="dark_grey"
                    )
                else:
                    eprint(content, color="dark_grey")
                last_content = content
            else:
                eprint(str(chunk), color="dark_grey")

    elif streaming_display_mode == "none":
        for chunk in iter(target):
            pass
    else:
        raise ValueError(
            f"Unknown display mode for streaming: {streaming_display_mode}, only 'live', 'print' and 'none' are supported"
        )
    if self.response_obj is not None:
        self.set_response_obj(chunk)
    return self
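
A hedged usage sketch, assuming response is an unfinished streaming response; when display is None, the mode falls back to the configured logging.display.streaming_mode:

response.streaming(display="print", title="My Stream")  # drains the stream, finishing the response
print(response.message)  # the assembled top-choice message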

update

update(
    other: CompletionResponse, split_marker: str = "\n"
) -> CompletionResponse

Update the response with the information contained in the other response.

Source code in src/appl/core/response.py
def update(
    self, other: "CompletionResponse", split_marker: str = "\n"
) -> "CompletionResponse":
    """Update the response with the information contained in the other response."""
    if not self.is_finished:
        raise ValueError("Cannot update unfinished response")
    if self.type != other.type:
        raise ValueError(
            f"Cannot update response with type {self.type} "
            f"with another response of type {other.type}"
        )
    if self.type != ResponseType.TEXT:
        raise NotImplementedError("Not supported for non-text response")
    if self.message is None or other.message is None:
        raise ValueError("Not supported for empty message when updating")

    stripped_message = strip_for_continue(self.message)
    _, last_part = split_last(stripped_message, split_marker)
    message = other.message
    if last_part in message:
        # truncate the overlapping part, patch the messages together
        self.message = (
            stripped_message + message[message.index(last_part) + len(last_part) :]
        )
    else:
        self.message += message  # extend the message
        logger.warning(
            f"Last part {last_part} not found in the message. "
            "Appending the message directly."
        )

    def as_list(obj: Any) -> List[Any]:
        if isinstance(obj, list):
            return obj
        return [obj]

    for k in ["finish_reason", "response_model", "response_obj", "tool_calls"]:
        if getattr(self, k) is None:
            setattr(self, k, getattr(other, k))
    self.raw_response = as_list(self.raw_response) + as_list(other.raw_response)
    self.chunks += other.chunks
    self.num_raw_completions += other.num_raw_completions
    if other.cost is not None:
        self.cost = (self.cost or 0) + other.cost
    if other.usage is not None:

        def merge_usage(usage1: BaseModel, usage2: BaseModel) -> None:
            """Merge the usage from two responses recursively."""
            for k, v in usage2.model_dump().items():
                if isinstance(v, int) or isinstance(v, float):
                    if hasattr(usage1, k):
                        setattr(usage1, k, getattr(usage1, k) + v)
                elif isinstance(v, BaseModel):
                    merge_usage(getattr(usage1, k), v)

        merge_usage(self.usage, other.usage)  # type: ignore

    return self
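
A hedged sketch of stitching a continuation onto a truncated text response; first and continuation are hypothetical finished responses of type TEXT (e.g., the second call resumes where the first hit a length limit):

merged = first.update(continuation, split_marker="\n")
print(merged.message)  # overlapping text around the split marker is deduplicated
print(merged.cost)     # cost (and usage) are summed across both completions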