Skip to content

Generate Parameter File

ParameterFileConfig dataclass

ParameterFileConfig(
    verbose: int = 0,
    log_dir: Path | str = "",
    log_mode: str = "w",
    *,
    param_ranges_inpath: Path,
    param_sample_outpath: Path,
    nmb_sim: int | None = None,
    method: str = "latin_hypercube",
    chem_mech_file: Path | None = None,
    ctsm_default_param_file: Path | None = None,
    fates_default_param_file: Path | None = None,
    tinkertool_output_dir: Path = resolve(),
    optimization: str | None = None,
    avoid_scramble: bool = False,
    params: list = None,
    exclude_default: bool = False
)

Bases: BaseConfig

Parameter file generation configuration.

get_checked_and_derived_config

get_checked_and_derived_config() -> ParameterFileConfig

Performs full, I/O-dependent validation and populates derived fields on the instance. This method should be called once at the start of the application. It now returns self.

Source code in tinkertool/scripts/generate_paramfile/config.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def get_checked_and_derived_config(self) -> 'ParameterFileConfig':
    """
    Performs full, I/O-dependent validation and populates derived fields on the instance.
    This method should be called once at the start of the application.
    It now returns `self`.

    Derived fields populated here: `param_ranges`, `params`, `nparams`,
    `change_chem_mech`, `change_ctsm_params`, `change_fates_params`,
    `scramble`, `nmb_sim` (for one-at-a-time sampling) and `nmb_sim_dim`.

    Raises:
        ValueError: if a requested parameter is missing from the ranges file,
            if a perturbation is detected but its required input file was not
            provided, or if nmb_sim == 0 while exclude_default is True.

    NOTE(review): this method is interactive (calls `input()`) and may call
    `sys.exit(0)` when the user declines to overwrite an existing output file.
    """
    super().get_checked_and_derived_config()
    # If derived fields are already populated, we can assume this has been run.
    if self.param_ranges is not None:
        logging.debug("Configuration already checked. Skipping full validation.")
        return self

    # --- Read files and perform heavy validation ---
    self.param_ranges = read_config(self.param_ranges_inpath)

    # --- Populate derived fields ---
    # params: either validate and subset against the user-specified list,
    # or fall back to every section found in the ranges file.
    if self.params is not None and len(self.params) > 0:
        for param in self.params:
            if param not in self.param_ranges:
                raise ValueError(f"Parameter '{param}' not found in parameter ranges file {self.param_ranges_inpath}.")
        # Subset param_ranges to only include specified params
        for section in list(self.param_ranges.sections()):
            if section not in self.params:
                self.param_ranges.remove_section(section)
    else:
        self.params = list(self.param_ranges.sections())
    self.nparams = len(self.params)

    # param_sample_outpath (interactive check)
    if self.param_sample_outpath.exists():
        overwrite = input(f"Output file {self.param_sample_outpath} already exists. Overwrite? (y/n): ").strip().lower()
        if overwrite != 'y':
            logging.info(f"Exiting without overwriting existing file.")
            sys.exit(0)
    else:
        self.param_sample_outpath.parent.mkdir(parents=True, exist_ok=True)

    # Check for perturbations and validate corresponding files
    self.change_chem_mech = check_if_chem_mech_is_perturbed(self.param_ranges)
    if self.change_chem_mech:
        if self.chem_mech_file is None:
            raise ValueError("Chemistry perturbations detected, but 'chem_mech_file' was not provided.")
        validate_file(self.chem_mech_file, '.in', "Chemistry mechanism file", new_file=False)

    # The CTSM check operates on the textual form of the config, so serialize
    # the (ConfigParser-like) param_ranges object to a string first.
    str_buffer = io.StringIO()
    self.param_ranges.write(str_buffer)
    cfg_str = str_buffer.getvalue()
    self.change_ctsm_params = check_if_ctsm_param_is_perturbed(cfg_str)
    if self.change_ctsm_params:
        if self.ctsm_default_param_file is None:
            raise ValueError("CTSM perturbations detected, but 'ctsm_default_param_file' was not provided.")
        validate_file(self.ctsm_default_param_file, '.nc', "Default CTSM parameter file", new_file=False)

    # NOTE(review): the FATES check reads the original file on disk, not the
    # (possibly subsetted) in-memory config — confirm this is intentional.
    self.change_fates_params = check_if_fates_param_is_perturbed(str(self.param_ranges_inpath))
    if self.change_fates_params:
        if self.fates_default_param_file is None:
            raise ValueError("FATES perturbations detected, but 'fates_default_param_file' was not provided.")
        validate_file(self.fates_default_param_file, '.nc', "Default FATES parameter file", new_file=False)

    # tinkertool_output_dir
    self.tinkertool_output_dir.mkdir(parents=True, exist_ok=True)

    # avoid_scramble: scrambling is disabled for one-at-a-time sampling.
    self.scramble = not self.avoid_scramble
    if self.method in ['one_at_a_time', 'oat']:
        self.nmb_sim = len(self.params)*2 # this gets updated later if there are interdependent parameters
        self.scramble = False
    # exclude_default: index 0 is reserved for the default simulation unless
    # it is excluded, hence the shifted ranges below.
    if self.exclude_default:
        if self.nmb_sim == 0:
            raise ValueError("nmb_sim=0 is not allowed when exclude_default=True.")
        self.nmb_sim_dim = np.arange(1, self.nmb_sim + 1)
    else:
        self.nmb_sim_dim = np.arange(0, self.nmb_sim + 1)

    logging.info("Configuration successfully checked and derived fields populated.")
    return self

