Skip to content

General utilities

read_config

read_config(config_file: Union[str, Path]) -> ConfigParser

Read a .ini config file and return a ConfigParser object.

Parameters

config_file : str, Path Absolute path to the config file to be read. The config file should be in the .ini format [1].

Returns

configparser.ConfigParser A ConfigParser object containing the configuration parameters. To access the parameters, use the config.get(section, option) method, where section is the name of the section in the config file and option is the name of the option within that section.

Source code in tinkertool/utils/read_files.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def read_config(
    config_file: Union[str, Path]
) -> configparser.ConfigParser:
    """Parse an .ini configuration file into a ConfigParser.

    Parameters
    ----------
    config_file : str, Path
        Location of the .ini file. The path is resolved to an absolute
        path before the file is opened.

    Returns
    -------
    configparser.ConfigParser
        Shallow copy of the populated parser. Option names keep the case
        they have in the file, and the resolved path of the file that was
        read is stored on the parser as the ``input_file`` attribute.
        Values are retrieved with ``config.get(section, option)``.
    """
    resolved_path = Path(config_file).resolve()

    parser = configparser.ConfigParser()
    # Identity transform: keep option names exactly as written in the file
    # instead of ConfigParser's default lower-casing.
    parser.optionxform = str
    with open(resolved_path) as handle:
        parser.read_file(handle)

    # Remember where the configuration came from.
    parser.input_file = resolved_path
    return copy.copy(parser)

safe_get_param_value

safe_get_param_value(
    config_section, option: str, fallback=None
) -> Any

Get a parameter value from config, converting 'nan', 'none', 'null', '' strings to None or fallback.

Parameters

config_section : configparser.SectionProxy The config section to read from option : str The option name to get fallback : any, optional Value to return if option doesn't exist or is nan/none/null/empty string, by default None

Returns

Any The parameter value, or fallback (None by default) if the option is missing or its value is a nan/none/null/empty string

Source code in tinkertool/utils/read_files.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def safe_get_param_value(
    config_section,
    option: str,
    fallback=None
) -> Any:
    """Look up an option and map "no value" markers to ``fallback``.

    Parameters
    ----------
    config_section : configparser.SectionProxy
        The config section to read from.
    option : str
        Name of the option to look up.
    fallback : any, optional
        Value returned when the option is missing, or when its value is an
        empty/whitespace-only string or one of 'nan', 'none', 'null'
        (compared case-insensitively after stripping). By default None.

    Returns
    -------
    Any
        The stored value unchanged when it carries real content,
        otherwise ``fallback``.
    """
    placeholders = ("nan", "none", "null")

    try:
        # Ask the section for None on a miss so that a missing option can
        # be told apart from one that is present but empty.
        value = config_section.get(option, fallback=None)
    except (configparser.NoOptionError, configparser.NoSectionError):
        return fallback

    if value is None:
        return fallback

    if isinstance(value, str):
        stripped = value.strip()
        if stripped == "" or stripped.lower() in placeholders:
            return fallback

    return value

check_if_ctsm_param_is_perturbed

check_if_ctsm_param_is_perturbed(
    param_ranges_inpath: str | Path,
) -> bool

Check if any CTSM parameters are perturbed in the parameter ranges file.

Parameters

param_ranges_inpath : str | Path The path to the parameter ranges file.

Returns

bool True if any CTSM parameters are perturbed, False otherwise.

Source code in tinkertool/utils/make_land_parameterfiles.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def check_if_ctsm_param_is_perturbed(
    param_ranges_inpath: str | Path
) -> bool:
    """Check if any CTSM parameters are perturbed in the parameter ranges file.

    The check is delegated to ``_check_keyword_in_file``, which is asked
    whether the flag ``"CTSM_param_file"`` occurs in the file.

    Parameters
    ----------
    param_ranges_inpath : str | Path
        The path to the parameter ranges file.

    Returns
    -------
    bool
        True if any CTSM parameters are perturbed, False otherwise.
    """
    ctsm_param_flag = "CTSM_param_file"
    # The argument is a file path, so use the same file-based keyword helper
    # as check_if_fates_param_is_perturbed rather than a stream-based one.
    return _check_keyword_in_file(param_ranges_inpath, ctsm_param_flag)

check_if_fates_param_is_perturbed

