Skip to content

engine

ASSISTANT_ROLE module-attribute

ASSISTANT_ROLE = MessageRole(ASSISTANT)

The assistant role with name not specified.

MaybeOneOrMany module-attribute

MaybeOneOrMany = Union[_T, Sequence[_T], None]

A type that can be either a single item, a sequence of items, or None.

OneOrMany module-attribute

OneOrMany = Union[_T, Sequence[_T]]

A type that can be either a single item or a sequence of items.

SYSTEM_ROLE module-attribute

SYSTEM_ROLE = MessageRole(SYSTEM)

The system role with name not specified.

StrOrImg module-attribute

StrOrImg = Union[String, Image]

A type that can be either a string or an image.

String module-attribute

String = Union[StringFuture, str]

String is a type alias for StringFuture or str.

TOOL_ROLE module-attribute

TOOL_ROLE = MessageRole(TOOL)

The tool role with name not specified.

USER_ROLE module-attribute

USER_ROLE = MessageRole(USER)

The user role with name not specified.

CallFuture

CallFuture(
    func: Callable,
    *args: Any,
    use_process: bool = False,
    lazy_eval: bool = False,
    **kwargs: Any
)

Bases: FutureValue

Represent a function call that may not be ready yet.

Parameters:

  • func (Callable) –

    The function to call.

  • *args (Any, default: () ) –

    The arguments of the function.

  • use_process (bool, default: False ) –

    Whether to use a process pool executor.

  • lazy_eval (bool, default: False ) –

    Whether to delay the start of the call until needed.

  • **kwargs (Any, default: {} ) –

    The keyword arguments of the function.

Source code in src/appl/core/types/futures.py
def __init__(
    self,
    func: Callable,
    *args: Any,
    use_process: bool = False,
    lazy_eval: bool = False,
    **kwargs: Any,
):
    """Initialize the CallFuture.

    Args:
        func: The function to call.
        *args: The arguments of the function.
        use_process: Whether to use a process pool executor.
        lazy_eval: Whether to delay the start of the call until needed.
        **kwargs: The keyword arguments of the function.
    """
    # ? maybe use a global executor from the config, or use thread-level executor if running in multi-threading.
    self._executor = (
        ProcessPoolExecutor(max_workers=1)
        if use_process
        else ThreadPoolExecutor(
            max_workers=1, thread_name_prefix=threading.current_thread().name
        )
    )
    self._submit_fn = lambda: self._executor.submit(func, *args, **kwargs)
    self._submitted = False
    self._info = func.__name__
    # self._debug = False
    # if self._debug:
    #     # arg and kwargs might contains future objects
    #     args_list = [f"{arg}" for arg in args] + [
    #         f"{k}={v!r}" for k, v in kwargs.items()
    #     ]
    #     args_str = ", ".join(args_list)
    #     self._info += f"({args_str})"
    if not lazy_eval:
        # delay the start of the call until needed
        self._submit()

future property

future

The future object of the call.

val property

val

The value of the future.

cancel

cancel() -> bool

Cancel the call.

Source code in src/appl/core/types/futures.py
def cancel(self) -> bool:
    """Cancel the call."""
    # Attempt to cancel the call
    res = self.future.cancel()
    if res:
        self._executor.shutdown()  # the executor is not needed anymore
    return res

done

done() -> bool

Check if the call has completed.

Source code in src/appl/core/types/futures.py
def done(self) -> bool:
    """Check if the call has completed."""
    # Check if the future has completed
    return self.future.done()

result

result(timeout: Optional[float] = None) -> Any

Get the result of the call.

Source code in src/appl/core/types/futures.py
def result(self, timeout: Optional[float] = None) -> Any:
    """Get the result of the call."""
    # This will block until the result is available
    res = self.future.result(timeout)
    self._executor.shutdown()  # the executor is not needed anymore
    return res

CmpStringFuture

CmpStringFuture(
    a: StringFuture,
    b: StringFuture,
    op: Callable[[str, str], bool],
)

Bases: FutureValue

Represent a comparison between a StringFuture and another value.

Source code in src/appl/core/types/futures.py
def __init__(
    self, a: "StringFuture", b: "StringFuture", op: Callable[[str, str], bool]
):
    """Initialize the CmpStringFuture."""
    self._a = a
    self._b = b
    self._op = op