generate_chem_mech_files

generate_chem_mech_files(
    sample_points: dict, config: ParameterFileConfig
) -> list

Generate chemistry mechanism files and return list of generated file paths.

This mutates sample_points by removing the 'SOA_y_scale_chem_mech_in' entry when present.

Source code in tinkertool/scripts/generate_paramfile/generate_paramfile.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def generate_chem_mech_files(sample_points: dict, config: ParameterFileConfig) -> list:
    """Generate chemistry mechanism files and return list of generated file paths.

    This mutates `sample_points` by removing the
    'SOA_y_scale_chem_mech_in' entry when present.
    """

    chem_mech_in = []
    if sample_points.get("SOA_y_scale_chem_mech_in", None):
        SOA_y_scale_chem_mech_in = sample_points["SOA_y_scale_chem_mech_in"]

        for scale_factor in SOA_y_scale_chem_mech_in[1]:
            outfile = generate_chem_in_ppe(
                scale_factor=scale_factor,
                input_file=config.chem_mech_file,
                outfolder_base=config.tinkertool_output_dir,
                outfolder_name="chem_mech_files",
                verbose=True if config.verbose > 2 else False,
            )
            chem_mech_in.append(outfile)

            logging.getLogger().info_detailed(
                f"{outfile} generated with SOA_y_scale_chem_mech_in = {scale_factor}"
            )

        # remove the entry so it is not written to the final dataset
        del sample_points["SOA_y_scale_chem_mech_in"]

    return chem_mech_in

generate_latin_hypercube_sample_points

generate_latin_hypercube_sample_points(
    checked_config: ParameterFileConfig,
) -> dict

Generate sample_points using a Latin Hypercube and scale to ranges.

Returns a dict mapping parameter name -> (["nmb_sim"], values_array).