check_if_fates_param_is_perturbed(
    param_ranges_inpath: str | Path,
) -> bool

Check if any FATES parameters are perturbed in the parameter ranges file.

Parameters

param_ranges_inpath : str | Path The path to the parameter ranges file.

Returns

bool True if any FATES parameters are perturbed, False otherwise.

Source code in tinkertool/utils/make_land_parameterfiles.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def check_if_fates_param_is_perturbed(
    param_ranges_inpath: str | Path
) -> bool:
    """Report whether the parameter ranges file perturbs FATES parameters.

    The decision is delegated to ``_check_keyword_in_file``, which is asked
    whether the flag ``"FATES_param_file"`` occurs in the file.

    Parameters
    ----------
    param_ranges_inpath : str | Path
        Location of the parameter ranges file.

    Returns
    -------
    bool
        True when FATES parameters are perturbed, False otherwise.
    """
    return _check_keyword_in_file(param_ranges_inpath, "FATES_param_file")

make_new_ctsm_pamfile

make_new_ctsm_pamfile(
    pam_change_dict: dict,
    orig_pamfile: str | Path,
    file_dump: str | Path = "ctsm_pamfile.nc",
) -> Path

Make a new ctsm pamfile for the PPE, by changing parameters as specified in pam_change_dict.

Parameters

pam_change_dict : dict Dictionary with parameter names as keys and new values or scaling factors as values. Each key is the name of a parameter to change, and its value is the change to apply. For scalar parameters the value is taken as the new value of the parameter. For vector parameters the value is taken as a scaling factor that is applied to the values in the original file. orig_pamfile : str or Path Path to the original CTSM parameter file to be modified. file_dump : str or Path, optional Path to save the new CTSM parameter file, by default "ctsm_pamfile.nc".

Source code in tinkertool/utils/make_land_parameterfiles.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def make_new_ctsm_pamfile(
    pam_change_dict:    dict,
    orig_pamfile:       str | Path,
    file_dump:          str | Path = "ctsm_pamfile.nc"
) -> Path:
    """Make a new ctsm pamfile for the PPE, by changing parameters as specified in pam_change_dict.

    Parameters
    ----------
    pam_change_dict : dict
        Dictionary with parameter names as keys and new values or scaling
        factors as values. Each key is the name of a parameter to change
        and its value is the change to apply.
        For scalar parameters (a single element) the value is taken as the
        new value of the parameter. For vector parameters (more than one
        element) the value is taken as a scaling factor that is multiplied
        with the values in the original file.
        Names that do not occur in the original file are skipped with a
        warning.
    orig_pamfile : str or Path
        Path to the original CTSM parameter file to be modified.
    file_dump : str or Path, optional
        Path to save the new CTSM parameter file, by default "ctsm_pamfile.nc".

    Returns
    -------
    Path
        Resolved path to the new CTSM parameter file.

    Raises
    ------
    FileNotFoundError
        If the original CTSM parameter file is not found.
    """
    if not Path(orig_pamfile).exists():
        logging.error(f"Original CTSM parameter file {orig_pamfile} not found")
        raise FileNotFoundError(f"Original CTSM parameter file {orig_pamfile} not found")

    ctsm_orig = xr.open_dataset(orig_pamfile, decode_cf=False)
    logging.debug(f"CTSM parameter file keys: {list(ctsm_orig.keys())}")
    for pam, new_value in pam_change_dict.items():
        logging.debug(f"Processing parameter: {pam}")
        if pam not in ctsm_orig.keys():
            logging.warning(f"Parameter {pam} not found in CTSM file {orig_pamfile}")
            continue
        logging.debug(f"Parameter {pam} shape: {ctsm_orig[pam].shape}")
        logging.debug(f"Parameter {pam} original value: {ctsm_orig[pam].values}")
        # A DataArray is never an instance of collections.abc.Sequence, so an
        # isinstance test can never select the scaling branch. Decide on the
        # number of elements instead: more than one element means "vector".
        if ctsm_orig[pam].size > 1:
            logging.debug(f"Parameter {pam} is sequence - applying scaling factor {new_value}")
            ctsm_orig[pam] = new_value * ctsm_orig[pam]
        else:
            logging.debug(f"Parameter {pam} is scalar - setting new value {new_value}")
            ctsm_orig[pam] = new_value

    validate_file(file_path=file_dump, expected_suffix=".nc", description="Generated CTSM parameter file", new_file=True)
    logging.debug(f"Generated CTSM parameter file: {file_dump}")
    ctsm_orig.to_netcdf(file_dump)

    return Path(file_dump).resolve()