val property

val

The value of the future.

ContentList

Bases: BaseModel

Represent a list of contents containing text and images.

append

append(content: StrOrImg) -> None

Append a content to the list.

If the last content is a string, it will be concatenated with the new content.

Source code in src/appl/core/types/content.py
def append(self, content: StrOrImg) -> None:
    """Append a content to the list.

    If the last content is a string, it will be concatenated with the new content.
    """
    if is_string(content) and len(self.contents) and is_string(self.contents[-1]):
        self.contents[-1] += content  # type: ignore
    else:
        self.contents.append(content)

extend

extend(contents: list[StrOrImg]) -> None

Extend the list with multiple contents.

Source code in src/appl/core/types/content.py
def extend(self, contents: list[StrOrImg]) -> None:
    """Extend the list with multiple contents."""
    for content in contents:
        self.append(content)

get_contents

get_contents() -> List[Dict[str, Any]]

Return the contents as a list of dictionaries.

Source code in src/appl/core/types/content.py
def get_contents(self) -> List[Dict[str, Any]]:
    """Return the contents as a list of dictionaries."""

    def get_dict(content):
        if isinstance(content, Image):
            image_args = {"url": content.url}
            if content.detail:
                image_args["detail"] = content.detail
            return {"type": "image_url", "image_url": image_args}
        return {"type": "text", "text": str(content)}

    return [get_dict(c) for c in self.contents]

FutureValue

Bases: ABC

Represents a value that may not be ready yet.

val property

val

The value of the future.

Image

Image(url: str, detail: Optional[str] = None)

Bases: BaseModel

Represent an image in the message.

See the guide for more information about the detail level.

Source code in src/appl/core/types/content.py
def __init__(self, url: str, detail: Optional[str] = None) -> None:
    """Initialize the image with the URL and detail level.

    See [the guide](https://platform.openai.com/docs/guides/vision/low-or-high-fidelity-image-understanding)
    for more information about the detail level.
    """
    super().__init__(url=url, detail=detail)

from_file classmethod

from_file(
    file: PathLike, detail: Optional[str] = None
) -> Image

Construct an image prompt from an image file.

Source code in src/appl/core/types/content.py
@classmethod
def from_file(cls, file: PathLike, detail: Optional[str] = None) -> "Image":
    """Construct an image prompt from an image file."""
    image = PIL.Image.open(file)
    return cls.from_image(image, detail)

from_image classmethod

from_image(
    image: ImageFile, detail: Optional[str] = None
) -> Image

Construct an image prompt from a PIL ImageFile.

Source code in src/appl/core/types/content.py
@classmethod
def from_image(cls, image: ImageFile, detail: Optional[str] = None) -> "Image":
    """Construct an image prompt from a PIL ImageFile."""
    buffered = BytesIO()
    # Save the image to the buffer in PNG format
    image.save(buffered, format="PNG")
    # Get the byte data from the buffer
    img_byte = buffered.getvalue()
    img_base64 = base64.b64encode(img_byte).decode("utf-8")
    return cls(url=f"data:image/png;base64,{img_base64}", detail=detail)

MessageRole

MessageRole(
    type: Optional[str] = None, name: Optional[str] = None
)

Bases: BaseModel

The role of the message owner.

Parameters:

  • type (Optional[str], default: None ) –

    The type of the role.

  • name (Optional[str], default: None ) –

    An optional name for the role, differentiate between roles of the same type."

Source code in src/appl/core/types/role.py
def __init__(self, type: Optional[str] = None, name: Optional[str] = None):
    """Initialize the MessageRole object.

    Args:
        type: The type of the role.
        name: An optional name for the role, differentiate between roles of the same type."
    """
    super().__init__(type=type, name=name)

is_assistant property

is_assistant: bool

Whether the role is an assistant role.

is_system property

is_system: bool

Whether the role is a system role.

is_tool property

is_tool: bool

Whether the role is a tool role.

is_user property

is_user: bool

Whether the role is a user role.

get_dict

get_dict() -> Dict[str, Any]

Get the role as a dictionary.

Source code in src/appl/core/types/role.py
def get_dict(self) -> Dict[str, Any]:
    """Get the role as a dictionary."""
    data = {"role": self.type}
    if self.name:
        data["name"] = self.name
    return data