Source code in tinkertool/scripts/generate_paramfile/generate_paramfile.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def generate_latin_hypercube_sample_points(checked_config: ParameterFileConfig) -> dict:
    """Generate sample_points using a Latin Hypercube and scale to ranges.

    Returns a dict mapping parameter name -> (["nmb_sim"], values_array).

    Interdependent parameters (those with an 'interdependent_with' entry in
    the ranges file) do not get their own hypercube dimension; they reuse the
    sample column of the parameter they depend on. A leading "-" on the
    'interdependent_with' value requests inverse scaling, implemented by
    swapping the range endpoints before scaling.
    """
    logging.debug("Generating Latin Hypercube sample")
    opt_literal = cast("Literal['random-cd','lloyd'] | None", checked_config.optimization)
    # Parameters that piggy-back on another parameter's sample column.
    interdependent_params = [param for param in checked_config.params if safe_get_param_value(checked_config.param_ranges[param], "interdependent_with") is not None]
    # Only independent parameters receive a hypercube dimension of their own.
    hypc_params = [param for param in checked_config.params if param not in interdependent_params]
    hypc_param_paramindx_map = {param: indx for indx, param in enumerate(hypc_params)}

    hypc = stats.qmc.LatinHypercube(
        len(hypc_param_paramindx_map),
        scramble=checked_config.scramble,
        optimization=opt_literal
    )
    # Shape: (nmb_sim, n_independent_params), values in [0, 1).
    hyp_cube_params = hypc.random(checked_config.nmb_sim)

    logging.debug(f"Hypersample shape ({checked_config.nmb_sim_dim}, n_params - n_interdependent_params): {hyp_cube_params.shape}")

    sample_points = {}
    # Scale the values to the parameter ranges
    # and add component information
    for param in checked_config.params:
        pdata = checked_config.param_ranges[param]

        defaultv = safe_get_param_value(pdata, "default")
        assert defaultv is not None, f"Default value for parameter '{param}' cannot be None."

        # Use safe parameter access to handle nan values properly
        # scale_fact defines a symmetric range around the default value;
        # otherwise explicit min/max entries are required.
        scale_fact: float | None = safe_get_param_value(pdata, "scale_fact")
        if scale_fact is not None:
            minv = float(defaultv) - float(defaultv)*float(scale_fact)
            maxv = float(defaultv) + float(defaultv)*float(scale_fact)
        else:
            minv_raw = safe_get_param_value(pdata, "min")
            maxv_raw = safe_get_param_value(pdata, "max")
            assert minv_raw is not None, f"Min value for parameter '{param}' cannot be None when scale_fact is not provided."
            assert maxv_raw is not None, f"Max value for parameter '{param}' cannot be None when scale_fact is not provided."
            minv = float(minv_raw)
            maxv = float(maxv_raw)

        logging.debug(f"Parameter '{param}': min={minv}, max={maxv}, default={defaultv}, scale_fact={scale_fact}")

        # check what index in hyp_cube_params to use
        # if the param is not interdependent, use hypc_param_paramindx_map[param]
        # if it is interdependent use the index of the param
        # that it is interdependent with
        inverse_scaling = False
        param_to_index_with = param
        if param not in hypc_param_paramindx_map: # then it is interdependent
            assert param in interdependent_params, f"Parameter '{param}' not found in hypc_param_paramindx_map or interdependent_params."
            param_to_index_with = safe_get_param_value(pdata, "interdependent_with")
            # check if we are to use inverse scaling by checking for
            # "-" prefix/first character
            if param_to_index_with.startswith("-"):
                param_to_index_with = param_to_index_with[1:]  # remove the "-" character
                logging.debug(f"Parameter '{param}' is interdependent with '{param_to_index_with}' using inverse scaling.")
                # Inverse scaling: we will later do maxv - scaled_value + minv
                # to flip the scaling
                # This is handled in scale_values function by passing minv and maxv swapped
                minv, maxv = maxv, minv
                inverse_scaling = True
            assert param_to_index_with is not None, f"Parameter '{param}' is interdependent but 'interdependent_with' is None."
        i_use = hypc_param_paramindx_map[param_to_index_with]
        logging.debug(f"Parameter '{param}' uses index {i_use} from hyp_cube_params.")

        # Index 0 of the output carries the default value unless
        # exclude_default is set, in which case all slots are samples.
        sampling_method = safe_get_param_value(pdata, "sampling")
        if sampling_method == 'log':
            # NOTE(review): with inverse scaling minv > maxv here; np.log10
            # still works provided both bounds are positive — confirm 'log'
            # sampling is only used with positive ranges.
            out_array = np.zeros(len(checked_config.nmb_sim_dim))
            long_vals = scale_values(hyp_cube_params[:,i_use], np.log10(minv), np.log10(maxv))
            if checked_config.exclude_default:
                out_array = 10**long_vals
            else:
                out_array[0] = float(pdata["default"])
                out_array[1:] = 10**long_vals

        elif sampling_method == 'linear':
            out_array = np.zeros(len(checked_config.nmb_sim_dim))
            long_vals = scale_values(hyp_cube_params[:,i_use], minv, maxv)
            if checked_config.exclude_default:
                out_array = long_vals
            else:
                out_array[0] = float(pdata["default"])
                out_array[1:] = long_vals
        else:
            err_msg = f"Unknown sampling method '{sampling_method}' for parameter '{param}'. Supported methods are {', '.join(VALID_SAMPLING_METHODS)}."
            logging.error(err_msg)
            raise ValueError(err_msg)

        ndigits = safe_get_param_value(pdata, "ndigits")
        if ndigits is not None:
            # Convert to float first, then int to handle strings like '5.0'
            out_array = np.around(out_array, int(float(ndigits)))

        # Range check: endpoints were swapped for inverse scaling, so pass
        # them back in ascending order.
        if inverse_scaling:
            _test_ranges(maxv, minv, param, out_array)
        else:
            _test_ranges(minv, maxv, param, out_array)
        sample_points[param] = (["nmb_sim"], out_array)

    return sample_points