make_new_fates_pamfile

make_new_fates_pamfile(
    pam_change_dict: dict,
    orig_pamfile: str | Path,
    file_dump: str | Path = "fates_pamfile.nc",
) -> Path

Make a new fates pamfile for the PPE, by changing parameters as specified in pam_change_dict.

Parameters

pam_change_dict : dict Dictionary with parameter names as keys and new values or scaling factors as values. Each key is the name of a parameter to change, and its value is the change to apply. For scalar parameters the value is taken as the new value of the parameter. For vector parameters the value is taken as a scaling factor that is applied to the values in the original file. orig_pamfile : str | Path Path to the original FATES parameter file to be modified. file_dump : str | Path, optional Path to save the new FATES parameter file, by default "fates_pamfile.nc".

Returns

Path Path to the new FATES parameter file.

Raises

FileNotFoundError If the original FATES parameter file is not found.

Source code in tinkertool/utils/make_land_parameterfiles.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def make_new_fates_pamfile(
    pam_change_dict:    dict,
    orig_pamfile:       str | Path,
    file_dump:          str | Path = "fates_pamfile.nc"
) -> Path:
    """Make a new fates pamfile for the PPE, by changing parameters as
    specified in pam_change_dict.

    Parameters
    ----------
    pam_change_dict : dict
        Dictionary with parameter names as keys and new values or scaling
        factors as values. Each key is the name of a parameter to change
        and its value is the change to apply.
        For scalar parameters (zero-dimensional, or length 1 along the
        first dimension) the value is taken as the new value of the
        parameter. For vector parameters (length > 1 along the first
        dimension) the value is taken as a scaling factor that is
        multiplied with the values in the original file.
        Names that do not occur in the original file are skipped with a
        warning.
    orig_pamfile : str | Path
        Path to the original FATES parameter file to be modified.
    file_dump : str | Path, optional
        Path to save the new FATES parameter file, by default "fates_pamfile.nc".

    Returns
    -------
    Path
        Path to the new FATES parameter file.

    Raises
    ------
    FileNotFoundError
        If the original FATES parameter file is not found.
    """

    if not Path(orig_pamfile).exists():
        logging.error(f"Original FATES parameter file {orig_pamfile} not found")
        raise FileNotFoundError(f"Original FATES parameter file {orig_pamfile} not found")

    fates_orig = xr.open_dataset(orig_pamfile)
    logging.debug(f"FATES parameter file dimensions: {fates_orig.dims}")
    for pam, new_value in pam_change_dict.items():
        logging.debug(f"Processing parameter: {pam}")
        if pam not in fates_orig.keys():
            logging.warning(f"Parameter {pam} not found in FATES file {orig_pamfile}")
            continue
        param = fates_orig[pam]
        # Zero-dimensional variables have an empty shape, so shape[0] would
        # raise IndexError; check ndim first and treat them as scalars.
        if param.ndim > 0 and param.shape[0] > 1:
            logging.debug(f"Parameter {pam} is sequence - applying scaling factor {new_value}")
            fates_orig[pam] = new_value * param
        else:
            logging.debug(f"Parameter {pam} is scalar - setting new value {new_value}")
            fates_orig[pam] = new_value

    validate_file(file_path=file_dump, expected_suffix=".nc", description="Generated FATES parameter file", new_file=True)
    logging.debug(f"Generated FATES parameter file: {file_dump}")
    fates_orig.to_netcdf(file_dump)

    return Path(file_dump).resolve()

generate_chem_in_ppe

generate_chem_in_ppe(
    scale_factor: float,
    input_file: str | Path,
    outfolder_base: str | Path,
    outfolder_name: str,
    verbose: bool = False,
) -> str

Generate a chemistry namelist file for a given scale factor and input file.

Parameters:

scale_factor : float The scale factor to use for the chemistry file. input_file : str | Path The input file to use for the chemistry file. outfolder_base : str | Path The base folder to use for the output files. outfolder_name : str The name of the folder to use for the output files. verbose : bool, optional If True, print verbose output. Default False.

