Coverage for src/appl/core/types/futures.py: 85%

139 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 15:39 -0800

1import operator 

2from abc import ABC, abstractmethod 

3from concurrent.futures import Future 

4from enum import Enum 

5from typing import ( 

6 Any, 

7 Callable, 

8 Dict, 

9 Generic, 

10 Iterable, 

11 List, 

12 Optional, 

13 SupportsIndex, 

14 TypeVar, 

15 Union, 

16) 

17 

18from pydantic import BaseModel, Field 

19 

20from ..globals import ExecutorType, get_executor 

21 

22R = TypeVar("R") 

23 

24 

25class FutureValue(ABC): 

26 """Represents a value that may not be ready yet.""" 

27 

28 @abstractmethod 

29 def _get_val(self): 

30 """Get the value of the future. 

31 

32 If the future is not ready, it will block until the value is ready. 

33 """ 

34 raise NotImplementedError 

35 

36 @property 

37 def val(self): 

38 """The value of the future.""" 

39 return self._get_val() 

40 

41 def __call__(self): 

42 """Use call to get the value of the future.""" 

43 return self.val 

44 

45 

46class CallFuture(FutureValue, Generic[R]): 

47 """Represent a function call that may not be ready yet.""" 

48 

49 def __init__( 

50 self, 

51 func: Callable[..., R], 

52 *args: Any, 

53 executor_type: ExecutorType = ExecutorType.GENERAL_THREAD_POOL, 

54 lazy_eval: bool = False, 

55 **kwargs: Any, 

56 ): 

57 """Initialize the CallFuture. 

58 

59 Args: 

60 func: The function to call. 

61 *args: The arguments of the function. 

62 executor_type: The type of the executor to run the call. 

63 lazy_eval: Whether to delay the start of the call until needed. 

64 **kwargs: The keyword arguments of the function. 

65 """ 

66 self._executor_type = executor_type 

67 self._executor = get_executor(executor_type) 

68 self._submit_fn = lambda: self._executor.submit(func, *args, **kwargs) 

69 self._submitted = False 

70 self._info = func.__name__ 

71 # self._debug = False 

72 # if self._debug: 

73 # # arg and kwargs might contains future objects 

74 # args_list = [f"{arg}" for arg in args] + [ 

75 # f"{k}={v!r}" for k, v in kwargs.items() 

76 # ] 

77 # args_str = ", ".join(args_list) 

78 # self._info += f"({args_str})" 

79 if not lazy_eval: 

80 # delay the start of the call until needed 

81 self._submit() 

82 

83 def _submit(self) -> None: 

84 if not self._submitted: 

85 self._future = self._submit_fn() 

86 self._submitted = True 

87 

88 @property 

89 def future(self) -> Future: 

90 """The future object of the call.""" 

91 if not self._submitted: 

92 self._submit() 

93 return self._future 

94 

95 def result(self, timeout: Optional[float] = None) -> R: 

96 """Get the result of the call.""" 

97 # This will block until the result is available 

98 res = self.future.result(timeout) 

99 if self._executor_type in [ExecutorType.NEW_THREAD, ExecutorType.NEW_PROCESS]: 

100 self._executor.shutdown() # the executor is not needed anymore 

101 return res 

102 

103 def cancel(self) -> bool: 

104 """Cancel the call.""" 

105 # Attempt to cancel the call 

106 res = self.future.cancel() 

107 if res: 

108 self._executor.shutdown() # the executor is not needed anymore 

109 return res 

110 

111 def done(self) -> bool: 

112 """Check if the call has completed.""" 

113 # Check if the future has completed 

114 return self.future.done() 

115 

116 def _get_val(self): 

117 return self.result() 

118 

119 def __str__(self): 

120 return str(self.val) 

121 

122 def __repr__(self): 

123 return repr(self.future) 

124 

125 

126# TODO: boolean future 

127class CmpStringFuture(FutureValue): 

128 """Represent a comparison between a StringFuture and another value.""" 

129 

130 def __init__( 

131 self, a: "StringFuture", b: "StringFuture", op: Callable[[str, str], bool] 

132 ): 

133 """Initialize the CmpStringFuture.""" 

134 self._a = a 

135 self._b = b 

136 self._op = op 

137 

138 def __bool__(self): 

139 return self._op(str(self._a), str(self._b)) 

140 

141 def _get_val(self): 

142 return self.__bool__() 

143 

144 

145class StringFuture(FutureValue, BaseModel): 

146 """StringFuture is a string that may not be ready yet.""" 

147 

148 s: List[Any] = Field([], description="The string content") 

149 