generate_one_at_a_time_sample_points

generate_one_at_a_time_sample_points(
    checked_config: ParameterFileConfig,
) -> dict

Generate sample_points for one-at-a-time tests.

Behavior: - The first simulation (index 0) uses the default value for all parameters. - Subsequent simulations vary one parameter at a time, using its minimum and maximum value. Parameters that are not being varied retain their default value.

Source code in tinkertool/scripts/generate_paramfile/generate_paramfile.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def generate_one_at_a_time_sample_points(checked_config: ParameterFileConfig) -> tuple:
    """Generate sample_points for one-at-a-time tests.

    Behavior:
    - The first simulation (index 0) holds the default value for all parameters
      (omitted when ``exclude_default`` is True).
    - Subsequent simulations vary one parameter at a time, using its minimum
      and maximum value. Parameters that are not being varied retain their
      default value.
    - Interdependent parameters share the rows of the parameter they depend
      on; a leading "-" on 'interdependent_with' requests inverse scaling
      (the min/max endpoints are swapped).

    Returns:
        tuple: ``(sample_points, n_total)`` where ``sample_points`` maps
        parameter name -> (["nmb_sim"], values_array) and ``n_total`` is the
        number of variation simulations (excluding the default row).
        (The previous ``-> dict`` annotation was wrong; a tuple was always
        returned.)
    """
    sample_points = {}

    interdependent_params = [param for param in checked_config.params if safe_get_param_value(checked_config.param_ranges[param], "interdependent_with") is not None]
    params = [param for param in checked_config.params if param not in interdependent_params]
    param_paramindx_map = {param: indx for indx, param in enumerate(params)}
    nparams = len(params)
    variations_per_param = 2  # one simulation at the minimum, one at the maximum
    n_total = nparams * variations_per_param

    # Prepare matrix filled with defaults; an extra leading row holds the
    # all-defaults simulation unless it is excluded.
    n_rows = n_total if checked_config.exclude_default else n_total + 1
    values = np.zeros((n_rows, len(checked_config.params)), dtype=float)

    # Fill every column with the parameter's default value.
    for j, param in enumerate(checked_config.params):
        pdata = checked_config.param_ranges[param]
        # BUGFIX: check for a missing default before float() — previously
        # float(None) raised TypeError and the None-guard was unreachable.
        default_raw = pdata.get("default", None)
        if default_raw is None:
            raise ValueError(f"Parameter {param} has no default value defined.")
        values[:, j] = float(default_raw)

    # Row offset: skip the default row unless it is excluded.
    idx = 0 if checked_config.exclude_default else 1
    for j, param in enumerate(checked_config.params):
        pdata = checked_config.param_ranges[param]

        # Derive min/max either from a symmetric scale factor around the
        # default, or from explicit min/max entries.
        if pdata.get("scale_fact", None):
            default_val = float(pdata["default"])
            delta = default_val * float(pdata["scale_fact"])
            minv = default_val - delta
            maxv = default_val + delta
        else:
            minv = float(pdata["min"])
            maxv = float(pdata["max"])

        n_var = variations_per_param

        param_to_index_with = param
        if param not in param_paramindx_map: # then it is interdependent
            assert param in interdependent_params, f"Parameter '{param}' not found in param_paramindx_map or interdependent_params."
            param_to_index_with = safe_get_param_value(pdata, "interdependent_with")
            if param_to_index_with.startswith("-"):
                param_to_index_with = param_to_index_with[1:]  # remove the "-" character
                logging.debug(f"Parameter '{param}' is interdependent with '{param_to_index_with}' using inverse scaling.")
                # Inverse scaling: flip the direction of variation by swapping
                # the endpoints so this parameter moves opposite to its partner.
                minv, maxv = maxv, minv

        # BUGFIX: compute the variation values AFTER the possible endpoint
        # swap — previously the swap happened after np.linspace and had no
        # effect on the generated values.
        var_vals = np.linspace(minv, maxv, n_var)

        i_use = param_paramindx_map[param_to_index_with]
        # BUGFIX: each independent parameter owns `n_var` consecutive rows.
        # Previously every variation was written to the same row
        # (values[i_use+idx, j]), so only the last value survived and the
        # remaining rows kept their defaults.
        for k in range(n_var):
            values[i_use * n_var + k + idx, j] = var_vals[k]

    # Build sample_points dict with rounding
    for j, param in enumerate(checked_config.params):
        pdata = checked_config.param_ranges[param]
        out_array = values[:, j]
        if pdata.get("ndigits", None):
            out_array = np.around(out_array, int(pdata["ndigits"]))
        sample_points[param] = (["nmb_sim"], out_array)

    return sample_points, n_total