ResponseType

Bases: str, Enum

The type of generation response.

IMAGE class-attribute instance-attribute

IMAGE = 'image'

An image.

OBJECT class-attribute instance-attribute

OBJECT = 'obj'

An instance of a response model.

TEXT class-attribute instance-attribute

TEXT = 'text'

A text completion.

TOOL_CALL class-attribute instance-attribute

TOOL_CALL = 'tool_calls'

A list of tool calls.

UNFINISHED class-attribute instance-attribute

UNFINISHED = 'unfinished'

The response is not finished.

StringFuture

StringFuture(content: Any = '', set_value: bool = False)

Bases: FutureValue, BaseModel

StringFuture is a string that may not be ready yet.

Source code in src/appl/core/types/futures.py
def __init__(self, content: Any = "", set_value: bool = False):
    """Initialize the StringFuture."""
    if set_value:
        if not isinstance(content, List):
            raise ValueError("Cannot set value to non-list.")
        s = content
    else:
        s = [content]
    super().__init__(s=s)

val property

val

The value of the future.

from_list classmethod

from_list(content: List[Any]) -> StringFuture

Create a StringFuture from a list of content.

Source code in src/appl/core/types/futures.py
@classmethod
def from_list(cls, content: List[Any]) -> "StringFuture":
    """Create a StringFuture from a list of content."""
    return cls(content, set_value=True)

join

join(iterable: Iterable[StringFuture]) -> StringFuture

Concatenate any number of strings.

The StringFuture whose method is called is inserted in between each given StringFuture. The result is returned as a new StringFuture.

Source code in src/appl/core/types/futures.py
def join(self, iterable: Iterable["StringFuture"]) -> "StringFuture":
    """Concatenate any number of strings.

    The StringFuture whose method is called is inserted in between each
    given StringFuture. The result is returned as a new StringFuture.
    """
    result = []
    for i, x in enumerate(iterable):
        if i != 0:
            result.append(self)
        result.append(x)
    return StringFuture.from_list(result)

materialized

materialized() -> StringFuture

Materialize the StringFuture.

Source code in src/appl/core/types/futures.py
def materialized(self) -> "StringFuture":
    """Materialize the StringFuture."""
    self.s = [self._collapse()]
    return self

serialize

serialize() -> str

Serialize the StringFuture.

Source code in src/appl/core/types/futures.py
def serialize(self) -> str:
    """Serialize the StringFuture."""
    return str(self)

TraceEngine

TraceEngine(
    filename: str, mode: str = "write", strict: bool = True
)

Bases: TraceEngineBase

The engine used to record the trace of a program execution.

Parameters:

  • filename (str) –

    The filename storing the trace.

  • mode (str, default: 'write' ) –

    The mode of the trace, "write" or "read". Defaults to "write".

  • strict (bool, default: True ) –

    Whether to match strictly when used as a cache. Defaults to True.

    • True: matching according to the generation id, prompts, and parameters. And cache stops to work whenever a match failed.
    • False: only matching prompts and parameters.
Source code in src/appl/tracing/engine.py
def __init__(self, filename: str, mode: str = "write", strict: bool = True) -> None:
    """Initialize the TraceEngine.

    Args:
        filename: The filename storing the trace.
        mode: The mode of the trace, "write" or "read". Defaults to "write".
        strict:
            Whether to match strictly when used as a cache. Defaults to True.

            - True: matching according to the generation id, prompts, and
                parameters. And cache stops to work whenever a match failed.
            - False: only matching prompts and parameters.
    """
    self._mode = mode
    self._strict = strict
    self._events: List[TraceEventBase] = []  # events read from the file
    self._trace_nodes: Dict[str, TraceNode] = {}
    self._gen_cache: Dict[str, List[Any]] = {}
    self._lock = Lock()
    self._func_stack: List[str] = []

    if mode == "write":
        if os.path.exists(filename):
            logger.warning(f"Trace file {filename} already exists, overwriting")
        self._file = open(filename, "wb+")
    elif mode == "read":
        if not os.path.exists(filename):
            raise FileNotFoundError(f"Trace file {filename} not found")
        self._file = open(filename, "rb+")
        self._read()
    else:
        raise ValueError(f"Invalid mode {mode}, only 'write' or 'read' allowed.")

