Coverage for src/appl/core/types/futures.py: 85%
139 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 15:39 -0800
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 15:39 -0800
1import operator
2from abc import ABC, abstractmethod
3from concurrent.futures import Future
4from enum import Enum
5from typing import (
6 Any,
7 Callable,
8 Dict,
9 Generic,
10 Iterable,
11 List,
12 Optional,
13 SupportsIndex,
14 TypeVar,
15 Union,
16)
18from pydantic import BaseModel, Field
20from ..globals import ExecutorType, get_executor
22R = TypeVar("R")
25class FutureValue(ABC):
26 """Represents a value that may not be ready yet."""
28 @abstractmethod
29 def _get_val(self):
30 """Get the value of the future.
32 If the future is not ready, it will block until the value is ready.
33 """
34 raise NotImplementedError
36 @property
37 def val(self):
38 """The value of the future."""
39 return self._get_val()
41 def __call__(self):
42 """Use call to get the value of the future."""
43 return self.val
46class CallFuture(FutureValue, Generic[R]):
47 """Represent a function call that may not be ready yet."""
49 def __init__(
50 self,
51 func: Callable[..., R],
52 *args: Any,
53 executor_type: ExecutorType = ExecutorType.GENERAL_THREAD_POOL,
54 lazy_eval: bool = False,
55 **kwargs: Any,
56 ):
57 """Initialize the CallFuture.
59 Args:
60 func: The function to call.
61 *args: The arguments of the function.
62 executor_type: The type of the executor to run the call.
63 lazy_eval: Whether to delay the start of the call until needed.
64 **kwargs: The keyword arguments of the function.
65 """
66 self._executor_type = executor_type
67 self._executor = get_executor(executor_type)
68 self._submit_fn = lambda: self._executor.submit(func, *args, **kwargs)
69 self._submitted = False
70 self._info = func.__name__
71 # self._debug = False
72 # if self._debug:
73 # # arg and kwargs might contains future objects
74 # args_list = [f"{arg}" for arg in args] + [
75 # f"{k}={v!r}" for k, v in kwargs.items()
76 # ]
77 # args_str = ", ".join(args_list)
78 # self._info += f"({args_str})"
79 if not lazy_eval:
80 # delay the start of the call until needed
81 self._submit()
83 def _submit(self) -> None:
84 if not self._submitted:
85 self._future = self._submit_fn()
86 self._submitted = True
88 @property
89 def future(self) -> Future:
90 """The future object of the call."""
91 if not self._submitted:
92 self._submit()
93 return self._future
95 def result(self, timeout: Optional[float] = None) -> R:
96 """Get the result of the call."""
97 # This will block until the result is available
98 res = self.future.result(timeout)
99 if self._executor_type in [ExecutorType.NEW_THREAD, ExecutorType.NEW_PROCESS]:
100 self._executor.shutdown() # the executor is not needed anymore
101 return res
103 def cancel(self) -> bool:
104 """Cancel the call."""
105 # Attempt to cancel the call
106 res = self.future.cancel()
107 if res:
108 self._executor.shutdown() # the executor is not needed anymore
109 return res
111 def done(self) -> bool:
112 """Check if the call has completed."""
113 # Check if the future has completed
114 return self.future.done()
116 def _get_val(self):
117 return self.result()
119 def __str__(self):
120 return str(self.val)
122 def __repr__(self):
123 return repr(self.future)
126# TODO: boolean future
127class CmpStringFuture(FutureValue):
128 """Represent a comparison between a StringFuture and another value."""
130 def __init__(
131 self, a: "StringFuture", b: "StringFuture", op: Callable[[str, str], bool]
132 ):
133 """Initialize the CmpStringFuture."""
134 self._a = a
135 self._b = b
136 self._op = op
138 def __bool__(self):
139 return self._op(str(self._a), str(self._b))
141 def _get_val(self):
142 return self.__bool__()
145class StringFuture(FutureValue, BaseModel):
146 """StringFuture is a string that may not be ready yet."""
148 s: List[Any] = Field([], description="The string content")
150 def __init__(self, content: Any = "", set_value: bool = False):
151 """Initialize the StringFuture."""
152 if set_value:
153 if not isinstance(content, List):
154 raise ValueError("Cannot set value to non-list.")
155 s = content
156 else:
157 s = [content]
158 super().__init__(s=s)
160 @classmethod
161 def from_list(cls, content: List[Any]) -> "StringFuture":
162 """Create a StringFuture from a list of content."""
163 return cls(content, set_value=True)
165 def _collapse(self) -> str:
166 return "".join([str(x) for x in self.s])
168 def materialized(self) -> "StringFuture":
169 """Materialize the StringFuture."""
170 self.s = [self._collapse()]
171 return self
173 def serialize(self) -> str:
174 """Serialize the StringFuture."""
175 return str(self)
177 def join(self, iterable: Iterable["StringFuture"]) -> "StringFuture":
178 """Concatenate any number of strings.
180 The StringFuture whose method is called is inserted in between each
181 given StringFuture. The result is returned as a new StringFuture.
182 """
183 result = []
184 for i, x in enumerate(iterable):
185 if i != 0:
186 result.append(self)
187 result.append(x)
188 return StringFuture.from_list(result)
190 def _get_val(self):
191 return str(self)
193 def __str__(self) -> str:
194 return self.materialized().s[0]
196 def __hash__(self) -> int:
197 return hash(str(self))
199 def __contains__(self, item: str) -> bool:
200 return item in str(self)
202 def __getattr__(self, key):
203 if not hasattr(str, key):
204 raise AttributeError("str has no attribute " + key)
205 return getattr(str(self), key)
207 def __iadd__(self, other: "String") -> "StringFuture":
208 if isinstance(other, str):
209 self.s.append(other)
210 return self
211 elif isinstance(other, StringFuture):
212 self.s += other.s
213 return self
214 else:
215 raise RuntimeError("Cannot add StringFuture to non-string.")
217 def __radd__(self, other: str) -> "StringFuture":
218 if isinstance(other, str):
219 return StringFuture.from_list([other] + self.s)
220 else:
221 raise RuntimeError("Cannot add StringFuture to non-string.")
223 def __add__(self, other: "String") -> "StringFuture":
224 if isinstance(other, str):
225 return StringFuture.from_list(self.s + [other])
226 elif isinstance(other, StringFuture):
227 return StringFuture.from_list(self.s + other.s)
228 elif hasattr(other, "str_future"): # type: ignore # For custom type
229 return StringFuture.from_list(self.s + [other.str_future])
230 else:
231 raise RuntimeError("Cannot add StringFuture to non-string.")
233 def __eq__(self, other: Any) -> CmpStringFuture: # type: ignore
234 return CmpStringFuture(self, other, operator.eq)
236 def __ge__(self, other: Any) -> CmpStringFuture:
237 return CmpStringFuture(self, other, operator.ge)
239 def __gt__(self, other: Any) -> CmpStringFuture:
240 return CmpStringFuture(self, other, operator.gt)
242 def __le__(self, other: Any) -> CmpStringFuture:
243 return CmpStringFuture(self, other, operator.le)
245 def __lt__(self, other: Any) -> CmpStringFuture:
246 return CmpStringFuture(self, other, operator.lt)
248 def __ne__(self, other: Any) -> CmpStringFuture: # type: ignore
249 return CmpStringFuture(self, other, operator.ne)
251 def __format__(self, __format_spec: str) -> str:
252 return str(self).__format__(__format_spec)
254 def __getitem__(self, key: Union[SupportsIndex, slice]) -> "StringFuture":
255 def func():
256 return str(self)[key]
258 return StringFuture(CallFuture(func))
260 def __deepcopy__(self, memo: Optional[Dict[int, Any]] = None) -> "StringFuture":
261 # materialize before copying, to avoid functions wrapped in StringFuture running twice.
262 return StringFuture(str(self))
265# Type aliases
266String = Union[StringFuture, str]
267"""String is a type alias for StringFuture or str."""
270def is_string(s: Any) -> bool:
271 """Check if the object is a StringFuture or str."""
272 return isinstance(s, StringFuture) or isinstance(s, str)