generate_paramfile

generate_paramfile(config: ParameterFileConfig) -> None

Top-level function to generate parameter files based on the provided configuration.

config : ParameterFileConfig The configuration object containing all necessary information for parameter file generation.

Source code in tinkertool/scripts/generate_paramfile/generate_paramfile.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def generate_paramfile(config: ParameterFileConfig) -> None:
    """
    Top-level function to generate parameter files based on the provided configuration.

    Arguments:
    config : ParameterFileConfig
        The configuration object containing all necessary information for parameter file generation.

    Side effects: writes two netCDF files derived from `param_sample_outpath`
    (a raw, numbers-only dataset and one including generated file paths), and
    may generate chemistry-mechanism and land-model parameter files.
    """
    logging.info("> Starting parameter file generation")

    # check if ParameterFileConfig is valid
    logging.debug(f"Checking config: {config.describe(return_string=True)}") # type: ignore
    checked_config: ParameterFileConfig = config.get_checked_and_derived_config()
    log_info_detailed('tinkertool_log', f">> Generating with config: {checked_config.describe(return_string=True)}") # type: ignore

    # BUGFIX: initialise sample_points and nmb_sim_dim up front — previously
    # the `n_total <= 1` branch referenced both before assignment (NameError).
    sample_points: dict = {}
    nmb_sim_dim = checked_config.nmb_sim_dim
    n_total = len(checked_config.nmb_sim_dim)
    if checked_config.method == 'one_at_a_time' or checked_config.method == 'oat':
        sample_points, n_total = generate_one_at_a_time_sample_points(checked_config)
        # OAT may change the simulation count (e.g. interdependent
        # parameters), so rebuild the coordinate dimension if needed.
        if n_total != len(checked_config.nmb_sim_dim):
            if checked_config.exclude_default:
                nmb_sim_dim = np.arange(1, n_total + 1)
            else:
                nmb_sim_dim = np.arange(0, n_total + 1)
    elif n_total <= 1:
        # Degenerate case: a single simulation using only default values.
        for param in checked_config.params:
            pdata = checked_config.param_ranges[param]
            out_array = np.array([float(pdata.get("default", 0.0))])
            sample_points[param] = (["nmb_sim"], out_array)
    elif checked_config.method == 'latin_hypercube' or checked_config.method == 'lh':
        sample_points = generate_latin_hypercube_sample_points(checked_config)
    else:
        raise ValueError(f"Invalid sampling method '{checked_config.method}'.")

    # Shallow copy taken BEFORE chem-mech generation, so this copy keeps the
    # SOA scale entry that generate_chem_mech_files removes from sample_points.
    sample_points_with_files = sample_points.copy()

    chem_mech_in: list = []
    if checked_config.change_chem_mech:
        logging.debug("Generating chemistry mechanism files")
        chem_mech_in = generate_chem_mech_files(sample_points, checked_config)

    generate_land_model_param_files(sample_points, sample_points_with_files, checked_config)

    logging.debug("Creating xarray dataset")
    raw_ds = xr.Dataset(
        data_vars = sample_points,
        coords={'nmb_sim': nmb_sim_dim}
    )
    out_ds = xr.Dataset(
        data_vars = sample_points_with_files,
        coords={'nmb_sim': nmb_sim_dim}
    )

    # Attach per-parameter metadata from the ranges file and validate the
    # declared input type and ESM component.
    for ds in [raw_ds, out_ds]:
        for param in ds.data_vars:
            pdata = checked_config.param_ranges[str(param)]
            ds[param].attrs['description'] = safe_get_param_value(pdata, 'description', 'No description available')
            ds[param].attrs['default'] = safe_get_param_value(pdata, 'default', 'No default value available')
            ds[param].attrs['sampling'] = safe_get_param_value(pdata, 'sampling', 'No sampling method available')

            input_type = safe_get_param_value(pdata, 'input_type', "user_nl")
            assert not isinstance(input_type, str) or input_type.lower() in [it.lower() for it in PARAMFILE_INPUT_TYPES], \
                f"Invalid input type '{input_type}, type({type(input_type)})' for parameter '{param}'. Supported types are: {PARAMFILE_INPUT_TYPES}."
            ds[param].attrs['input_type'] = input_type
            ds[param].attrs['interdependent_with'] = safe_get_param_value(pdata, "interdependent_with", "")
            ds[param].attrs['format_to_file_method'] = safe_get_param_value(pdata, "format_to_file_method", "write-lines")
            component = pdata.get('esm_component')
            if not isinstance(component, str):
                err_msg = f"The component passed to param {param} is of type {type(component)}, expected type str"
                logging.error(err_msg)
                raise TypeError(err_msg)
            if component.lower() not in VALID_COMPONENTS:
                err_msg = f"Invalid component '{component}' for parameter '{param}'. Supported components are: {VALID_COMPONENTS}."
                logging.error(err_msg)
                raise ValueError(err_msg)
            ds[param].attrs['esm_component'] = component.lower()

    # Add variables with irregular names
    if checked_config.change_chem_mech:
        out_ds['chem_mech_in'] = (['nmb_sim'], chem_mech_in)
    current_time = datetime.now().replace(microsecond=0)
    # Record the creation timestamp on both datasets.
    for ds in [raw_ds, out_ds]:
        ds.attrs['created'] = "Created " + current_time.strftime('%Y-%m-%d %H:%M:%S')
    raw_ds.to_netcdf(checked_config.param_sample_outpath.with_suffix('.raw.nc'))
    out_ds.to_netcdf(checked_config.param_sample_outpath.with_suffix('.nc'))

    logging.info(f">> Raw parameter file (only numbers) {checked_config.param_sample_outpath.with_suffix('.raw.nc')} generated successfully.")
    logging.info(f">> Parameter file (with filepaths) {checked_config.param_sample_outpath.with_suffix('.nc')} generated successfully.")