Returns

str The path to the generated chemistry file.

Source code in tinkertool/utils/make_chem_in.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def generate_chem_in_ppe(
    scale_factor:   float,
    input_file:     str | Path,
    outfolder_base: str | Path,
    outfolder_name: str,
    verbose: bool = False,
) -> str:
    """Generate a chemistry namelist file for a given scale factor and input file.

    Parameters:
    -----------
    scale_factor : float
        The scale factor to use for the chemistry file.
    input_file : str | Path
        The input file to use for the chemistry file.
    outfolder_base : str | Path
        The base folder to use for the output files.
    outfolder_name : str
        The name of the folder to use for the output files.
    verbose : bool, optional
        If True, print verbose output. Default False.

    Returns
    -------
    str
        The path to the generated chemistry file.
    """
    # handle input arguments and defaults
    scale_factor = float(scale_factor)
    input_file = Path(input_file).resolve()

    outfolder = Path(outfolder_base).joinpath(outfolder_name).resolve()
    if not outfolder.exists():
        outfolder.mkdir(parents=True)
    outputfile = outfolder.joinpath(f"chem_mech_scale_{scale_factor:.3f}.in")
    if verbose:
        print(
            "creating chem_mech file for scale_factor",
            scale_factor,
            " in\n",
            outputfile,
        )

    with open(input_file, "r") as infile:
        infile_lines = infile.readlines()

    with open(outputfile, "w") as outfile:
        for line in infile_lines:
            replace = False
            if "monoterp" in line or "isoprene" in line:
                if "->" in line:
                    if "+" in line:
                        if ";" in line:
                            replace = True
            if replace:

                yld = line.split("->")[1].split("*")[0].strip()
                new_yld = float(yld) * scale_factor
                new_yld = f"{new_yld:.3f}"
                replacement_text = line.replace(yld, new_yld)
                if verbose:
                    print(f"Replacing \n {line} \n with \n {replacement_text}")
                outfile.write(replacement_text)

            else:
                outfile.write(line)

    return str(outputfile)

check_if_chem_mech_is_perturbed

check_if_chem_mech_is_perturbed(param_ranges: dict) -> bool

Check if the chemistry mechanism is perturbed. The check is performed by looking for specific section headers defined in chem_mech_variable_flags in the parameter ranges file.

Parameters

param_ranges : configparser.ConfigParser The parsed parameter ranges; its section names are compared against chem_mech_variable_flags.

Returns

bool True if the chemistry mechanism is perturbed, False otherwise.

Source code in tinkertool/utils/make_chem_in.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def check_if_chem_mech_is_perturbed(
    param_ranges: dict
) -> bool:
    """Report whether the chemistry mechanism is perturbed.

    The mechanism counts as perturbed when at least one section name of
    ``param_ranges`` is one of the known chemistry-mechanism flags
    (currently only ``"SOA_y_scale_chem_mech_in"``).

    Parameters
    ----------
    param_ranges : dict
        Parsed parameter ranges. The object must provide a ``sections()``
        method returning the section names, as a
        ``configparser.ConfigParser`` does.

    Returns
    -------
    bool
        True if the chemistry mechanism is perturbed, False otherwise.
    """
    perturbation_sections = ["SOA_y_scale_chem_mech_in"]
    return any(
        section in perturbation_sections
        for section in param_ranges.sections()
    )

patch_info_detailed

patch_info_detailed()

Patch Logger class to add info_detailed method and custom level.

Source code in tinkertool/utils/custom_logging.py
10
11
12
13
14
15
16
17
18
def patch_info_detailed():
    """Register the INFO_DETAILED level and add ``Logger.info_detailed``.

    Does nothing when ``logging.Logger`` already has an ``info_detailed``
    attribute, so the patch can be applied repeatedly.
    """
    if hasattr(logging.Logger, "info_detailed"):
        return

    logging.addLevelName(INFO_DETAILED, "INFO_DETAILED")

    def info_detailed(self, message, *args, **kwargs):
        # Same shape as the built-in level helpers: skip the record when the
        # logger is not enabled for this level.
        if not self.isEnabledFor(INFO_DETAILED):
            return
        self._log(INFO_DETAILED, message, args, **kwargs)

    logging.Logger.info_detailed = info_detailed