150 def __init__(self, content: Any = "", set_value: bool = False): 

151 """Initialize the StringFuture.""" 

152 if set_value: 

153 if not isinstance(content, List): 

154 raise ValueError("Cannot set value to non-list.") 

155 s = content 

156 else: 

157 s = [content] 

158 super().__init__(s=s) 

159 

160 @classmethod 

161 def from_list(cls, content: List[Any]) -> "StringFuture": 

162 """Create a StringFuture from a list of content.""" 

163 return cls(content, set_value=True) 

164 

165 def _collapse(self) -> str: 

166 return "".join([str(x) for x in self.s]) 

167 

168 def materialized(self) -> "StringFuture": 

169 """Materialize the StringFuture.""" 

170 self.s = [self._collapse()] 

171 return self 

172 

173 def serialize(self) -> str: 

174 """Serialize the StringFuture.""" 

175 return str(self) 

176 

177 def join(self, iterable: Iterable["StringFuture"]) -> "StringFuture": 

178 """Concatenate any number of strings. 

179 

180 The StringFuture whose method is called is inserted in between each 

181 given StringFuture. The result is returned as a new StringFuture. 

182 """ 

183 result = [] 

184 for i, x in enumerate(iterable): 

185 if i != 0: 

186 result.append(self) 

187 result.append(x) 

188 return StringFuture.from_list(result) 

189 

190 def _get_val(self): 

191 return str(self) 

192 

193 def __str__(self) -> str: 

194 return self.materialized().s[0] 

195 

196 def __hash__(self) -> int: 

197 return hash(str(self)) 

198 

199 def __contains__(self, item: str) -> bool: 

200 return item in str(self) 

201 

202 def __getattr__(self, key): 

203 if not hasattr(str, key): 

204 raise AttributeError("str has no attribute " + key) 

205 return getattr(str(self), key) 

206 

207 def __iadd__(self, other: "String") -> "StringFuture": 

208 if isinstance(other, str): 

209 self.s.append(other) 

210 return self 

211 elif isinstance(other, StringFuture): 

212 self.s += other.s 

213 return self 

214 else: 

215 raise RuntimeError("Cannot add StringFuture to non-string.") 

216 

217 def __radd__(self, other: str) -> "StringFuture": 

218 if isinstance(other, str): 

219 return StringFuture.from_list([other] + self.s) 

220 else: 

221 raise RuntimeError("Cannot add StringFuture to non-string.") 

222 

223 def __add__(self, other: "String") -> "StringFuture": 

224 if isinstance(other, str): 

225 return StringFuture.from_list(self.s + [other]) 

226 elif isinstance(other, StringFuture): 

227 return StringFuture.from_list(self.s + other.s) 

228 elif hasattr(other, "str_future"): # type: ignore # For custom type 

229 return StringFuture.from_list(self.s + [other.str_future]) 

230 else: 

231 raise RuntimeError("Cannot add StringFuture to non-string.") 

232 

233 def __eq__(self, other: Any) -> CmpStringFuture: # type: ignore 

234 return CmpStringFuture(self, other, operator.eq) 

235 

236 def __ge__(self, other: Any) -> CmpStringFuture: 

237 return CmpStringFuture(self, other, operator.ge) 

238 

239 def __gt__(self, other: Any) -> CmpStringFuture: 

240 return CmpStringFuture(self, other, operator.gt) 

241 

242 def __le__(self, other: Any) -> CmpStringFuture: 

243 return CmpStringFuture(self, other, operator.le) 

244 

245 def __lt__(self, other: Any) -> CmpStringFuture: 

246 return CmpStringFuture(self, other, operator.lt) 

247 

248 def __ne__(self, other: Any) -> CmpStringFuture: # type: ignore 

249 return CmpStringFuture(self, other, operator.ne) 

250 

251 def __format__(self, __format_spec: str) -> str: 

252 return str(self).__format__(__format_spec) 

253 

254 def __getitem__(self, key: Union[SupportsIndex, slice]) -> "StringFuture": 

255 def func(): 

256 return str(self)[key] 

257 

258 return StringFuture(CallFuture(func)) 

259 

260 def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "StringFuture": 

261 # materialize before copying, to avoid functions wrapped in StringFuture running twice. 

262 return StringFuture(str(self)) 

263 

264 

265# Type aliases 

266String = Union[StringFuture, str] 

267"""String is a type alias for StringFuture or str.""" 

268 

269 

270def is_string(s: Any) -> bool: 

271 """Check if the object is a StringFuture or str.""" 

272 return isinstance(s, StringFuture) or isinstance(s, str)