Coverage for src/appl/core/utils.py: 68%
65 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 15:39 -0800
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 15:39 -0800
1import functools
2import os
3import shutil
4from typing import Any, Callable, Optional, Tuple, TypeVar
6from rich.live import Live
7from rich.panel import Panel
8from rich.style import StyleType
9from rich.syntax import Syntax
11from .config import configs
12from .globals import global_vars
13from .types import ParamSpec
16def strip_for_continue(content: str, chars: str = " \t`{[('\"\n") -> str:
17 """Strip the content so that the last part is more informative to be matched."""
18 return content.rstrip(chars)
21def split_last(content: str, split_marker: str = "\n") -> Tuple[Optional[str], str]:
22 """Split the content at the last split marker, return content before and after the last marker.
24 If the split marker is not found in the content, return None and the original content.
26 Args:
27 content: The content to split.
28 split_marker: The marker to split the content at.
30 Returns:
31 A tuple of the content before and after the last marker.
32 The first element could be None when the split marker is not found.
33 """
34 if split_marker not in content:
35 return None, content
36 result = content.rsplit(split_marker, 1)
37 assert len(result) == 2
38 return result[0], result[1]
41def make_panel(
42 content: str,
43 *,
44 title: Optional[str] = None,
45 style: StyleType = "none",
46 language: Optional[str] = None,
47 truncate: bool = False,
48) -> Panel:
49 """Display a panel in the terminal with the given title and content."""
50 if truncate:
51 terminal_height = shutil.get_terminal_size().lines
52 content = os.linesep.join(content.splitlines()[-terminal_height + 2 :])
54 rich_configs = configs.getattrs("settings.logging.display.rich")
55 if language is None:
56 language = rich_configs.get("language", "markdown")
57 theme = rich_configs.get("theme", "monokai")
58 line_numbers = rich_configs.get("line_numbers", False)
59 word_wrap = rich_configs.get("word_wrap", True)
61 syntax = Syntax(
62 content, language, theme=theme, line_numbers=line_numbers, word_wrap=word_wrap
63 )
64 return Panel(syntax, title=title, style=style)
67def get_live() -> Live:
68 """Get the live object, create one if not exists."""
69 with global_vars.live_lock:
70 if global_vars.live is None:
71 refresh_per_second = configs.getattrs(
72 "settings.logging.display.rich.refresh_per_second", 4
73 )
74 global_vars.live = Live(
75 make_panel(
76 "Waiting for Response ...",
77 title="APPL Streaming",
78 style="magenta",
79 ),
80 refresh_per_second=refresh_per_second,
81 )
82 global_vars.live.start()
83 return global_vars.live
86def stop_live() -> None:
87 """Stop the live object."""
88 with global_vars.live_lock:
89 if global_vars.live is not None:
90 global_vars.live.stop()
91 global_vars.live = None
94P = ParamSpec("P")
95T = TypeVar("T")
96F = TypeVar("F", bound=Callable) # function
97R = TypeVar("R") # return value
98APPL_NEED_CTX_ATTR = "__need_ctx__"
101def _copy_appl_attrs(func: Callable, new_func: Callable) -> None:
102 for attr in [APPL_NEED_CTX_ATTR]:
103 if hasattr(func, attr):
104 setattr(new_func, attr, getattr(func, attr))
107def need_ctx(func: Callable[P, T]) -> Callable[P, T]:
108 """Decorate a function to mark it as needing a prompt context."""
109 setattr(func, APPL_NEED_CTX_ATTR, True)
110 return func
113def partial(func: Callable[..., R], *args: Any, **kwargs: Any) -> Callable[..., R]:
114 """Create a new function with partial application of the given arguments and keywords."""
115 new_func = functools.partial(func, *args, **kwargs)
116 _copy_appl_attrs(func, new_func)
117 return new_func
120def wraps(func: F) -> Callable[[Callable], F]:
121 """Replace the functools.wraps to take care of the type hint."""
123 def decorator(wrapper: Callable) -> F:
124 new_wrapper = functools.wraps(func)(wrapper)
125 _copy_appl_attrs(func, new_wrapper)
126 return new_wrapper # type: ignore
128 return decorator