events property

events: List[TraceEventBase]

The list of events in the trace.

min_timestamp cached property

min_timestamp: float

The minimum time stamp of the events in the trace.

trace_nodes property

trace_nodes: Dict[str, TraceNode]

The dictionary of trace nodes.

append

append(event: TraceEventBase) -> None

Append an event to the trace.

Source code in src/appl/tracing/engine.py
def append(self, event: TraceEventBase) -> None:
    """Append an event to the trace."""
    if hasattr(event, "args"):
        event.args = self.args_to_json(event.args)

    if self._mode == "write":
        with self._lock:
            logger.debug(f"add to trace {event}")
            pickle.dump(event, self._file)
            self._file.flush()

    self._events.append(event)
    name, time_stamp = event.name, event.time_stamp
    assert name is not None
    if isinstance(event, FunctionCallEvent):
        newnode = self._add_node(name, self._last_func, type="func")
        newnode.start_time = time_stamp
        newnode.args = event.args
        self._func_stack.append(name)
    elif isinstance(event, FunctionReturnEvent):
        node = self._get_node(name)
        if node:
            node.end_time = time_stamp
        self._pop_func()
    elif isinstance(event, GenerationInitEvent):
        newnode = self._add_node(name, self._last_func)
        newnode.start_time = time_stamp
    elif isinstance(event, GenerationResponseEvent):
        node = self._get_node(name)
        if node:
            node.end_time = time_stamp
            node.args = event.args
            node.ret = event.ret
    elif isinstance(event, CompletionRequestEvent):
        # Use name + "_raw" to represent the raw completion request
        newnode = self._add_node(name + "_raw", name)
        newnode.start_time = time_stamp
    elif isinstance(event, CompletionResponseEvent):
        node = self._get_node(name + "_raw")
        if node:
            node.end_time = time_stamp
            node.args = event.args
            node.ret = event.ret
            node.info["cost"] = event.cost

        # cached for raw completion response
        key = self._cache_key(name, event.args)
        if key not in self._gen_cache:
            self._gen_cache[key] = []
        self._gen_cache[key].append(event.ret)

args_to_json classmethod

args_to_json(args: Dict) -> Dict

Serialize the values of the arguments to JSON format.

Source code in src/appl/tracing/engine.py
@classmethod
def args_to_json(cls, args: Dict) -> Dict:
    """Serialize the values of the arguments to JSON format."""
    args_json = {}
    for k, v in args.items():
        if isinstance(v, type) and issubclass(v, BaseModel):
            v = cls.convert_pydantic_class_to_schema(v)
        # TODO: shall we serialize everything?
        # elif k != "message":
        #     try:
        #         v = json.dumps(v)
        #     except:
        #         v = str(v)
        args_json[k] = v
    return args_json

convert_pydantic_class_to_schema classmethod

convert_pydantic_class_to_schema(class_: Type) -> Dict

Convert a class to a schema.

Parameters:

  • class_ (Type) –

    The class to convert

Source code in src/appl/tracing/engine.py
@classmethod
def convert_pydantic_class_to_schema(cls, class_: Type) -> Dict:
    """Convert a class to a schema.

    Args:
        class_: The class to convert
    """
    if issubclass(class_, BaseModel):
        return class_.model_json_schema()
    raise ValueError(f"Cannot convert class {class_} to schema")

find_cache

find_cache(name: str, args: Dict) -> Any

Find a cached response for a generation request.

Parameters:

  • name (str) –

    The name of the generation request.

  • args (Dict) –

    The arguments of the generation request.

Source code in src/appl/tracing/engine.py
def find_cache(self, name: str, args: Dict) -> Any:
    """Find a cached response for a generation request.

    Args:
        name: The name of the generation request.
        args: The arguments of the generation request.
    """
    args = self.args_to_json(args)
    with self._lock:
        entry_list = self._gen_cache.get(self._cache_key(name, args), None)
        if not entry_list or len(entry_list) == 0:
            return None
        entry = entry_list.pop(0)
        return entry

is_string

is_string(s: Any) -> bool

Check if the object is a StringFuture or str.

Source code in src/appl/core/types/futures.py
def is_string(s: Any) -> bool:
    """Check if the object is a StringFuture or str."""
    return isinstance(s, StringFuture) or isinstance(s, str)