Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion activitysim/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,18 @@ def prog():


def main():
# set all these before we import numpy or any other math library
prevent_multithreading = False
if len(sys.argv) > 1 and sys.argv[1] == "benchmark":
# We cannot use multiple threads for benchmarking, as it can foil timing
prevent_multithreading = True
if "--fast" not in sys.argv:
# If we are not in fast mode, we want to prevent multithreading to avoid
# issues with some libraries crashing. Fast mode is not stable enough
# at scale to use by default, but can be used for testing and development.
prevent_multithreading = True

# set all these before we import numpy or any other math library
if prevent_multithreading:
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
Expand Down
19 changes: 18 additions & 1 deletion activitysim/core/mp_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from activitysim.core import config, mem, tracing, util, workflow
from activitysim.core.configuration import FileSystem, Settings
from activitysim.core.exceptions import *
from activitysim.core.run_id import RunId
from activitysim.core.workflow.checkpoint import (
CHECKPOINT_NAME,
Expand All @@ -27,7 +28,6 @@
NON_TABLE_COLUMNS,
ParquetStore,
)
from activitysim.core.exceptions import *

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -257,6 +257,21 @@ def exception(state: workflow.State, msg, write_to_log_file=True):
logger.log(logging.ERROR, f"\n---\n{traceback.format_exc()}---\n")


def log_environment_info(state: workflow.State):
"""log environment info for debugging purposes."""
if os.environ.get("ASIM_LOG_ENVIRON", False):
process_name = multiprocessing.current_process().name
environ_summary = f"OS.ENVIRON in process {process_name}:"
for k, v in os.environ.items():
environ_summary += f"\n--- {k}: {v}"
info(state, environ_summary)
else:
info(
state,
"ASIM_LOG_ENVIRON not set, skipping logging of environment variables. Set ASIM_LOG_ENVIRON=1 to log environment variables.",
)


"""
### child process methods (called within sub process)
"""
Expand Down Expand Up @@ -1097,6 +1112,7 @@ def mp_run_simulation(
state,
f"mp_run_simulation {step_info['name']} locutor={state.get_injectable('locutor', False)} ",
)
log_environment_info(state)

try:
if step_info["num_processes"] > 1:
Expand Down Expand Up @@ -1417,6 +1433,7 @@ def check_proc_status(state: workflow.State):
completed = set(previously_completed)
failed = set([]) # so we can log process failure first time it happens
drop_breadcrumb(state, step_name, "completed", list(completed))
log_environment_info(state)

for i, process_name in enumerate(process_names):
q = multiprocessing.Queue()
Expand Down
56 changes: 56 additions & 0 deletions other_resources/scripts/clear_sharrow_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env -S uv run --script --no-project
#
# /// script
# requires-python = ">=3.10,<3.12"
# dependencies = [
# "platformdirs",
# ]
# ///

from __future__ import annotations

import shutil
from datetime import datetime
from pathlib import Path

import platformdirs


def clear_sharrow_cache(archive: bool = True):
"""Disable all files in the sharrow cache directory.

Parameters
----------
archive : bool, optional
If True, move the files to an 'archive' subdirectory instead of deleting them,
by default True.
"""
main_dir = Path(platformdirs.user_cache_dir(appname="ActivitySim"))
print(f"Scanning ActivitySim cache directory for sharrow files: {main_dir}")
sharrow_cache_dirs = main_dir.glob("sharrow-*-numba-*")
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

# define the archive path if archiving is enabled
if archive:
archive_path = Path(
platformdirs.user_cache_dir(appname="ActivitySim")
).joinpath(f"archive-{timestamp}")
else:
archive_path = None

for sharrow_cache_dir in sharrow_cache_dirs:
if sharrow_cache_dir.is_dir():
print(f"Clearing sharrow cache directory: {sharrow_cache_dir}")
if archive:
archive_path.mkdir(parents=True, exist_ok=True)
shutil.move(
str(sharrow_cache_dir), str(archive_path / sharrow_cache_dir.name)
)
else:
shutil.rmtree(str(sharrow_cache_dir))
else:
print("No sharrow cache directories found.")


if __name__ == "__main__":
clear_sharrow_cache(archive=True)
Loading