Skip to content

PPE setup interface

iterate_dict_to_set_value

iterate_dict_to_set_value(
    case: case, settings_dict: dict, dict_name: str
)

Iterate through a dictionary and set the values in the case object

Parameters

case : CIME.case The case object to be updated settings_dict : dict Dictionary of settings to be applied to the case dict_name : str Name of the dictionary, used in the debug log message

Source code in tinkertool/setup/case.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def iterate_dict_to_set_value(case: CIME.case, settings_dict: dict, dict_name: str):
    """
    Iterate through a dictionary and set the values in the case object

    Every key/value pair is attempted, also when an earlier one failed:
    each pair is handed to ``set_value_with_status_update`` with
    ``kill_on_error=False``.

    Parameters
    ----------
    case : CIME.case
        The case object to be updated
    settings_dict : dict
        Dictionary of settings to be applied to the case
    dict_name : str
        Name of the dictionary, only used in the debug log message
    """
    # Build the complete list of outcomes first so that a failing pair does
    # not prevent the remaining pairs from being applied.
    outcomes = [
        set_value_with_status_update(case, key, value, kill_on_error=False)
        for key, value in settings_dict.items()
    ]

    if all(outcomes):
        logger.debug(f"All key: value pairs were successfully set for {dict_name}: {settings_dict}")

build_base_case

build_base_case(
    basecaseroot: Path,
    overwrite: bool,
    case_settings: dict,
    env_pe_settings: dict,
    env_run_settings: dict,
    env_build_settings: dict,
    namelist_collection_dict: dict,
    paramDataset: Dataset,
    pdim: str,
) -> Path

Create and build the base case that all PPE cases are cloned from

Parameters

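basecaseroot : Path The base case root directory overwrite : bool Overwrite existing cases case_settings : dict Dictionary of case settings env_pe_settings : dict Dictionary of environment parallel execution settings env_run_settings : dict Dictionary of environment run settings env_build_settings : dict Dictionary of environment build settings namelist_collection_dict : dict Dictionary of namelist collections for the different components paramDataset : xr.Dataset Dataset of parameters; variables whose format_to_file_method attribute is "f-string" are substituted into the user_nl files pdim : str Name of the paramDataset dimension from which the first entry (index 0) is taken for those variables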

Returns

Path The root directory of the base case

Source code in tinkertool/setup/case.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def _set_optional_run_setting(case, settings: dict, key: str) -> None:
    """Pop ``key`` from ``settings`` and apply it to ``case`` unless it is missing or None."""
    value = settings.pop(key, None)
    if value is not None:
        set_value_with_status_update(case, key, value, kill_on_error=False)


def build_base_case(
    basecaseroot:               Path,
    overwrite:                  bool,
    case_settings:              dict,
    env_pe_settings:            dict,
    env_run_settings:           dict,
    env_build_settings:         dict,
    namelist_collection_dict:   dict,
    paramDataset:               xr.Dataset,
    pdim: str
) -> Path:
    """
    Create and build the base case that all PPE cases are cloned from

    The settings dictionaries are consumed from local shallow copies, so the
    dictionaries passed in by the caller are left untouched.

    Parameters
    ----------
    basecaseroot : Path
        The base case root directory
    overwrite : bool
        Overwrite existing cases
    case_settings : dict
        Dictionary of case settings. Only read when the case directory has to
        be created; must then hold 'cesmroot', 'compset', 'res', 'mach' and
        'project'. Remaining entries are forwarded to ``case.create``.
    env_pe_settings : dict
        Dictionary of environment parallel execution settings
    env_run_settings : dict
        Dictionary of environment run settings. 'RUN_TYPE',
        'JOB_WALLCLOCK_TIME_RUN', 'JOB_WALLCLOCK_TIME_ARCHIVE',
        'JOB_WALLCLOCK_TIME_COMPRESS', 'STOP_OPTION', 'STOP_N' and
        'RUN_STARTDATE' are required keys; the reference case, restart and
        CAM option keys are optional and skipped when missing or None.
    env_build_settings : dict
        Dictionary of environment build settings. 'DEBUG' defaults to "FALSE".
    namelist_collection_dict : dict
        Dictionary of namelist collections for the different components
    paramDataset : xr.Dataset
        Dataset of parameters. Variables whose ``format_to_file_method``
        attribute is "f-string" are substituted into the user_nl strings of
        the component named by their ``esm_component`` attribute.
    pdim : str
        Name of the dimension of ``paramDataset`` from which index 0 is taken
        for the f-string substitution.

    Returns
    -------
    Path
        The root directory of the base case

    Raises
    ------
    KeyError
        If a required key is missing from ``case_settings`` (when creating
        the case) or from ``env_run_settings``.
    """

    if overwrite and basecaseroot.is_dir():
        shutil.rmtree(basecaseroot)
    with Case(str(basecaseroot), read_only=False) as case:
        logger.debug(dir(case))
        if not basecaseroot.is_dir():
            logger.info(">>> BUILDING BASE CASE...")
            logger.info('Creating base case directory: {}'.format(basecaseroot))
            # create the case using the case_settings; pop from a copy so the
            # caller's dictionary keeps its entries
            create_settings = dict(case_settings)
            case.create(
                casename=basecaseroot.name,
                srcroot=create_settings.pop("cesmroot"),
                compset_name=create_settings.pop("compset"),
                grid_name=create_settings.pop("res"),
                machine_name=create_settings.pop("mach"),
                project=create_settings.pop("project"),
                driver="nuopc",
                run_unsupported=True,
                answer="r",
                **create_settings,
            )
            case.record_cmd(init=True)

        else:
            logger.info('Reusing existing case directory: {}'.format(str(basecaseroot)))

        # set the case environment variables
        # first using the case's own values
        set_value_with_status_update(case, "EXEROOT", case.get_value("EXEROOT", resolved=True), kill_on_error=True)
        set_value_with_status_update(case, "RUNDIR", case.get_value("RUNDIR", resolved=True), kill_on_error=True)
        # set the PE settings
        # check if the env_pe_settings are not empty dict, or all entries are None
        if not env_pe_settings or all(
            value is None for value in env_pe_settings.values()
        ):
            logger.warning(
                "No environment parallel execution settings provided, using default values."
            )
        else:
            logger.info(">>> Setting environment parallel execution settings...")
            iterate_dict_to_set_value(
                case=case, settings_dict=env_pe_settings, dict_name="env_pe_settings"
            )

        logger.info(">>> base case_setup...")
        case.case_setup()

        logger.info(">>> Setting environment run settings...")
        # set the run settings; everything handled explicitly is popped from a
        # copy so that only the additional settings remain at the end
        run_settings = dict(env_run_settings)
        set_value_with_status_update(case, "RUN_TYPE", run_settings.pop("RUN_TYPE"), kill_on_error=False)
        set_value_with_status_update(case, 'JOB_WALLCLOCK_TIME', run_settings.pop('JOB_WALLCLOCK_TIME_RUN'), subgroup='case.run', kill_on_error=False)
        set_value_with_status_update(case, 'JOB_WALLCLOCK_TIME', run_settings.pop('JOB_WALLCLOCK_TIME_ARCHIVE'), subgroup='case.st_archive', kill_on_error=False)
        set_value_with_status_update(case, 'JOB_WALLCLOCK_TIME', run_settings.pop('JOB_WALLCLOCK_TIME_COMPRESS'), subgroup='case.compress', kill_on_error=False)
        # Optional keys are handled one by one: a missing or None entry is
        # skipped instead of raising KeyError or being written as None.
        for optional_key in ("GET_REFCASE", "RUN_REFCASE", "RUN_REFDIR", "RUN_REFDATE"):
            _set_optional_run_setting(case, run_settings, optional_key)

        set_value_with_status_update(case, "STOP_OPTION", run_settings.pop("STOP_OPTION"), kill_on_error=False)
        set_value_with_status_update(case, "STOP_N", run_settings.pop("STOP_N"), kill_on_error=False)
        set_value_with_status_update(case, "RUN_STARTDATE", run_settings.pop("RUN_STARTDATE"), kill_on_error=False)

        for optional_key in ("REST_OPTION", "REST_N"):
            _set_optional_run_setting(case, run_settings, optional_key)

        # Both CAM option keys are always removed: 'cam_onopts' is not a case
        # variable and must not reach the generic loop further down.
        cam_config_opts = run_settings.pop('CAM_CONFIG_OPTS', None)
        cam_onopts = run_settings.pop('cam_onopts', None)
        if cam_config_opts is not None:
            if cam_onopts:
                logger.warning(
                    "Both 'CAM_CONFIG_OPTS' and 'cam_onopts' were provided. "
                    "'CAM_CONFIG_OPTS' will overwrite all previous options including 'cam_onopts'."
                )
            set_value_with_status_update(case, 'CAM_CONFIG_OPTS', cam_config_opts, kill_on_error=False)
        elif cam_onopts is not None:
            # append to whatever the case already has; guard against an unset value
            current_opts = case.get_value('CAM_CONFIG_OPTS', resolved=True) or ""
            new_opts = f"{current_opts} {cam_onopts}".strip()
            set_value_with_status_update(case, 'CAM_CONFIG_OPTS', new_opts, kill_on_error=False)

        # check if there are any additional run settings that carry a value
        if any(value is not None for value in run_settings.values()):
            iterate_dict_to_set_value(
                case=case, settings_dict=run_settings, dict_name="env_run_settings"
            )

        build_settings = dict(env_build_settings)
        set_value_with_status_update(case, "DEBUG", build_settings.pop("DEBUG", "FALSE"), kill_on_error=False)
        # check if the env_build_settings are not empty dict, or all entries are None
        if not build_settings or all(
            value is None for value in build_settings.values()
        ):
            logger.warning(
                "No environment build settings provided, using default values."
            )
        else:
            logger.info(">>> Setting environment build settings...")
            iterate_dict_to_set_value(
                case=case,
                settings_dict=build_settings,
                dict_name="env_build_settings",
            )

        logger.info(">>> base case write user_nl files...")
        # collect, per component, the first-member value of every parameter
        # that is written into the namelist through str.format
        fStringParameters = {}
        for var in paramDataset:
            if paramDataset[var].attrs.get("format_to_file_method", None) == "f-string":
                esm_component = paramDataset[var].attrs["esm_component"]
                fStringParameters.setdefault(esm_component, {})[var] = (
                    paramDataset[var].isel({pdim: 0}).values.item()
                )

        # write user_nl files
        for nl_control_name, nl_config in namelist_collection_dict.items():
            # get the component name from the file name assuming control_<component> using the name in the .ini file
            component_name = nl_control_name.split('_')[-1]
            user_nl_str = setup_usr_nlstring(nl_config, component_name=component_name)

            component_parameters = fStringParameters.get(component_name)
            if component_parameters is not None:
                user_nl_str = user_nl_str.format(**component_parameters)
            write_user_nl_file(str(basecaseroot), f"user_nl_{component_name}", user_nl_str)

        logger.info(">> base case_build...")
        build.case_build(basecaseroot, case=case)

    return basecaseroot

clone_base_case

clone_base_case(
    baseroot: Path,
    basecaseroot: Path,
    overwrite: bool,
    paramDataset: Dataset,
    componentdict: dict,
    ensemble_idx: str,
    namelist_collection_dict: dict,
    path_base_input: Path = Path(""),
    keepexe: bool = False,
    **kwargs
) -> Path

Clone the base case and update the namelist parameters

Parameters

baseroot : Path The base directory for the cases basecaseroot : Path The base case root directory overwrite : bool Overwrite existing cases paramDataset : xr.Dataset Dataset of namelist parameters to be updated componentdict : dict Dictionary of component names for the parameters ensemble_idx : str The ensemble index for the new case namelist_collection_dict : dict Dictionary of namelist collections; passed on to the per-run case updates only when paramDataset contains f-string parameters, otherwise None is passed path_base_input : Path The path to the base input files keepexe : bool Keep the executable files **kwargs : dict Additional keyword arguments to be passed to the case updates

Returns

Path The root directory of the cloned case

Source code in tinkertool/setup/case.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def clone_base_case(
    baseroot:           Path,
    basecaseroot:       Path,
    overwrite:          bool,
    paramDataset:       xr.Dataset,
    componentdict:      dict,
    ensemble_idx:       str,
    namelist_collection_dict: dict,
    path_base_input:    Path=Path(''),
    keepexe:            bool=False,
    **kwargs
) -> Path:
    """
    Clone the base case and update the namelist parameters

    The clone is created in ``baseroot/ensemble_member.<ensemble_idx>``. An
    existing clone directory is reused unless ``overwrite`` is set.

    Parameters
    ----------
    baseroot : Path
        The base directory for the cases
    basecaseroot : Path
        The base case root directory
    overwrite : bool
        Overwrite existing cases
    paramDataset : xr.Dataset
        Dataset of namelist parameters to be updated
    componentdict : dict
        Dictionary of component names for the parameters
    ensemble_idx : str
        The ensemble index for the new case
    namelist_collection_dict : dict
        Dictionary of namelist collections. Forwarded to the per-run case
        updates only when at least one variable of ``paramDataset`` has the
        ``format_to_file_method`` attribute "f-string"; otherwise ``None``
        is forwarded.
    path_base_input : Path
        The path to the base input files
    keepexe : bool
        Keep the executable files
    **kwargs : dict
        Additional keyword arguments to be passed to the case updates

    Returns
    -------
    Path
        The root directory of the cloned case
    """

    logger.info(">>> CLONING BASE CASE for member {}...".format(ensemble_idx))
    cloneroot = baseroot.joinpath(f'ensemble_member.{ensemble_idx}')
    # should be able to overwrite cloned cases independently of base case...
    if overwrite and cloneroot.exists():
        shutil.rmtree(cloneroot)
    if not cloneroot.exists():
        logger.debug(f"Creating clone directory: {cloneroot}")
        logger.debug(f"cloneroot type: {type(cloneroot)}")
        with Case(str(basecaseroot), read_only=False) as clone:
            clone.create_clone(str(cloneroot), keepexe=keepexe)

    # one flag per dataset variable: is it written to file via str.format?
    fstring_flags = [
        paramDataset[var].attrs.get("format_to_file_method", None) == "f-string"
        for var in paramDataset
    ]
    logger.info(f"f-string parameters present: {fstring_flags}")
    if any(fstring_flags):
        namelist_dict = namelist_collection_dict
    else:
        logger.info("No f-string parameters present, setting namelist_collection_dict to None")
        namelist_dict = None

    with Case(str(cloneroot), read_only=False) as case:
        _per_run_case_updates(
            case=case,
            paramDataset=paramDataset,
            componentdict=componentdict,
            ens_idx=ensemble_idx,
            path_base_input=str(path_base_input),
            keepexe=keepexe,
            namelist_collection_dict=namelist_dict,
            **kwargs,
        )

    return Path(cloneroot)

take

take(n, iterable)

Return first n items of the iterable as a list

Source code in tinkertool/setup/case.py
476
477
478
def take(n, iterable):
    """Return first n items of the iterable as a list"""
    # islice stops after n items or when the iterable is exhausted,
    # whichever comes first.
    leading_items = islice(iterable, n)
    return [item for item in leading_items]

format_value

format_value(value: str) -> str

Format a string for Fortran namelist: booleans and numerics are left as-is, comma-separated lists are handled, other strings are quoted.

Parameters

value : str The string to format.

Source code in tinkertool/setup/namelist.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# Integer / real literal: optional sign, digits with optional fraction or a
# bare fraction (".5"), optional E/D exponent.
_NUMERIC_LITERAL = re.compile(r'^[+-]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eEdD][+-]?\d+)?$')
_FORTRAN_LOGICALS = ('.true.', '.false.')


def _quote_namelist_string(text: str) -> str:
    """Wrap ``text`` in single quotes unless it is already enclosed in matching quotes."""
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "'\"":
        return text
    return f"'{text}'"


def format_value(value: str) -> str:
    """
    Format a string for Fortran namelist: booleans and numerics are left as-is,
    comma-separated lists are handled, other strings are quoted.

    Numerics may carry a leading sign and may start with the decimal point
    (``+1.0``, ``.5``). Strings that are already enclosed in matching single
    or double quotes are not quoted a second time.

    Parameters
    ----------
    value : str
        The string to format.

    Returns
    -------
    str
        The formatted value, ready to be placed after ``key = ``.
    """
    value = value.strip()

    # Handle Fortran logicals
    if value.lower() in _FORTRAN_LOGICALS:
        return_value = value.lower()
    # Handle comma-separated list of numbers or booleans
    elif "," in value:
        vals = [v.strip() for v in value.split(",")]
        if all(_NUMERIC_LITERAL.match(v) or v.lower() in _FORTRAN_LOGICALS for v in vals):
            # For numeric lists, join without spaces (Fortran convention)
            return_value = ",".join(vals)
        else:
            # For string lists, quote each item and join with commas (no spaces)
            return_value = ",".join(_quote_namelist_string(v) for v in vals)
    # Handle single numeric value (int, float, E or D notation)
    elif _NUMERIC_LITERAL.match(value):
        return_value = value
    else:
        # Otherwise, treat as string
        return_value = _quote_namelist_string(value)

    logging.debug(f"'format_value': in - {value}, out {return_value}")
    return return_value

setup_usr_nlstring

setup_usr_nlstring(
    user_nl_config: ConfigParser, component_name: str
) -> str

Takes in configparser objects of default namelist settings for setting diagnostics and control namelist settings.

Parameters:

user_nl_config : configparser.ConfigParser A configparser object containing default namelist settings. component_name : str Name of the component for which the namelist settings are being set up, e.g. 'cam', 'clm', etc. Per now only 'blom' is the exception where the namelist section is not used.

Source code in tinkertool/setup/namelist.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def _quoted_list_entry(key: str, value: str, pad: str) -> str:
  """Render a newline-separated value as ``key = 'a',`` / ``<pad>'b'`` lines, one quoted item per line."""
  # Blank lines would otherwise turn into empty '' items in the list.
  items = [item for item in value.split("\n") if item.strip()]
  if not items:
    return key + " = ''\n"
  rendered = [f"{key} = '{items[0]}'"]
  rendered.extend(f"{pad}'{item}'" for item in items[1:])
  # Every item but the last is followed by a comma; a single item is
  # written exactly once.
  return ",\n".join(rendered) + "\n"


def _namelist_entry(key: str, value: str, specifier_lists: bool) -> str:
  """Render one ``key = value`` namelist entry, including the trailing newline."""
  is_diagnostic = any(substring in key for substring in ["fincl", "fexcl"])
  if is_diagnostic and "\n" in value:
    # Multi-line diagnostic list
    return _quoted_list_entry(key, value, pad=" " * 9)
  if specifier_lists and not is_diagnostic and key.endswith("_specifier"):
    return _quoted_list_entry(key, value, pad=" " * 18)
  # Everything else (including single-line diagnostics) goes through format_value
  return key + " = " + format_value(value) + "\n"


def setup_usr_nlstring(
  user_nl_config: configparser.ConfigParser,
  component_name: str
) -> str:
  """
  Takes in a configparser object of default namelist settings and builds the
  user_nl string for the diagnostics and control namelist settings.

  Keys of a 'misc' section are written first, without a namelist group
  header. Every other section is written as ``&<section>`` ... ``/``, except
  for component 'blom' where only the key-value lines are written.
  Multi-line 'fincl'/'fexcl' values and (outside 'misc') '*_specifier' values
  are written as one quoted item per line; all other values are formatted
  with ``format_value``.

  Parameters:
  -----------
  user_nl_config : configparser.ConfigParser
      A configparser object containing default namelist settings.
  component_name : str
      Name of the component for which the namelist settings are being set up, e.g. 'cam', 'clm', etc.
      Per now only 'blom' is the exception where the namelist section is not used.

  Returns:
  --------
  str
      The content for the component's user_nl file.
  """
  write_group_markers = component_name.lower() != 'blom'
  pieces = []
  section_list = user_nl_config.sections()
  if 'misc' in section_list:
    misc_section = user_nl_config['misc']
    for key in misc_section:
      pieces.append(_namelist_entry(key, misc_section[key], specifier_lists=False))
    section_list.remove('misc')
  for section in section_list:
    if write_group_markers:
      pieces.append(f"&{section}\n")
    current_section = user_nl_config[section]
    for key in current_section:
      pieces.append(_namelist_entry(key, current_section[key], specifier_lists=True))
    if write_group_markers:
      pieces.append("/\n")
    pieces.append("\n")
  return "".join(pieces)

write_user_nl_file

write_user_nl_file(
    caseroot: str, usernlfile: str, user_nl_str: str
) -> None

Write user_nl string to file. Here we OVERWRITE the full file content, i.e. creating a new file. This implies that the control files should hold all key-value pairs.

Parameters

caseroot : str root directory of the case usernlfile : str name of the user_nl file, e.g. user_nl_cam, user_nl_clm ... user_nl_str : str string to be written to the user_nl file

Source code in tinkertool/setup/namelist.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def write_user_nl_file(
    caseroot:       str,
    usernlfile:     str,
    user_nl_str:    str
) -> None:
    """Write a user_nl string to ``<caseroot>/<usernlfile>``.

    The file is OVERWRITTEN, i.e. created anew with exactly the given
    content. This implies that the control files should hold all
    key-value pairs.

    Parameters
    ----------
    caseroot : str
        root directory of the case
    usernlfile : str
        name of the user_nl file, e.g. user_nl_cam, user_nl_clm ...
    user_nl_str : str
        string to be written to the user_nl file
    """
    destination = os.path.join(caseroot, usernlfile)
    logger.info(f"...Writing to user_nl file: {usernlfile}")
    # Mode "w" truncates an existing file before the new content is written.
    with open(destination, mode="w") as nl_handle:
        nl_handle.write(user_nl_str)

add_CIME_paths

add_CIME_paths(cesmroot: str | Path) -> None

Add CIME paths to the system path.

Parameters

cesmroot : str | Path Path to the CESM root directory.

Source code in tinkertool/setup/setup_cime_connection.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def add_CIME_paths(
    cesmroot: str | Path
) -> None:
    """Add CIME paths to the system path.

    Parameters
    ----------
    cesmroot : str | Path
        Path to the CESM root directory.
    """
    cime_path = Path(cesmroot).joinpath("cime").resolve()

    if not cime_path.is_dir():
        raise FileNotFoundError(f"CIME directory not found: {cime_path}")

    sys.path.insert(0, str(cime_path))

add_CIME_paths_and_import

add_CIME_paths_and_import(cesmroot: str | Path) -> None

Add CIME paths to the system path and import necessary functions to build and clone cases.

Parameters

cesmroot : str | Path Path to the CESM root directory.

Source code in tinkertool/setup/setup_cime_connection.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def add_CIME_paths_and_import(
    cesmroot: str | Path
) -> None:
    """Add CIME paths to the system path and import necessary functions to build and clone cases.

    On success ``build_base_case`` and ``clone_base_case`` are published as
    globals of this module. If the import fails, the exception traceback and
    an error message are printed and the function returns without raising;
    the two globals are then not set.

    Parameters
    ----------
    cesmroot : str | Path
        Path to the CESM root directory.
    """
    cesmroot = Path(cesmroot).resolve()
    add_CIME_paths(cesmroot)
    try:
        from tinkertool.setup.case import build_base_case, clone_base_case
    except ImportError:
        # print_exc reports the ImportError itself (which module failed to
        # import), whereas print_stack would only show how we got here.
        traceback.print_exc()
        err_msg = f"ERROR: CIME not found in {cesmroot}, update CESMROOT environment variable"
        print(err_msg)
    else:
        # Make functions available at module level
        globals()['build_base_case'] = build_base_case
        globals()['clone_base_case'] = clone_base_case