Coverage for src/appl/core/utils.py: 68%

65 statements  

coverage.py v7.6.7, created at 2024-11-22 15:39 -0800

1import functools 

2import os 

3import shutil 

4from typing import Any, Callable, Optional, Tuple, TypeVar 

5 

6from rich.live import Live 

7from rich.panel import Panel 

8from rich.style import StyleType 

9from rich.syntax import Syntax 

10 

11from .config import configs 

12from .globals import global_vars 

13from .types import ParamSpec 

14 

15 

def strip_for_continue(content: str, chars: str = " \t`{[('\"\n") -> str:
    """Strip the content so that the last part is more informative to be matched.

    Args:
        content: The text to strip.
        chars: The set of characters removed from the right end of ``content``.
            The default covers space, tab, newline, backtick, the opening
            brackets ``{``, ``[``, ``(`` and both quote characters.

    Returns:
        ``content`` with every trailing character contained in ``chars`` removed.
    """
    return content.rstrip(chars)

19 

20 

def split_last(content: str, split_marker: str = "\n") -> Tuple[Optional[str], str]:
    """Split the content at the last split marker, return content before and after the last marker.

    If the split marker is not found in the content, return None and the original content.

    Args:
        content: The content to split.
        split_marker: The marker to split the content at.

    Returns:
        A tuple of the content before and after the last marker.
        The first element could be None when the split marker is not found.

    Raises:
        ValueError: If ``split_marker`` is an empty string.
    """
    # rpartition scans once from the right; the middle element is empty
    # exactly when the marker does not occur in the content.
    before, found, after = content.rpartition(split_marker)
    if not found:
        return None, content
    return before, after

39 

40 

def make_panel(
    content: str,
    *,
    title: Optional[str] = None,
    style: StyleType = "none",
    language: Optional[str] = None,
    truncate: bool = False,
) -> Panel:
    """Build a rich panel with the given title and syntax-highlighted content.

    Args:
        content: The text shown inside the panel.
        title: Optional panel title.
        style: The rich style applied to the panel.
        language: Language used for syntax highlighting. When None, the value
            is read from the ``settings.logging.display.rich`` configs
            (falling back to ``"markdown"``).
        truncate: When True, keep only the trailing lines of ``content`` that
            fit the current terminal height.

    Returns:
        A ``Panel`` wrapping a ``Syntax`` rendering of the content.
    """
    if truncate:
        terminal_height = shutil.get_terminal_size().lines
        # Two rows are held back (presumably for the panel border). Clamp to at
        # least one line: with a height of 2 or less, the slice start
        # ``-terminal_height + 2`` is no longer negative and would index from
        # the top of the content instead of keeping its tail.
        visible_lines = max(terminal_height - 2, 1)
        content = os.linesep.join(content.splitlines()[-visible_lines:])

    rich_configs = configs.getattrs("settings.logging.display.rich")
    if language is None:
        language = rich_configs.get("language", "markdown")
    theme = rich_configs.get("theme", "monokai")
    line_numbers = rich_configs.get("line_numbers", False)
    word_wrap = rich_configs.get("word_wrap", True)

    syntax = Syntax(
        content, language, theme=theme, line_numbers=line_numbers, word_wrap=word_wrap
    )
    return Panel(syntax, title=title, style=style)

65 

66 

def get_live() -> Live:
    """Get the live object, create one if not exists.

    The live object is stored on ``global_vars.live`` and is accessed while
    holding ``global_vars.live_lock``. On first use it is created with a
    placeholder panel, using the refresh rate read from the
    ``settings.logging.display.rich.refresh_per_second`` config (default 4),
    and then started.

    Returns:
        The ``Live`` object stored on ``global_vars``.
    """
    with global_vars.live_lock:
        if global_vars.live is None:
            refresh_per_second = configs.getattrs(
                "settings.logging.display.rich.refresh_per_second", 4
            )
            global_vars.live = Live(
                make_panel(
                    "Waiting for Response ...",
                    title="APPL Streaming",
                    style="magenta",
                ),
                refresh_per_second=refresh_per_second,
            )
            global_vars.live.start()
        return global_vars.live

84 

85 

def stop_live() -> None:
    """Stop the live object.

    Does nothing when no live object exists. Otherwise the live object is
    stopped and ``global_vars.live`` is reset to None, all while holding
    ``global_vars.live_lock``.
    """
    with global_vars.live_lock:
        if global_vars.live is not None:
            global_vars.live.stop()
            global_vars.live = None

92 

93 

94P = ParamSpec("P") 

95T = TypeVar("T") 

96F = TypeVar("F", bound=Callable) # function 

97R = TypeVar("R") # return value 

98APPL_NEED_CTX_ATTR = "__need_ctx__" 

99 

100 

def _copy_appl_attrs(func: Callable, new_func: Callable) -> None:
    """Copy the APPL marker attributes from ``func`` onto ``new_func``.

    Attributes that ``func`` does not have are skipped, so ``new_func`` is
    left untouched when ``func`` carries no marker.

    Args:
        func: The callable to read the attributes from.
        new_func: The callable that receives the attributes.
    """
    for attr in [APPL_NEED_CTX_ATTR]:
        if hasattr(func, attr):
            setattr(new_func, attr, getattr(func, attr))

105 

106 

def need_ctx(func: Callable[P, T]) -> Callable[P, T]:
    """Decorate a function to mark it as needing a prompt context.

    The marker is the attribute named by ``APPL_NEED_CTX_ATTR``, set to True
    on ``func`` itself; no wrapper is created.

    Args:
        func: The function to mark.

    Returns:
        The same ``func`` object, now carrying the marker attribute.
    """
    setattr(func, APPL_NEED_CTX_ATTR, True)
    return func

111 

112 

def partial(func: Callable[..., R], *args: Any, **kwargs: Any) -> Callable[..., R]:
    """Create a new function with partial application of the given arguments and keywords.

    Works like ``functools.partial`` and additionally copies the APPL marker
    attributes of ``func`` onto the resulting partial object.

    Args:
        func: The callable to partially apply.
        *args: Positional arguments bound to ``func``.
        **kwargs: Keyword arguments bound to ``func``.

    Returns:
        The ``functools.partial`` object wrapping ``func``.
    """
    new_func = functools.partial(func, *args, **kwargs)
    _copy_appl_attrs(func, new_func)
    return new_func

118 

119 

def wraps(func: F) -> Callable[[Callable], F]:
    """Replace the functools.wraps to take care of the type hint.

    The returned decorator applies ``functools.wraps(func)`` to the wrapper,
    copies the APPL marker attributes of ``func`` onto it, and is annotated
    as returning the type of ``func``.

    Args:
        func: The function being wrapped.

    Returns:
        A decorator that takes the wrapper and returns it, typed as ``F``.
    """

    def decorator(wrapper: Callable) -> F:
        new_wrapper = functools.wraps(func)(wrapper)
        _copy_appl_attrs(func, new_wrapper)
        # The wrapper is deliberately presented to type checkers as F.
        return new_wrapper  # type: ignore

    return decorator