log_info_detailed

log_info_detailed(logger_name: str, message: str)

Helper function to log info_detailed messages with proper type handling.

Source code in tinkertool/utils/custom_logging.py
20
21
22
23
24
25
26
def log_info_detailed(logger_name: str, message: str):
    """Emit ``message`` at the detailed-info level on the named logger.

    When the logger has no ``info_detailed`` method, the message is logged
    at INFO level with a ``[DETAILED]`` prefix instead.
    """
    target = logging.getLogger(logger_name)
    detailed = getattr(target, 'info_detailed', None)
    if detailed is None and not hasattr(target, 'info_detailed'):
        target.info(f"[DETAILED] {message}")
        return
    detailed(message)

setup_logging

setup_logging(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
    logger_name: str = "tinkertool_log",
)

Set up logging configuration. Both for the root logger and a custom logger.

Parameters

verbosity : int Verbosity level for logging. 0 for WARNING, 1 for INFO, 2 for INFO_DETAILED, 3 for DEBUG. log_file : str or Path, optional Path to the log file where logs will be written. If None, logs will not be saved to a file. Default is None. log_mode : str Mode for opening the log file. 'w' for write (overwrite), 'a' for append. Default is 'w'. logger_name : str Name of the logger. Default is 'tinkertool_log'.

Source code in tinkertool/utils/custom_logging.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def setup_logging(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
    logger_name: str = "tinkertool_log",
):
    """Configure both the CIME logger and the custom tinkertool logger.

    Parameters
    ----------
    verbosity : int
        Verbosity level for logging. 0 for WARNING, 1 for INFO, 2 for INFO_DETAILED, 3 for DEBUG.
    log_file : str or Path, optional
        Path to the log file where logs will be written. If None, logs will not be saved to a file.
        Default is None.
    log_mode : str
        Mode for opening the log file. 'w' for write (overwrite), 'a' for append.
        Default is 'w'.
    logger_name : str
        Name of the custom logger. Default is 'tinkertool_log'.

    Returns
    -------
    The value returned by ``custom_logging`` for the given arguments.
    """
    # The CIME logger has no INFO_DETAILED level. Passing verbosity 2
    # straight through would therefore already yield DEBUG there, so for
    # verbosity above 1 it is configured one step lower than the custom
    # logger.
    if verbosity > 1:
        cime_verbosity = verbosity - 1
    else:
        cime_verbosity = verbosity
    cime_logger(cime_verbosity, log_file, log_mode)

    tinkertool_logger = custom_logging(verbosity, log_file, log_mode, logger_name)
    return tinkertool_logger

cime_logger

cime_logger(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
)

Set up the CIME logger with a stream handler and an optional file handler.

If a log file is provided, the CIME logger writes to a file with the same name as log_file, but with .CIME inserted before the suffix (e.g., build_ppe.log → build_ppe.CIME.log).

Parameters

verbosity : int Verbosity level for logging. 0 for WARNING, 1 for INFO, 3 for DEBUG. log_file : str or Path, optional Path to the base log file. If provided, the CIME logger writes to a file with .CIME added to the stem (e.g., mylog.log → mylog.CIME.log). If None, logs are not written to a file. log_mode : str, default 'w' Mode to open the log file. 'w' for overwrite, 'a' for append.

Returns

None

Examples

cime_logger(1, Path("output.log"), "w")

Logs to both the console (stderr, the StreamHandler default) and 'output.CIME.log' at INFO level.

cime_logger(0)

Logs only to the console (stderr, the StreamHandler default) at WARNING level.

