Initialize APPL to work with multi-threading, including logging and tracing.
Parameters:
-
log_file_prefix
(Optional[str]
, default:
None
)
–
The prefix for the log file. Defaults to use the path of the main log file.
-
gen_name_prefix
(Optional[str]
, default:
None
)
–
The prefix for the generation name. Defaults to use the thread name.
Examples:
def function_run_in_thread():
with appl.init_within_thread():
# do something within the thread
Source code in src/appl/__init__.py
| @contextmanager
def init_within_thread(
log_file_prefix: Optional[str] = None, gen_name_prefix: Optional[str] = None
) -> Any:
"""Initialize APPL to work with multi-threading, including logging and tracing.
Args:
log_file_prefix: The prefix for the log file. Defaults to use the path of the main log file.
gen_name_prefix: The prefix for the generation name. Defaults to use the thread name.
Examples:
```python
def function_run_in_thread():
with appl.init_within_thread():
# do something within the thread
```
"""
handler_id = None
try:
thread_name = threading.current_thread().name
logging_settings = global_vars.configs.settings.logging
def filter_thread_record(record: Dict) -> bool:
assert hasattr(record["thread"], "name")
# Use prefix match to filter the log records in different threads
name = record["thread"].name
return name == thread_name or name.startswith(thread_name + "_")
if logging_settings.enable_file:
if log_file_prefix is None:
assert (
global_vars.metadata.log_file is not None
), "should have log file set"
thread_log_path = os.path.join(
global_vars.metadata.log_file[: -len(".log")] + "_logs",
f"{thread_name}.log",
)
else:
thread_log_path = f"{log_file_prefix}_{thread_name}.log"
log_level = (
logging_settings.log_file.log_level or logging_settings.log_level
)
# The logger append to the file by default, not overwrite.
handler_id = logger.add(
thread_log_path,
level=log_level,
format=logging_settings.format,
filter=filter_thread_record, # type: ignore
)
if gen_name_prefix:
set_gen_name_prefix(gen_name_prefix)
# ? shall we reset the prefix after exiting the context?
logger.info(
f"Thread {thread_name}, set generation name prefix as: {gen_name_prefix}"
)
if handler_id is None:
logger.warning("logging is not enabled")
yield thread_log_path
except Exception as e:
logger.error(f"Error in thread: {e}")
raise e
finally:
if handler_id:
logger.remove(handler_id)
|