visualize_paramfile

visualize_paramfile(
    paramfile_path: str | Path,
    save_path: str | Path | None = None,
    show: bool = False,
)

Visualize the paramfile in a pairplot for each dimension.

Parameters

paramfile_path : str | Path path to the parameter file in netCDF format. save_path : str | Path, optional path to save the plot. If None, saves to same directory as paramfile. show : bool, optional whether to show the plot (requires GUI). Default False for headless systems.

Source code in tinkertool/scripts/generate_paramfile/visualize_paramfile.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def visualize_paramfile(
    paramfile_path: str | Path,
    save_path: str | Path | None = None,
    show: bool = False
):
    """Visualize the paramfile in a pairplot for each dimension.

    Parameters
    ----------
    paramfile_path : str | Path
        path to the parameter file in netCDF format.
    save_path : str | Path, optional
        path to save the plot. If None, saves to same directory as paramfile.
    show : bool, optional
        whether to show the plot (requires GUI). Default False for headless systems.
    """
    paramfile_path = Path(paramfile_path)
    if not paramfile_path.is_file():
        raise FileNotFoundError(f"Parameter file not found: {paramfile_path}")

    # Load the parameter file into a DataFrame
    df = xr.open_dataset(paramfile_path).to_dataframe()

    # plot pairplot
    pairplot = sns.pairplot(df)
    # overlay the highlighted point on every off-diagonal scatter axis
    # TODO: make which point to highlight configurable and optional
    # right now it assumes the point to highlight is the first row
    # which is generally true if exclude_default=False in the paramfile generation.
    # However, this may not always be the case.
    default_row = df.iloc[0]
    vars = list(df.columns)
    for i, yvar in enumerate(vars):
        for j, xvar in enumerate(vars):
            ax = pairplot.axes[i, j]
            if i == j:
                # diagonal: draw a vertical line at the value
                ax.axvline(default_row[yvar], color="red", linewidth=2, zorder=5)
            else:
                # off-diagonal: plot the single highlighted point
                ax.scatter(default_row[xvar], default_row[yvar],
                        color="red", edgecolor="k", s=80, zorder=100)

    # Save or show the plot
    if save_path is None:
        # Default: save to same directory as paramfile
        save_path = paramfile_path.parent.joinpath(f"{paramfile_path.stem}_pairplot.png")
    else:
        save_path = Path(save_path).resolve()
        save_path.parent.mkdir(parents=True, exist_ok=True)

    # Save the plot
    pairplot.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"Plot saved to: {save_path}")

    # Optionally show (only if GUI available)
    if show:
        try:
            plt.show()
        except Exception as e:
            print(f"Could not show plot (no display available): {e}")
            print(f"Plot has been saved to: {save_path}")

    plt.close()  # Clean up to avoid memory issues