Source code in tinkertool/utils/custom_logging.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def cime_logger(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
):
    """
    Configure the root logger (used as the CIME logger).

    All handlers currently attached to the root logger are removed and
    replaced by a console handler plus, when ``log_file`` is given, a file
    handler. The file handler writes to a sibling of ``log_file`` whose name
    has ``.CIME`` inserted before the suffix
    (e.g., `build_ppe.log` → `build_ppe.CIME.log`).

    Parameters
    ----------
    verbosity : int
        Verbosity level for logging. 0 for WARNING, 1 for INFO; any other
        value (including 3) selects DEBUG.
    log_file : str or Path, optional
        Path to the base log file from which the CIME log file name is
        derived. Missing parent directories and the file itself are created.
        If None, logs are not written to a file.
    log_mode : str, default 'w'
        Mode to open the log file. 'w' for overwrite, 'a' for append.

    Returns
    -------
    None

    Examples
    --------
    >>> cime_logger(1, Path("output.log"), "w")
    # Logs to the console (stderr) and to 'output.CIME.log' at INFO level.

    >>> cime_logger(0)
    # Logs only to the console (stderr) at WARNING level.
    """
    verbosity_to_level = {0: logging.WARNING, 1: logging.INFO, 3: logging.DEBUG}
    level = verbosity_to_level.get(verbosity, logging.DEBUG)

    new_handlers = [logging.StreamHandler()]
    if log_file is not None:
        base = Path(log_file)
        cime_log_file = base.with_name(base.stem + ".CIME" + base.suffix)
        validate_file(cime_log_file, ".log", "log file", new_file=True)
        if not cime_log_file.exists():
            cime_log_file.parent.mkdir(parents=True, exist_ok=True)
            cime_log_file.touch()
        new_handlers.append(logging.FileHandler(str(cime_log_file), mode=log_mode))

    root = logging.getLogger()
    # Drop whatever is attached already so records are not emitted twice.
    for stale in list(root.handlers):
        root.removeHandler(stale)

    for new_handler in new_handlers:
        new_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s [%(levelname)s/CIME] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        root.addHandler(new_handler)

    root.setLevel(level)

custom_logging

custom_logging(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
    logger_name: str = "tinkertool_log",
)

Set up logging configuration for a custom logger with a custom level.

Parameters

verbosity : int Verbosity level for logging. 0 for WARNING, 1 for INFO, 2 for INFO_DETAILED, 3 for DEBUG. log_file : str or Path, optional Path to the log file where logs will be written. If None, logs will not be saved to a file. Default is None. log_mode : str Mode for opening the log file. 'w' for write (overwrite), 'a' for append. Default is 'w'. logger_name : str Name of the logger. Default is 'tinkertool_log'.

Source code in tinkertool/utils/custom_logging.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def custom_logging(
    verbosity: int,
    log_file: Optional[Union[str, Path]] = None,
    log_mode: str = "w",
    logger_name: str = "tinkertool_log",
):
    """Configure a named logger that supports the INFO_DETAILED level.

    Handlers already attached to the logger are removed, propagation to
    parent loggers is switched off, and a console handler (plus a file
    handler when ``log_file`` is given) is attached.

    Parameters
    ----------
    verbosity : int
        Verbosity level for logging. 0 for WARNING, 1 for INFO, 2 for
        INFO_DETAILED, 3 for DEBUG. Any other value selects DEBUG.
    log_file : str or Path, optional
        Path to the log file where logs will be written. Missing parent
        directories and the file itself are created. If None, logs will not
        be saved to a file. Default is None.
    log_mode : str
        Mode for opening the log file. 'w' for write (overwrite), 'a' for append.
        Default is 'w'.
    logger_name : str
        Name of the logger. Default is 'tinkertool_log'. Its capitalized
        form is shown in every log line.

    Returns
    -------
    logging.Logger
        The configured logger.
    """
    # Make sure the INFO_DETAILED level and Logger.info_detailed exist.
    patch_info_detailed()

    verbosity_to_level = {
        0: logging.WARNING,
        1: logging.INFO,
        2: INFO_DETAILED,
        3: logging.DEBUG,
    }
    level = verbosity_to_level.get(verbosity, logging.DEBUG)

    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.propagate = False
    # Start from a clean slate so repeated set-up does not duplicate output.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)

    new_handlers = [logging.StreamHandler()]
    if log_file is not None:
        log_file = Path(log_file).resolve()
        validate_file(log_file, '.log', "log file", new_file=True)
        if not log_file.exists():
            log_file.parent.mkdir(parents=True, exist_ok=True)
            log_file.touch()
        new_handlers.append(logging.FileHandler(str(log_file), mode=log_mode))

    # One formatter shared by all handlers, tagged with the logger name.
    template = "%(asctime)s [%(levelname)s/{}] %(message)s"
    formatter = logging.Formatter(
        template.format(logger_name.capitalize()),
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    for new_handler in new_handlers:
        new_handler.setFormatter(formatter)
        logger.addHandler(new_handler)

    return logger

input_with_timer

input_with_timer(
    prompt: str, timeout: int, default: Optional[str] = None
) -> Optional[str]

Get user input with a timeout.

Parameters

prompt : str The prompt message to display to the user. timeout : int The time in seconds to wait for user input before timing out. default : str, optional The default value to return if the timeout is reached. Default is None.

Returns

str or None The user input if provided within the timeout, otherwise the default value.

Source code in tinkertool/utils/custom_logging.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def input_with_timer(
    prompt: str,
    timeout: int,
    default: Optional[str] = None
) -> Optional[str]:
    """Get user input with a timeout.

    The timeout is implemented with ``signal.alarm`` and a temporary SIGALRM
    handler, so the function relies on ``signal.SIGALRM`` being available.
    The pending alarm is always cancelled and the previously installed
    SIGALRM handler is restored before returning, also when ``input`` raises.

    Parameters
    ----------
    prompt : str
        The prompt message to display to the user.
    timeout : int
        The time in seconds to wait for user input before timing out.
    default : str, optional
        The default value to return if the timeout is reached. Default is None.

    Returns
    -------
    str or None
        The user input if provided within the timeout, otherwise the default value.
    """
    import signal

    def timeout_handler(signum, frame):
        raise TimeoutError

    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)
    try:
        return input(prompt)
    except TimeoutError:
        print(f"\nInput timed out after {timeout} seconds.")
        return default
    finally:
        # Cancel a still-pending alarm (e.g. input() failed with EOFError);
        # otherwise it would fire later and raise TimeoutError in unrelated
        # code.
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not
        # installed from Python; such a handler cannot be re-installed.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

type_check_decorator

type_check_decorator(func)

Decorator [1] for type checking of user input to functions at runtime. Known limitations: - Does not support parameterized type checking, as it is based on isinstance [3]. That is, it does not support hints like list[int] or numpy._typing._array_like._SupportsArray[numpy.dtype[typing.Any]], etc. Therefore aliases like numpy.typing.ArrayLike are not supported. The workaround is to be more specific with the type hints and use things like Union[int, list, numpy.ndarray].

Parameters

func : Callable A function with implemented type hints [2].

Source code in tinkertool/utils/type_check_decorator.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def type_check_decorator(func):
    """
    Decorator for type checking of user input to functions at runtime.

    On every call the arguments (including defaults) are bound to the
    signature of ``func`` and each annotated argument is checked with
    ``isinstance`` against its type hint. ``None`` is always accepted.
    For ``Union[...]`` / ``Optional[...]`` hints the value must be an
    instance of one of the non-None members.

    Known limitations:
        - Does not support parameterized type checking, as it is based on
          isinstance. That is, it does not support hints like list[int] or
          numpy._typing._array_like._SupportsArray[numpy.dtype[typing.Any]], etc.
          Therefore aliases like numpy.typing.ArrayLike are not supported.
          The workaround is to be more specific with the type hints
          and use things like Union[int, list, numpy.ndarray].

    Parameters
    ----------
    func : Callable
        A function with implemented type hints.

    Returns
    -------
    Callable
        Wrapper that validates the arguments and then calls ``func``. It
        carries the name, docstring and signature metadata of ``func``.

    Raises
    ------
    TypeError
        Raised by the wrapper when an argument does not match its type hint.
    """
    import functools

    # functools.wraps keeps __name__, __doc__, __module__ and sets
    # __wrapped__, so decorated functions stay introspectable.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        type_hints = get_type_hints(func)
        sig = inspect.signature(func)
        bound_args = sig.bind(*args, **kwargs)
        bound_args.apply_defaults()

        for name, value in bound_args.arguments.items():
            if name not in type_hints:
                continue
            expected_type = type_hints[name]

            if get_origin(expected_type) is Union:
                # Drop NoneType (Optional); None is accepted unconditionally.
                accepted = tuple(
                    arg for arg in get_args(expected_type)
                    if arg is not type(None)
                )
            else:
                accepted = expected_type

            if value is not None and not isinstance(value, accepted):
                raise TypeError(
                    f"Argument '{name}' must be of type {expected_type}, but got {type(value)}"
                )
        return func(*args, **kwargs)

    return wrapper